Try to fix parallel fulfillment issues

This commit is contained in:
Florian Bach 2023-05-06 11:08:44 +02:00
parent ff2d9cf2a7
commit 999354dde9
4 changed files with 81 additions and 190 deletions

View File

@ -99,6 +99,7 @@ __version__ = PLUGIN_VERSION = ".".join([str(x)for x in PLUGIN_VERSION_TUPLE])
from calibre.utils.config import config_dir # type: ignore from calibre.utils.config import config_dir # type: ignore
from calibre.utils.lock import SingleInstance, singleinstance # type: ignore
import os, shutil, traceback, sys, time, io, random import os, shutil, traceback, sys, time, io, random
import zipfile import zipfile
@ -171,9 +172,10 @@ class ACSMInput(FileTypePlugin):
os.mkdir(self.pluginsdir) os.mkdir(self.pluginsdir)
# If the old DeACSM plugin still exists, rename it to BAK or something so it doesn't load. if singleinstance("__acsm_rename_old_plugin"):
if os.path.exists(os.path.join(self.pluginsdir, "DeACSM.zip")): # If the old DeACSM plugin still exists, rename it to BAK or something so it doesn't load.
os.rename(os.path.join(self.pluginsdir, "DeACSM.zip"), os.path.join(self.pluginsdir, "DeACSM.BAK")) if os.path.exists(os.path.join(self.pluginsdir, "DeACSM.zip")):
os.rename(os.path.join(self.pluginsdir, "DeACSM.zip"), os.path.join(self.pluginsdir, "DeACSM.BAK"))
try: try:
@ -237,54 +239,39 @@ class ACSMInput(FileTypePlugin):
print("Module update from \"{0}\" to \"{1}\", extracting ...".format(id, id_plugin)) print("Module update from \"{0}\" to \"{1}\", extracting ...".format(id, id_plugin))
# Something changed, extract modules. # Something changed, extract modules.
if not singleinstance("__acsm_extracting modules"):
print("Skipping because another instance is already doing that.")
else:
if os.path.exists(self.moddir):
shutil.rmtree(self.moddir, ignore_errors=True)
if os.path.exists(self.moddir): os.mkdir(self.moddir)
shutil.rmtree(self.moddir, ignore_errors=True)
rand_path = self.moddir + str(random.randint(0, 1000000000)) names = ["oscrypto.zip", "asn1crypto.zip"]
ctr = 0
while os.path.exists(rand_path):
# None of this code should be necessary since a random number between 0 and a billion should be unique
# enough, but apparently not. Make new ones until we find one that's not in use.
# Should be using Calibre's TemporaryFile class but then I can't be certain it's on the same drive...
ctr += 1
if (ctr > 1000):
print("{0} v{1}: Tried a thousand times to get a temp dir ...".format(PLUGIN_NAME, PLUGIN_VERSION))
raise Exception("Hey!")
rand_path = self.moddir + str(random.randint(0, 1000000000)) # oscrypto is needed to parse the pkcs12 data from Adobe.
# asn1crypto is a dependency of oscrypto.
lib_dict = self.load_resources(names)
os.mkdir(rand_path) for entry, data in lib_dict.items():
try:
with zipfile.ZipFile(io.BytesIO(data), 'r') as ref:
ref.extractall(self.moddir)
names = ["oscrypto.zip", "asn1crypto.zip"] except:
print("{0} v{1}: Exception when copying needed library files".format(PLUGIN_NAME, PLUGIN_VERSION))
# oscrypto is needed to parse the pkcs12 data from Adobe. traceback.print_exc()
# asn1crypto is a dependency of oscrypto. pass
lib_dict = self.load_resources(names)
for entry, data in lib_dict.items():
try:
with zipfile.ZipFile(io.BytesIO(data), 'r') as ref:
ref.extractall(rand_path)
except:
print("{0} v{1}: Exception when copying needed library files".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
pass
# Write module ID # Write module ID
if id_plugin is not None: if id_plugin is not None:
mod_file = os.path.join(rand_path, "module_id.txt") mod_file = os.path.join(self.moddir, "module_id.txt")
f = open(mod_file, "w") f = open(mod_file, "w")
f.write(id_plugin) f.write(id_plugin)
f.close() f.close()
# Rename temporary path to actual module path so this will be used next time.
os.rename(rand_path, self.moddir)
sys.path.insert(0, os.path.join(self.moddir, "oscrypto")) sys.path.insert(0, os.path.join(self.moddir, "oscrypto"))
sys.path.insert(0, os.path.join(self.moddir, "asn1crypto")) sys.path.insert(0, os.path.join(self.moddir, "asn1crypto"))
@ -441,89 +428,62 @@ class ACSMInput(FileTypePlugin):
print("{0} v{1}: Error: Unsupported file type ...".format(PLUGIN_NAME, PLUGIN_VERSION)) print("{0} v{1}: Error: Unsupported file type ...".format(PLUGIN_NAME, PLUGIN_VERSION))
return None return None
def is_blocked(self):
import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.ACSMInput_Prefs()
return deacsmprefs['fulfillment_block_token'] != 0
def unblock(self):
import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.ACSMInput_Prefs()
my_token = deacsmprefs["fulfillment_block_token"]
deacsmprefs.refresh()
if (my_token == deacsmprefs["fulfillment_block_token"]):
# Only unlock if this is my own lock
deacsmprefs.set("fulfillment_block_token", 0)
deacsmprefs.set("fulfillment_block_time", 0)
deacsmprefs.commit()
def wait_and_block(self):
random_identifier = None
import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.ACSMInput_Prefs()
while True:
deacsmprefs.refresh()
if deacsmprefs["fulfillment_block_token"] == 0:
random_identifier = random.getrandbits(64)
#print("setting block token to %s" % (str(random_identifier)))
deacsmprefs.set("fulfillment_block_token", random_identifier)
deacsmprefs.commit()
deacsmprefs.refresh()
if random_identifier != deacsmprefs["fulfillment_block_token"]:
# print("we broke another thread's global token")
continue
deacsmprefs.set("fulfillment_block_time", int(time.time() * 1000))
#print("Obtained lock!")
return True
else:
# Token already exists, wait for it to finish ...
current_time = int(time.time() * 1000)
saved_time = deacsmprefs["fulfillment_block_time"]
if saved_time + 60000 < current_time:
# Already locked since 60s, assume error
print("{0} v{1}: Looks like the lock was stuck, removing lock {2} ...".format(PLUGIN_NAME, PLUGIN_VERSION, deacsmprefs["fulfillment_block_token"]))
self.unblock()
time.sleep(0.02)
continue
def run(self, path_to_ebook): def run(self, path_to_ebook):
# type: (str) -> str # type: (str) -> str
# This code gets called by Calibre with a path to the new book file.
# Make sure there's only a single instance of this function running ever.
# Calibre loves to run these in parallel when many ACSM files are being imported.
# However that A) messes with the loan records written to a file, and B) that behaviour
# is significantly different from ADE so Adobe could use that to detect this plugin.
# So, we're trying to use Calibre's singleinstance feature to prevent that.
counter = 0
thread_id = -1
try: try:
# This code gets called by Calibre with a path to the new book file. import threading
thread_id = threading.current_thread().ident
except:
pass
while True:
with SingleInstance("__acsm_plugin_execute_run_acsm_file") as si:
if si:
return self.run_single(path_to_ebook)
else:
counter += 1
if (counter % 100 == 0):
print("Thread {0} still waiting for lock, attempt {1}".format(thread_id, counter))
time.sleep(0.1)
def run_single(self, path_to_ebook):
# type: (str) -> str
try:
# We need to check if it's an ACSM file # We need to check if it's an ACSM file
import calibre_plugins.deacsm.prefs as prefs # type: ignore import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.ACSMInput_Prefs() deacsmprefs = prefs.ACSMInput_Prefs()
if deacsmprefs['allow_parallel_fulfillment'] == False:
self.wait_and_block()
print("{0} v{1}: Trying to parse file {2}".format(PLUGIN_NAME, PLUGIN_VERSION, os.path.basename(path_to_ebook))) print("{0} v{1}: Trying to parse file {2}".format(PLUGIN_NAME, PLUGIN_VERSION, os.path.basename(path_to_ebook)))
ext = os.path.splitext(path_to_ebook)[1].lower() ext = os.path.splitext(path_to_ebook)[1].lower()
if (ext != ".acsm"): if (ext != ".acsm"):
print("{0} v{1}: That's not an ACSM, returning (is {2} instead)... ".format(PLUGIN_NAME, PLUGIN_VERSION, ext)) print("{0} v{1}: That's not an ACSM, returning (is {2} instead)... ".format(PLUGIN_NAME, PLUGIN_VERSION, ext))
self.unblock()
return path_to_ebook return path_to_ebook
# We would fulfill this now, but first perform some sanity checks ... # We would fulfill this now, but first perform some sanity checks ...
if not self.ADE_sanity_check(): if not self.ADE_sanity_check():
print("{0} v{1}: ADE auth is missing or broken ".format(PLUGIN_NAME, PLUGIN_VERSION)) print("{0} v{1}: ADE auth is missing or broken ".format(PLUGIN_NAME, PLUGIN_VERSION))
self.unblock()
return path_to_ebook return path_to_ebook
@ -532,7 +492,6 @@ class ACSMInput(FileTypePlugin):
if not are_ade_version_lists_valid(): if not are_ade_version_lists_valid():
print("{0} v{1}: ADE version list mismatch, please open a bug report.".format(PLUGIN_NAME, PLUGIN_VERSION)) print("{0} v{1}: ADE version list mismatch, please open a bug report.".format(PLUGIN_NAME, PLUGIN_VERSION))
self.unblock()
return path_to_ebook return path_to_ebook
print("{0} v{1}: Try to fulfill ...".format(PLUGIN_NAME, PLUGIN_VERSION)) print("{0} v{1}: Try to fulfill ...".format(PLUGIN_NAME, PLUGIN_VERSION))
@ -643,13 +602,10 @@ class ACSMInput(FileTypePlugin):
# Return path - either the original one or the one modified by the other plugins. # Return path - either the original one or the one modified by the other plugins.
self.unblock()
return rpl return rpl
self.unblock()
return path_to_ebook return path_to_ebook
except: except:
self.unblock()
traceback.print_exc() traceback.print_exc()
return path_to_ebook return path_to_ebook

View File

@ -64,7 +64,6 @@ class ConfigWidget(QWidget):
self.tempdeacsmprefs['notify_fulfillment'] = self.deacsmprefs['notify_fulfillment'] self.tempdeacsmprefs['notify_fulfillment'] = self.deacsmprefs['notify_fulfillment']
self.tempdeacsmprefs['detailed_logging'] = self.deacsmprefs['detailed_logging'] self.tempdeacsmprefs['detailed_logging'] = self.deacsmprefs['detailed_logging']
self.tempdeacsmprefs['delete_acsm_after_fulfill'] = self.deacsmprefs['delete_acsm_after_fulfill'] self.tempdeacsmprefs['delete_acsm_after_fulfill'] = self.deacsmprefs['delete_acsm_after_fulfill']
self.tempdeacsmprefs['allow_parallel_fulfillment'] = self.deacsmprefs['allow_parallel_fulfillment']
self.tempdeacsmprefs['list_of_rented_books'] = self.deacsmprefs['list_of_rented_books'] self.tempdeacsmprefs['list_of_rented_books'] = self.deacsmprefs['list_of_rented_books']
@ -186,10 +185,6 @@ class ConfigWidget(QWidget):
self.chkDeleteAfterFulfill.toggled.connect(self.toggle_acsm_delete) self.chkDeleteAfterFulfill.toggled.connect(self.toggle_acsm_delete)
layout.addWidget(self.chkDeleteAfterFulfill) layout.addWidget(self.chkDeleteAfterFulfill)
self.chkParallelFulfill = QtGui.QCheckBox("Allow parallel fulfillment")
self.chkParallelFulfill.setToolTip("Default: True\n\nIf this is enabled (which was the default in previous versions), \nthe plugin will import multiple ACSM files simultaneously when you add more than one.\n\nIf this is disabled, it will add them one after another like ADE.")
self.chkParallelFulfill.setChecked(self.tempdeacsmprefs["allow_parallel_fulfillment"])
layout.addWidget(self.chkParallelFulfill)
# Key shortcut Ctrl+Shift+D / Cmd+Shift+D to remove authorization, just like in ADE. # Key shortcut Ctrl+Shift+D / Cmd+Shift+D to remove authorization, just like in ADE.
self.deauthShortcut = QShortcut(QKeySequence("Ctrl+Shift+D"), self) self.deauthShortcut = QShortcut(QKeySequence("Ctrl+Shift+D"), self)
@ -1284,7 +1279,6 @@ class ConfigWidget(QWidget):
self.deacsmprefs.set('notify_fulfillment', self.chkNotifyFulfillment.isChecked()) self.deacsmprefs.set('notify_fulfillment', self.chkNotifyFulfillment.isChecked())
self.deacsmprefs.set('detailed_logging', self.chkDetailedLogging.isChecked()) self.deacsmprefs.set('detailed_logging', self.chkDetailedLogging.isChecked())
self.deacsmprefs.set('delete_acsm_after_fulfill', self.chkDeleteAfterFulfill.isChecked()) self.deacsmprefs.set('delete_acsm_after_fulfill', self.chkDeleteAfterFulfill.isChecked())
self.deacsmprefs.set('allow_parallel_fulfillment', self.chkParallelFulfill.isChecked())
self.deacsmprefs.writeprefs() self.deacsmprefs.writeprefs()
def load_resource(self, name): def load_resource(self, name):

View File

@ -581,77 +581,24 @@ def addLoanRecordToConfigFile(new_loan_record):
return False return False
error_counter = 0 # Check if that exact loan is already in the list, and if so, delete it:
last_token = None done = False
random_identifier = None while not done:
done = True
while True: for book in deacsmprefs["list_of_rented_books"]:
if book["loanID"] == new_loan_record["loanID"]:
if error_counter >= 10: done = False
print("Took us 10 attempts to acquire loan token lock, still didn't work.") deacsmprefs["list_of_rented_books"].remove(book)
print("(still the same token %s)" % (deacsmprefs["loan_identifier_token"])) break
print("If you see this error message please open a bug report.")
# "Mark" the current access with a random token, to prevent multiple instances # Add all necessary information for a book return to the JSON array.
# of the plugin overwriting eachother's data. # The config widget can then read this and present a list of not-yet-returned
deacsmprefs.refresh() # books, and can then return them.
if deacsmprefs["loan_identifier_token"] == 0: # Also, the config widget is responsible for cleaning up that list once a book's validity period is up.
random_identifier = random.getrandbits(64) deacsmprefs["list_of_rented_books"].append(new_loan_record)
deacsmprefs.set("loan_identifier_token", random_identifier)
deacsmprefs.commit()
deacsmprefs.refresh()
if random_identifier != deacsmprefs["loan_identifier_token"]:
#print("we broke another thread's token, try again")
last_token = deacsmprefs["loan_identifier_token"]
error_counter = error_counter + 1
continue
else:
if last_token != deacsmprefs["loan_identifier_token"]:
#print("Token changed in the meantime ...")
# Give it another 5 tries
error_counter = max(0, error_counter - 5)
pass
last_token = deacsmprefs["loan_identifier_token"] return True
#print("waiting on another thread ...")
sleeptime = random.randrange(2, 10) / 1000
print(str(sleeptime))
time.sleep(sleeptime)
error_counter = error_counter + 1
continue
# Okay, now this thread can "use" the config list, and no other thread should overwrite it ...
# Check if that exact loan is already in the list, and if so, delete it:
done = False
while not done:
done = True
for book in deacsmprefs["list_of_rented_books"]:
if book["loanID"] == new_loan_record["loanID"]:
done = False
deacsmprefs["list_of_rented_books"].remove(book)
break
# Add all necessary information for a book return to the JSON array.
# The config widget can then read this and present a list of not-yet-returned
# books, and can then return them.
# Also, the config widget is responsible for cleaning up that list once a book's validity period is up.
deacsmprefs["list_of_rented_books"].append(new_loan_record)
# Okay, now we added our loan record.
# Remove the identifier token so other threads can use the config again:
deacsmprefs.commit()
deacsmprefs.refresh()
if deacsmprefs["loan_identifier_token"] != random_identifier:
print("Another thread stole the loan token while we were working with it - that's not supposed to happen ...")
print("If you see this message, please open a bug report.")
return False
deacsmprefs.set("loan_identifier_token", 0)
deacsmprefs.commit()
return True
def tryReturnBook(bookData): def tryReturnBook(bookData):

View File

@ -35,12 +35,6 @@ class ACSMInput_Prefs():
self.deacsmprefs.defaults['notify_fulfillment'] = True self.deacsmprefs.defaults['notify_fulfillment'] = True
self.deacsmprefs.defaults['detailed_logging'] = False self.deacsmprefs.defaults['detailed_logging'] = False
self.deacsmprefs.defaults['delete_acsm_after_fulfill'] = False self.deacsmprefs.defaults['delete_acsm_after_fulfill'] = False
self.deacsmprefs.defaults['allow_parallel_fulfillment'] = True
self.deacsmprefs.defaults['loan_identifier_token'] = 0
self.deacsmprefs.defaults['fulfillment_block_token'] = 0
self.deacsmprefs.defaults['fulfillment_block_time'] = 0
self.deacsmprefs.defaults['list_of_rented_books'] = [] self.deacsmprefs.defaults['list_of_rented_books'] = []