from lxml import etree
import base64
import random
import time
#@@CALIBRE_COMPAT_CODE@@
from libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
from libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER
def buildFulfillRequest(acsm):
    """Build the unsigned fulfillment request text for a parsed ACSM document.

    The user and device identifiers are read from the activation file, the
    client version details from the device file, and the serialized ``acsm``
    tree is embedded verbatim in the request.

    Args:
        acsm: lxml element (or element tree) of the ACSM file.

    Returns:
        A ``(request, adept_ns)`` tuple. ``adept_ns`` is ``False`` when the
        detected client version is "ADE WIN 9,0,1131,27" (ADE 1.7.2 builds
        the request differently) and ``True`` for every other version.

    Raises:
        AttributeError: if a required entry (user, device, or the fallback
            fingerprint / device type) is missing, because ``find`` then
            returns ``None``.
    """
    # NOTE(review): the string literals below contain only format
    # placeholders and no element markup, although the caller parses the
    # result as XML. The tags appear to have been lost from this copy of
    # the file - restore them from upstream before relying on this function.
    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

    activationxml = etree.parse(get_activation_xml_path())
    devicexml = etree.parse(get_device_path())

    user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
    device_uuid = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text

    fingerprint = None
    device_type = None
    try:
        fingerprint = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("fingerprint"))).text
        device_type = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("deviceType"))).text
    except AttributeError:
        # find() returned None: the activation token lacks one of the entries.
        pass

    if not fingerprint or not device_type:
        # This should usually never happen with a proper activation, but just in case it does,
        # the fingerprint is loaded from the device data instead.
        fingerprint = devicexml.find("./%s" % (adNS("fingerprint"))).text
        device_type = devicexml.find("./%s" % (adNS("deviceType"))).text

    version = None
    clientOS = None
    clientLocale = None

    for f in devicexml.findall("./%s" % (adNS("version"))):
        name = f.get("name")
        if name == "hobbes":
            version = f.get("value")
        elif name == "clientOS":
            clientOS = f.get("value")
        elif name == "clientLocale":
            clientLocale = f.get("value")

    # Find matching client version depending on the Hobbes version.
    # This way we don't need to store and re-load it for each fulfillment.
    try:
        v_idx = VAR_VER_HOBBES_VERSIONS.index(version)
        clientVersion = VAR_VER_SUPP_VERSIONS[v_idx]
    except (ValueError, IndexError):
        # Version not present, probably the "old" 10.0.4 entry.
        # As 10.X is in the 3.0 range, assume we're on ADE 3.0
        clientVersion = "3.0.1.91394"

    # Both request flavours embed the same serialized ACSM document.
    acsm_text = etree.tostring(acsm, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

    if clientVersion == "ADE WIN 9,0,1131,27":
        # Ancient ADE 1.7.2 does this request differently
        request = "\n"
        request += "%s\n" % (user_uuid)
        request += "%s\n" % (device_uuid)
        request += "%s\n" % (device_type)
        request += acsm_text
        request += ""
        return request, False

    request = ""
    request += ""
    request += ""
    request += "%s" % (user_uuid)
    request += "%s" % (device_uuid)
    request += "%s" % (device_type)
    request += acsm_text
    request += ""
    request += "%s" % (version)
    request += "%s" % (clientOS)
    request += "%s" % (clientLocale)
    request += "%s" % (clientVersion)
    request += "%s" % (device_type)
    request += "%s" % ("ADOBE Digitial Editions")
    # YES, this typo ("Digitial" instead of "Digital") IS present in ADE!!
    request += "%s" % (fingerprint)
    request += ""
    request += "%s" % (user_uuid)
    request += "%s" % (device_uuid)
    request += ""
    request += ""
    request += ""
    return request, True
def buildInitLicenseServiceRequest(authURL):
    # type: (str) -> str
    """Build and sign the request that initialises a license service.

    The request text is assembled from ``authURL``, a nonce from
    ``addNonce()`` and the user UUID stored in the activation file. It is
    then parsed, signed with ``sign_node`` and serialized again with the
    signature appended as an ``adept:signature`` child.

    Args:
        authURL: URL that is embedded in the request.

    Returns:
        The signed request as text, or ``None`` when ``sign_node`` returns
        ``None`` (note that the type comment above does not reflect this).
    """
    # NOTE(review): as written, the concatenated text has no element markup,
    # so etree.fromstring() cannot parse it. The literals look like they lost
    # their tags in this copy of the file - restore them from upstream.
    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
    NSMAP = { "adept" : "http://ns.adobe.com/adept" }
    # Registering the prefix once is sufficient; the registration is global.
    etree.register_namespace("adept", NSMAP["adept"])

    activationxml = etree.parse(get_activation_xml_path())
    user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text

    ret = ""
    ret += ""
    ret += ""
    ret += "%s" % (authURL)
    ret += addNonce()
    ret += "%s" % (user_uuid)
    ret += ""

    req_xml = etree.fromstring(ret)

    signature = sign_node(req_xml)
    if signature is None:
        return None

    etree.SubElement(req_xml, etree.QName(NSMAP["adept"], "signature")).text = signature

    return "\n" + etree.tostring(req_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
def getDecryptedCert(pkcs12_b64_string = None):
    """Decrypt the user's PKCS#12 container with the device key.

    Args:
        pkcs12_b64_string: base64 text of the PKCS#12 data. When ``None``,
            it is read from the ``credentials/pkcs12`` entry of the
            activation file.

    Returns:
        Whatever ``get_cert_from_pkcs12`` returns for the decoded container
        and the base64-encoded device key, or ``None`` if that call fails.
    """
    if pkcs12_b64_string is None:
        activationxml = etree.parse(get_activation_xml_path())
        adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
        pkcs12_b64_string = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text

    pkcs12_data = base64.b64decode(pkcs12_b64_string)

    # Prefer a device key that libadobe exposes directly. When that name is
    # not available, fall back to the key file instead of leaving the
    # variable unbound (which previously raised NameError below).
    try:
        from libadobe import devkey_bytes as devkey_adobe
    except ImportError:
        devkey_adobe = None

    if devkey_adobe is not None:
        devkey_bytes = devkey_adobe
    else:
        # The context manager closes the key file even if read() fails.
        with open(get_devkey_path(), "rb") as f:
            devkey_bytes = f.read()

    try:
        return get_cert_from_pkcs12(pkcs12_data, base64.b64encode(devkey_bytes))
    except Exception:
        # Deliberate best effort: callers treat None as "could not decrypt".
        return None
def buildAuthRequest():
    """Assemble the operator authentication request text.

    The request is built from the user UUID, the decrypted certificate
    (base64-encoded) and the license / authentication certificates stored in
    the activation file, one entry per line.

    Returns:
        The request text, or ``None`` (after printing a message) when
        ``getDecryptedCert()`` could not decrypt the PKCS#12 data.
    """
    activation = etree.parse(get_activation_xml_path())
    adept = 'http://ns.adobe.com/adept'

    def credential(tag):
        # Text of the given child of the adept "credentials" element.
        return activation.find("./{%s}credentials/{%s}%s" % (adept, adept, tag)).text

    decrypted = getDecryptedCert()
    if decrypted is None:
        print("Can't decrypt pkcs12 with devkey!")
        return None

    # List items are evaluated top to bottom, so the lookups happen in the
    # same order as the pieces appear in the request.
    pieces = [
        "\n",
        "\n",
        "%s\n" % credential("user"),
        "%s\n" % base64.b64encode(decrypted).decode("utf-8"),
        "%s\n" % credential("licenseCertificate"),
        "%s\n" % credential("authenticationCertificate"),
        "",
    ]
    return "".join(pieces)
# Authenticate against a distributor ("operator") server.
# The request from buildAuthRequest() is sent to authURL + "/Auth"; if the URL
# ends with "Fulfill", the "/Fulfill" part is removed first.
# Returns the string "Failed to create auth request" when buildAuthRequest()
# yields None.
# NOTE(review): every line here is flush-left; the original indentation appears
# to have been lost and must be restored before this module can be imported.
def doOperatorAuth(operatorURL):
# type: (str) -> str
auth_req = buildAuthRequest()
if auth_req is None:
return "Failed to create auth request"
authURL = operatorURL
# Callers may pass the fulfillment URL; the auth endpoint lives next to it.
if authURL.endswith("Fulfill"):
authURL = authURL.replace("/Fulfill", "")
replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8")
# NOTE(review): the next line is corrupted (unterminated string literal). The
# rest of doOperatorAuth and the header of the following function seem to be
# missing. The lines after it use `operatorURL` and fulfill() calls
# operatorAuth(fulfillURL), so they are presumably the body of
# operatorAuth(operatorURL) - restore both from upstream.
if not " str
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
activationxml = etree.parse(get_activation_xml_path())
# Skip the network round trip when this operator URL is already recorded in
# the activation file's operatorURLList.
try:
operator_url_list = activationxml.findall("./%s/%s" % (adNS("operatorURLList"), adNS("operatorURL")))
for member in operator_url_list:
if member.text.strip() == operatorURL:
#print("Already authenticated to operator")
return None
except:
pass
# doOperatorAuth() returns None on success and an error description otherwise.
ret = doOperatorAuth(operatorURL)
if (ret is not None):
return "doOperatorAuth error: %s" % ret
# Check if list exists:
# NOTE(review): the local name `list` shadows the builtin of the same name.
list = activationxml.find("./%s" % (adNS("operatorURLList")))
user_uuid = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("user"))).text
if list is None:
# Create the operatorURLList element, tagged with the user UUID, then look it
# up again so the new URL can be appended below.
x = etree.SubElement(activationxml.getroot(), etree.QName(NSMAP["adept"], "operatorURLList"), nsmap=NSMAP)
etree.SubElement(x, etree.QName(NSMAP["adept"], "user")).text = user_uuid
list = activationxml.find("./%s" % (adNS("operatorURLList")))
if list is None:
return "Err, this list should not be none right now ..."
etree.SubElement(list, etree.QName(NSMAP["adept"], "operatorURL")).text = operatorURL
# Write the updated activation data back to the activation file.
# NOTE(review): the file is not closed if write() raises; a `with` block would
# be safer once the indentation is restored.
f = open(get_activation_xml_path(), "w")
f.write("\n")
f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
f.close()
return None
def buildRights(license_token_node):
    """Build the rights text for a license token.

    The serialized license token is followed by its license URL and the
    certificate that the activation file stores for that license URL.

    Args:
        license_token_node: lxml element of the license token; it must have
            an adept ``licenseURL`` child.

    Returns:
        The rights text, or ``None`` (after printing a hint for the user)
        when the activation file holds no certificate for the license URL.

    Raises:
        AttributeError: if ``license_token_node`` has no ``licenseURL``
            child (``find`` returns ``None``).
    """
    # NOTE(review): the literals below are bare newlines / placeholders with
    # no element markup; the tags appear to have been lost from this copy of
    # the file - restore them from upstream.
    adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

    ret = "\n"
    ret += "\n"

    # Add license token
    ret += etree.tostring(license_token_node, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")

    ret += "\n"

    lic_token_url = license_token_node.find("./%s" % (adNS("licenseURL"))).text

    ret += "%s\n" % lic_token_url

    # Get cert for this license URL:
    activationxml = etree.parse(get_activation_xml_path())

    # Initialised before the try block so the check below can never hit an
    # unbound name.
    found = False
    try:
        licInfo = activationxml.findall("./%s/%s" % (adNS("licenseServices"), adNS("licenseServiceInfo")))

        for member in licInfo:
            if member.find("./%s" % (adNS("licenseURL"))).text == lic_token_url:
                ret += "%s\n" % (member.find("./%s" % (adNS("certificate"))).text)
                found = True
                break
    except AttributeError:
        # An entry without licenseURL / certificate: treated as "not found".
        pass

    if not found:
        print("Did not find the licenseService certificate in the activation data.")
        print("This usually means it failed to download from the distributor's servers.")
        print("Please try to download an ACSM book from the Adobe Sample Library, then if that was successful, ")
        print("try your ACSM book file again.")
        return None

    ret += "\n"
    ret += "\n"

    return ret
# Fulfill an ACSM file: validate the activation and the ACSM document, build and
# sign the fulfillment request, authenticate with the operator and send the
# request to operatorURL + "/Fulfill".
# The early exits visible here return (False, <error message>).
# NOTE(review): every line here is flush-left; the original indentation appears
# to have been lost. The function is also cut off at the corrupted line further
# down, so its success path is not visible in this copy.
def fulfill(acsm_file, do_notify = False):
verbose_logging = False
# Verbose logging is only available when the Calibre plugin preferences can be
# imported; outside of Calibre it silently stays off.
try:
import calibre_plugins.deacsm.prefs as prefs
deacsmprefs = prefs.ACSMInput_Prefs()
verbose_logging = deacsmprefs["detailed_logging"]
except:
pass
# Get pkcs12:
pkcs12 = None
acsmxml = None
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
except:
return False, "Activation not found or invalid"
if pkcs12 is None or len(pkcs12) == 0:
return False, "Activation missing"
try:
acsmxml = etree.parse(acsm_file)
except:
return False, "ACSM not found or invalid"
#print(etree.tostring(acsmxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
dcNS = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag)
# The mimetype check is informational only: unknown types are reported but
# fulfillment continues either way.
try:
mimetype = acsmxml.find("./%s/%s/%s" % (adNS("resourceItemInfo"), adNS("metadata"), dcNS("format"))).text
if (mimetype == "application/pdf"):
#print("You're trying to fulfill a PDF file.")
pass
elif (mimetype == "application/epub+zip"):
#print("Trying to fulfill an EPUB file ...")
pass
else:
print("Weird mimetype: %s" % (mimetype))
print("Continuing anyways ...")
except:
# Some books, like from Google Play books, use a different format and don't have that metadata tag.
pass
# adept_ns is False for the ADE 1.7.2 request flavour and True otherwise
# (see buildFulfillRequest).
fulfill_request, adept_ns = buildFulfillRequest(acsmxml)
if verbose_logging:
print("Fulfill request:")
print(fulfill_request)
fulfill_request_xml = etree.fromstring(fulfill_request)
# Sign the request:
signature = sign_node(fulfill_request_xml)
if (signature is None):
return False, "Signing failed!"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
if adept_ns:
# "new" ADE
etree.SubElement(fulfill_request_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
else:
# ADE 1.7.2
etree.SubElement(fulfill_request_xml, etree.QName("signature")).text = signature
# Get operator URL:
operatorURL = None
try:
operatorURL = acsmxml.find("./%s" % (adNS("operatorURL"))).text.strip()
except:
pass
if (operatorURL is None or len(operatorURL) == 0):
return False, "OperatorURL missing in ACSM"
fulfillURL = operatorURL + "/Fulfill"
# operatorAuth() returns None on success and an error description otherwise.
ret = operatorAuth(fulfillURL)
if (ret is not None):
return False, "operatorAuth error: %s" % ret
if adept_ns:
# "new" ADE
fulfill_req_signed = "\n" + etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
else:
# ADE 1.7.2
fulfill_req_signed = etree.tostring(fulfill_request_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
#print("will send:\n %s" % fulfill_req_signed)
#print("Sending fulfill request to %s" % fulfillURL)
# For debugging only
# fulfillURL = fulfillURL.replace("https:", "http:")
replyData = sendRequestDocu(fulfill_req_signed, fulfillURL).decode("utf-8")
# NOTE(review): the next line is corrupted (unterminated string literal). The
# remainder of fulfill() and the header of the following function seem to be
# missing. The lines after it use `deacsmprefs`, `new_loan_record`,
# `error_counter`, `last_token` and `random_identifier` without defining them
# here, so they presumably belong to a separate helper that stores a loan
# record in the plugin preferences - restore from upstream.
if "= 10:
print("Took us 10 attempts to acquire loan token lock, still didn't work.")
print("(still the same token %s)" % (deacsmprefs["loan_identifier_token"]))
print("If you see this error message please open a bug report.")
# "Mark" the current access with a random token, to prevent multiple instances
# of the plugin overwriting eachother's data.
deacsmprefs.refresh()
if deacsmprefs["loan_identifier_token"] == 0:
random_identifier = random.getrandbits(64)
deacsmprefs.set("loan_identifier_token", random_identifier)
deacsmprefs.commit()
deacsmprefs.refresh()
# Re-read after committing to verify that our token is the one that was stored.
if random_identifier != deacsmprefs["loan_identifier_token"]:
#print("we broke another thread's token, try again")
last_token = deacsmprefs["loan_identifier_token"]
error_counter = error_counter + 1
continue
else:
if last_token != deacsmprefs["loan_identifier_token"]:
#print("Token changed in the meantime ...")
# Give it another 5 tries
error_counter = max(0, error_counter - 5)
pass
last_token = deacsmprefs["loan_identifier_token"]
#print("waiting on another thread ...")
# Sleep for a random 2-9 ms before retrying.
# NOTE(review): the print of the sleep time below looks like leftover debug output.
sleeptime = random.randrange(2, 10) / 1000
print(str(sleeptime))
time.sleep(sleeptime)
error_counter = error_counter + 1
continue
# Okay, now this thread can "use" the config list, and no other thread should overwrite it ...
# Check if that exact loan is already in the list, and if so, delete it:
# The scan restarts after each removal because the list is modified while it
# is being iterated.
done = False
while not done:
done = True
for book in deacsmprefs["list_of_rented_books"]:
if book["loanID"] == new_loan_record["loanID"]:
done = False
deacsmprefs["list_of_rented_books"].remove(book)
break
# Add all necessary information for a book return to the JSON array.
# The config widget can then read this and present a list of not-yet-returned
# books, and can then return them.
# Also, the config widget is responsible for cleaning up that list once a book's validity period is up.
deacsmprefs["list_of_rented_books"].append(new_loan_record)
# Okay, now we added our loan record.
# Remove the identifier token so other threads can use the config again:
deacsmprefs.commit()
deacsmprefs.refresh()
if deacsmprefs["loan_identifier_token"] != random_identifier:
print("Another thread stole the loan token while we were working with it - that's not supposed to happen ...")
print("If you see this message, please open a bug report.")
return False
deacsmprefs.set("loan_identifier_token", 0)
deacsmprefs.commit()
return True
# Return a loaned book: build a signed loan-return request from `bookData` and
# send it to operatorURL + "/LoanReturn".
# `bookData` must provide the keys "user", "loanID", "device" and "operatorURL";
# otherwise (False, "Invalid book data") is returned.
# NOTE(review): every line here is flush-left; the original indentation appears
# to have been lost. The function is also cut off at the corrupted line further
# down, so the handling of the server reply is not visible in this copy.
def tryReturnBook(bookData):
verbose_logging = False
# Verbose logging is only available when the Calibre plugin preferences can be
# imported; outside of Calibre it silently stays off.
try:
import calibre_plugins.deacsm.prefs as prefs
deacsmprefs = prefs.ACSMInput_Prefs()
verbose_logging = deacsmprefs["detailed_logging"]
except:
pass
try:
user = bookData["user"]
loanID = bookData["loanID"]
device = bookData["device"]
operatorURL = bookData["operatorURL"]
except:
print("Invalid book data!")
return False, "Invalid book data"
# NOTE(review): the literals below carry no element markup although the text
# is parsed with etree.fromstring() afterwards; the tags appear to have been
# lost from this copy of the file.
req_data = ""
req_data += ""
req_data += "%s" % (user)
if device is not None:
req_data += "%s" % (device)
req_data += "%s" % (loanID)
req_data += addNonce()
req_data += ""
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
full_text_xml = etree.fromstring(req_data)
signature = sign_node(full_text_xml)
if (signature is None):
print("SIGN ERROR!")
return False, "Sign error"
etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
print("Notifying loan return server %s" % (operatorURL + "/LoanReturn"))
doc_send = "\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
if verbose_logging:
print(doc_send)
retval = sendRequestDocu(doc_send, operatorURL + "/LoanReturn").decode("utf-8")
# NOTE(review): the next line is corrupted (unterminated string literal). The
# remainder of tryReturnBook() and the header of the following function seem
# to be missing. The lines after it use `notifiers`, `fulfillmentResultToken`,
# `user`, `device` and `adNS` without defining them here, so they presumably
# belong to a separate helper that sends fulfillment notifications - restore
# from upstream.
if " tag not found. Guess nobody wants to be notified.")
#print(etree.tostring(fulfillmentResultToken, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
return True, ""
errmsg = ""
errmsg_crit = ""
# One notification request is built, signed and sent per notifier element.
for element in notifiers:
url = element.find("./%s" % (adNS("notifyURL"))).text
body = element.find("./%s" % (adNS("body")))
# A notifier is critical unless it carries critical="no".
critical = True
if element.get("critical", "yes") == "no":
critical = False
print("Notifying optional server %s" % (url))
else:
print("Notifying server %s" % (url))
if (user is None):
try:
# "Normal" Adobe fulfillment
user = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text
except AttributeError:
# B&N Adobe PassHash fulfillment. Doesn't use notifications usually ...
#user = body.find("./%s" % (adNS("user"))).text
print("Skipping notify due to passHash?")
print("If this is not a passHash book pls open a bug report.")
continue
if (device is None):
try:
# "Normal" Adobe fulfillment
device = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text
except:
print("Missing deviceID for loan metadata ... why?")
print("Reading from device.xml instead.")
# Lets try to read this from the activation ...
activationxml = etree.parse(get_activation_xml_path())
device = activationxml.find("./%s/%s" % (adNS("activationToken"), adNS("device"))).text
full_text = ""
full_text += "%s" % user
full_text += "%s" % device
# ADE 4.0 apparently changed the order of these two elements.
# I still don't know exactly how this order is determined, but in most cases
# ADE 4+ has the body first, then the nonce, while ADE 3 and lower usually has nonce first, then body.
# It probably doesn't matter, but still, we want to behave exactly like ADE, so check the version number:
# NOTE(review): the device file is re-parsed on every loop iteration, and
# `version` is only assigned here when a "hobbes" entry exists.
devicexml = etree.parse(get_device_path())
for f in devicexml.findall("./%s" % (adNS("version"))):
if f.get("name") == "hobbes":
version = f.get("value")
try:
v_idx = VAR_VER_HOBBES_VERSIONS.index(version)
clientVersion = VAR_VER_BUILD_IDS[v_idx]
except:
# Unknown version: 0 is used as the build id for the ordering check below.
clientVersion = 0
if (clientVersion >= VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER):
full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
full_text += addNonce()
else:
full_text += addNonce()
full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
full_text += ""
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
full_text_xml = etree.fromstring(full_text)
signature = sign_node(full_text_xml)
if (signature is None):
print("SIGN ERROR!")
continue
etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
doc_send = "\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
# Debug: Print notify request
if (verbose_logging):
print("Notify payload XML:")
print(doc_send)
# A failing optional notification is logged and skipped; a failing critical
# one re-raises the exception.
try:
code, msg = sendRequestDocuRC(doc_send, url)
except:
if not critical:
print("There was an error during an optional fulfillment notification:")
import traceback
traceback.print_exc()
print("Continuing execution ...")
continue
else:
print("Error during critical notification:")
raise
# msg may be bytes or already text; decoding is best effort.
try:
msg = msg.decode("utf-8")
except:
pass
if verbose_logging:
print("MSG:")
print(msg)
# NOTE(review): the next line is corrupted (unterminated string literal). The
# end of the notification loop and the start of whatever function writes the
# activation data via `f` seem to be missing - restore from upstream.
if "\n")
f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
f.close()
return True, "Done"