#!/usr/bin/env python3 # -*- coding: utf-8 -*- ''' Helper library with code needed for Adobe stuff. ''' from uuid import getnode import os, hashlib, base64 import urllib.request, ssl from datetime import datetime, timedelta from lxml import etree try: from Cryptodome import Random from Cryptodome.Cipher import AES from Cryptodome.Hash import SHA except ImportError: # Some distros still ship Crypto from Crypto import Random from Crypto.Cipher import AES from Crypto.Hash import SHA try: from customRSA import CustomRSA except: from calibre_plugins.deacsm.customRSA import CustomRSA from oscrypto import keys from oscrypto.asymmetric import dump_certificate, dump_private_key VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept" VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept" FILE_DEVICEKEY = "devicesalt" FILE_DEVICEXML = "device.xml" FILE_ACTIVATIONXML = "activation.xml" # Lists of different ADE "versions" we know about VAR_VER_SUPP_CONFIG_NAMES = [ "ADE 1.7.2", "ADE 2.0.1", "ADE 3.0.1", "ADE 4.0.3", "ADE 4.5.10", "ADE 4.5.11" ] VAR_VER_SUPP_VERSIONS = [ "ADE WIN 9,0,1131,27", "2.0.1.78765", "3.0.1.91394", "4.0.3.123281", "com.adobe.adobedigitaleditions.exe v4.5.10.186048", "com.adobe.adobedigitaleditions.exe v4.5.11.187303" ] VAR_VER_HOBBES_VERSIONS = [ "9.0.1131.27", "9.3.58046", "10.0.85385", "12.0.123217", "12.5.4.186049", "12.5.4.187298" ] VAR_VER_OS_IDENTIFIERS = [ "Windows Vista", "Windows Vista", "Windows 8", "Windows 8", "Windows 8", "Windows 8" ] # "Missing" versions: # 1.7.1, 2.0, 3.0, 4.0, 4.0.1, 4.0.2, 4.5 to 4.5.9 # This is a list of ALL versions we know (and can potentially use if present in a config file). # Must have the same length / size as the four lists above. VAR_VER_BUILD_IDS = [ 1131, 78765, 91394, 123281, 186048, 187303 ] # Build ID 185749 also exists, that's a different (older) variant of 4.5.10. # This is a list of versions that can be used for new authorizations: VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE = [ 78765, 91394, 123281, 187303 ] # This is a list of versions to be displayed in the version changer. VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO = [ 1131, 78765, 91394, 123281, 187303 ] # Versions >= this one are using HTTPS # According to changelogs, this is implemented as of ADE 4.0.1 - no idea what build ID that is. VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT = 123281 # Default build ID to use - ADE 2.0.1 VAR_VER_DEFAULT_BUILD_ID = 78765 def are_ade_version_lists_valid(): # These five lists MUST all have the same amount of elements. # Otherwise that will cause all kinds of issues. fail = False if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_SUPP_VERSIONS): fail = True if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_HOBBES_VERSIONS): fail = True if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_OS_IDENTIFIERS): fail = True if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_BUILD_IDS): fail = True if fail: print("Internal error in DeACSM: Mismatched version list lenghts.") print("This should never happen, please open a bug report.") return False return True devkey_bytes = None def get_devkey_path(): global FILE_DEVICEKEY return FILE_DEVICEKEY def get_device_path(): global FILE_DEVICEXML return FILE_DEVICEXML def get_activation_xml_path(): global FILE_ACTIVATIONXML return FILE_ACTIVATIONXML def update_account_path(folder_path: str): global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML FILE_DEVICEKEY = os.path.join(folder_path, "devicesalt") FILE_DEVICEXML = os.path.join(folder_path, "device.xml") FILE_ACTIVATIONXML = os.path.join(folder_path, "activation.xml") def createDeviceKeyFile(): # Original implementation: Device::createDeviceKeyFile() DEVICE_KEY_SIZE = 16 global devkey_bytes devkey_bytes = Random.get_random_bytes(DEVICE_KEY_SIZE) f = open(FILE_DEVICEKEY, "wb") f.write(devkey_bytes) f.close() def get_mac_address(): mac1 = getnode() mac2 = getnode() if (mac1 != mac2) or ((mac1 >> 40) % 2): return bytes([1, 2, 3, 4, 5, 0]) return mac1.to_bytes(6, byteorder='big') def makeSerial(random: bool): # Original implementation: std::string Device::makeSerial(bool random) # It doesn't look like this implementation results in the same fingerprint Adobe is using in ADE. # Given that Adobe only ever sees the SHA1 hash of this value, that probably doesn't matter. sha_out = None if not random: try: # Linux uid = os.getuid() import pwd username = pwd.getpwuid(uid).pw_name.encode("utf-8").decode("latin-1") except: # Windows uid = 1000 username = os.getlogin().encode("utf-8").decode("latin-1") mac_address = get_mac_address() dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username, mac_address[0], mac_address[1], mac_address[2], mac_address[3], mac_address[4], mac_address[5]) sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower() else: sha_out = Random.get_random_bytes(20).hex().lower() return sha_out def makeFingerprint(serial: str): # Original implementation: std::string Device::makeFingerprint(const std::string& serial) # base64(sha1(serial + privateKey)) # Fingerprint must be 20 bytes or less. global devkey_bytes if devkey_bytes is None: f = open(FILE_DEVICEKEY, "rb") devkey_bytes = f.read() f.close() str_to_hash = serial + devkey_bytes.decode('latin-1') hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest() b64str = base64.b64encode(hashed_str) return b64str ############################################## HTTP stuff: def sendHTTPRequest_DL2FILE(URL: str, outputfile: str): headers = { "Accept": "*/*", "User-Agent": "book2png", # MacOS uses different User-Agent. Good thing we're emulating a Windows client. } req = urllib.request.Request(url=URL, headers=headers) handler = urllib.request.urlopen(req) chunksize = 16 * 1024 ret_code = handler.getcode() loc = None try: loc = req.headers.get("Location") except: pass if loc is not None: return sendHTTPRequest_DL2FILE(loc) if ret_code != 200: return ret_code with open(outputfile, "wb") as f: while True: chunk = handler.read(chunksize) if not chunk: break f.write(chunk) return 200 def sendHTTPRequest_getSimple(URL: str): headers = { "Accept": "*/*", "User-Agent": "book2png", # MacOS uses different User-Agent. Good thing we're emulating a Windows client. } # Ignore SSL: # It appears as if lots of book distributors have either invalid or expired certs ... # No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways. # Not the best solution, but it works. ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url=URL, headers=headers) handler = urllib.request.urlopen(req, context=ctx) content = handler.read() loc = None try: loc = req.headers.get("Location") except: pass if loc is not None: return sendHTTPRequest_getSimple(loc) return content def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False): headers = { "Accept": "*/*", "User-Agent": "book2png", # MacOS uses different User-Agent. Good thing we're emulating a Windows client. "Content-Type": type } # Ignore SSL: # It appears as if lots of book distributors have either invalid or expired certs ... # No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways. # Not the best solution, but it works. ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url=URL, headers=headers, data=document) try: handler = urllib.request.urlopen(req, context=ctx) except urllib.error.HTTPError as err: # This happens with HTTP 500 and related errors. print("Post request caused HTTPError %d" % (err.code)) return err.code, "Post request caused HTTPException" ret_code = handler.getcode() if (ret_code == 204 and returnRC): return 204, "" if (ret_code != 200): print("Post request returned something other than 200 - returned %d" % (ret_code)) content = handler.read() loc = None try: loc = req.headers.get("Location") except: pass if loc is not None: return sendPOSTHTTPRequest(loc, document, type, returnRC) if returnRC: return ret_code, content return content def sendHTTPRequest(URL: str): return sendHTTPRequest_getSimple(URL) def sendRequestDocu(document: str, URL: str): return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False) def sendRequestDocuRC(document: str, URL: str): return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True) ######### Encryption and signing ################### def encrypt_with_device_key(data): global devkey_bytes if devkey_bytes is None: f = open(FILE_DEVICEKEY, "rb") devkey_bytes = f.read() f.close() remain = 16 if (len(data) % 16): remain = 16 - (len(data) % 16) data += bytes([remain])*remain iv = Random.get_random_bytes(16) cip = AES.new(devkey_bytes, AES.MODE_CBC, iv) encrypted = cip.encrypt(data) res = iv + encrypted return res def decrypt_with_device_key(data): global devkey_bytes if devkey_bytes is None: f = open(FILE_DEVICEKEY, "rb") devkey_bytes = f.read() f.close() cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16]) decrypted = cip.decrypt(data[16:]) # Remove padding decrypted = decrypted[:-decrypted[-1]] return decrypted def addNonce(): # TODO: Update nonce calculation # Currently, the plugin always uses the current time, and the counter (tmp) is always 0. # What Adobe does instead is save the current time on program start, then increase tmp # every time a Nonce is needed. dt = datetime.utcnow() usec = dt.microsecond sec = (dt - datetime(1970,1,1)).total_seconds() Ntime = int(int(sec * 1000) + usec/1000) # Unixtime to gregorian timestamp Ntime += 62167219200000 final = bytearray(Ntime.to_bytes(8, 'little')) # Something is fishy with this tmp value. It usually is 0 in ADE, but not always. # I haven't yet figured out what it means ... tmp = 0 final.extend(tmp.to_bytes(4, 'little')) ret = "" ret += "%s" % (base64.b64encode(final).decode("utf-8")) m10m = dt + timedelta(minutes=10) m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ") ret += "%s" % (m10m_str) return ret def get_cert_from_pkcs12(_pkcs12, _key): _, cert, _ = keys.parse_pkcs12(_pkcs12, _key) return dump_certificate(cert, encoding="der") def sign_node(node): sha_hash = hash_node(node) sha_hash = sha_hash.digest() # print("Hash is " + sha_hash.hex()) global devkey_bytes global pkcs12 if devkey_bytes is None: f = open(FILE_DEVICEKEY, "rb") devkey_bytes = f.read() f.close() # Get private key try: activationxml = etree.parse(FILE_ACTIVATIONXML) adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text except: return None my_pkcs12 = base64.b64decode(pkcs12) my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes)) my_priv_key = dump_private_key(my_priv_key, None, "der") # textbook RSA with that private key block = CustomRSA.encrypt_for_adobe_signature(my_priv_key, sha_hash) signature = base64.b64encode(block).decode() # Debug # print("sig is %s\n" % block.hex()) return signature def hash_node(node): hash_ctx = SHA.new() hash_node_ctx(node, hash_ctx) return hash_ctx ASN_NONE = 0 ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT" ASN_CHILD = 2 # aka "END_ATTRIBUTES" ASN_END_TAG = 3 # aka "END_ELEMENT" ASN_TEXT = 4 # aka "TEXT_NODE" ASN_ATTRIBUTE = 5 # aka "ATTRIBUTE" debug = False def hash_node_ctx(node, hash_ctx): qtag = etree.QName(node.tag) if (qtag.localname == "hmac" or qtag.localname == "signature"): if (qtag.namespace == "http://ns.adobe.com/adept"): # Adobe HMAC and signature are not hashed return else: print("Warning: Found hmac or signature node in unexpected namespace " + qtag.namespace) hash_do_append_tag(hash_ctx, ASN_NS_TAG) if qtag.namespace is None: hash_do_append_string(hash_ctx, "") else: hash_do_append_string(hash_ctx, qtag.namespace) hash_do_append_string(hash_ctx, qtag.localname) attrKeys = node.keys() # Attributes need to be sorted attrKeys.sort() # TODO Implement UTF-8 bytewise sorting: # "Attributes are sorted first by their namespaces and # then by their names; sorting is done bytewise on UTF-8 # representations." for attribute in attrKeys: # Hash all the attributes hash_do_append_tag(hash_ctx, ASN_ATTRIBUTE) # Check for element namespace and hash that, if present: q_attribute = etree.QName(attribute) # Hash element namespace (usually "") # If namespace is none, use "". Else, use namespace. hash_do_append_string(hash_ctx, "" if q_attribute.namespace is None else q_attribute.namespace) # Hash (local) name and value hash_do_append_string(hash_ctx, q_attribute.localname) hash_do_append_string(hash_ctx, node.get(attribute)) hash_do_append_tag(hash_ctx, ASN_CHILD) if (node.text is not None): # If there's raw text, hash that. # This code block used to just be the following: # hash_do_append_tag(hash_ctx, ASN_TEXT) # hash_do_append_string(hash_ctx, node.text.strip()) # though that only works with text nodes < 0x7fff. # While I doubt we'll ever encounter text nodes larger than 32k in # this application, I want to implement the spec correctly. # So there's a loop going over the text, hashing 32k chunks. text = node.text.strip() textlen = len(text) if textlen > 0: done = 0 remaining = 0 while True: remaining = textlen - done if remaining > 0x7fff: print("Warning: Hashing text node larger than 32k.") print("This usually doesn't happen, and I'm not sure if this is implemented correctly.") print("If you run into issues, please open a bug report.") remaining = 0x7fff hash_do_append_tag(hash_ctx, ASN_TEXT) hash_do_append_string(hash_ctx, text[done:done+remaining]) done += remaining if done >= textlen: break for child in node: # If there's child nodes, hash these as well. hash_node_ctx(child, hash_ctx) hash_do_append_tag(hash_ctx, ASN_END_TAG) def hash_do_append_string(hash_ctx, string: str): str_bytes = bytes(string, encoding="utf-8") length = len(str_bytes) len_upper = int(length / 256) len_lower = int(length & 0xFF) hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower]) hash_do_append_raw_bytes(hash_ctx, str_bytes) def hash_do_append_tag(hash_ctx, tag: int): if (tag > 5): return hash_do_append_raw_bytes(hash_ctx, [tag]) def hash_do_append_raw_bytes(hash_ctx, data: bytes): hash_ctx.update(bytearray(data))