From 58f0d57284a848b33514abef0084584ea6c9c082 Mon Sep 17 00:00:00 2001 From: Florian Bach Date: Tue, 21 Sep 2021 17:42:51 +0200 Subject: [PATCH] Begin work on python version --- calibre-plugin/__init__.py | 18 +- calibre-plugin/config.py | 59 ++-- register_ADE_account.py | 596 +++++++++++++++++++++++++++++++++++++ 3 files changed, 634 insertions(+), 39 deletions(-) create mode 100644 register_ADE_account.py diff --git a/calibre-plugin/__init__.py b/calibre-plugin/__init__.py index 0db766d..6516256 100644 --- a/calibre-plugin/__init__.py +++ b/calibre-plugin/__init__.py @@ -4,14 +4,19 @@ # Calibre plugin for ACSM files. +# Revision history: +# v0.0.1: First version. +# v0.0.2: Allow key extraction without extra binary call. + + from calibre.customize import FileTypePlugin # type: ignore -__version__ = '0.0.1' +__version__ = '0.0.2' PLUGIN_NAME = "DeACSM" PLUGIN_VERSION_TUPLE = tuple([int(x) for x in __version__.split(".")]) PLUGIN_VERSION = ".".join([str(x)for x in PLUGIN_VERSION_TUPLE]) -import os +import os, shutil import traceback import subprocess @@ -126,6 +131,15 @@ class DeACSM(FileTypePlugin): print("{0} v{1}: Failed, return original ...".format(PLUGIN_NAME, PLUGIN_VERSION)) return path_to_ebook + if ("Parse PDF" in ret.stdout.decode("latin-1") or "Parse PDF" in ret.stderr.decode("latin-1")): + # Looks like this is a PDF, move to PDF ... + print("{0} v{1}: That's a PDF".format(PLUGIN_NAME, PLUGIN_VERSION)) + outputname2 = self.temporary_file(".pdf").name + os.rename(outputname, outputname2) + shutil.copy(outputname2, "/tmp/test.pdf") + return outputname2 + + return outputname diff --git a/calibre-plugin/config.py b/calibre-plugin/config.py index e9ec052..ffe7862 100644 --- a/calibre-plugin/config.py +++ b/calibre-plugin/config.py @@ -3,7 +3,7 @@ # pyright: reportUndefinedVariable=false -import os, glob, shutil, tarfile, subprocess, time, tempfile, datetime +import os, glob, shutil, tarfile, subprocess, time, tempfile, datetime, base64 from lxml import etree @@ -221,7 +221,6 @@ class ConfigWidget(QWidget): filters = [("DER Files", ["der"])] - filename = choose_save_file(self, "Export ADE keys", _("Export ADE keys"), filters, all_files=False) if (filename is None): @@ -229,50 +228,36 @@ class ConfigWidget(QWidget): print("would export to " + filename) - my_env = os.environ.copy() - my_env["LD_LIBRARY_PATH"] = ".:" + my_env["LD_LIBRARY_PATH"] - - - old_files = glob.glob(os.path.join(verdir, "*.der")) - for file in old_files: - try: - os.remove(file) - except: - pass - - try: - os.chmod(os.path.join(verdir, "acsmdownloader"), 0o775) - except FileNotFoundError: - return error_dialog(None, "Tool not found", "Helper tool not found. Press \"Compile\" then try again.", show=True, show_copy_button=False) - - ret = None - import calibre_plugins.deacsm.prefs as prefs # type: ignore deacsmprefs = prefs.DeACSM_Prefs() - try: - ret = subprocess.run([os.path.join(verdir, "acsmdownloader"), "-d", os.path.join(deacsmprefs["path_to_account_data"], "device.xml"), - "-a", os.path.join(deacsmprefs["path_to_account_data"], "activation.xml"), - "-k", os.path.join(deacsmprefs["path_to_account_data"], "devicesalt"), - "-e" - ], capture_output=True, shell=False, cwd=verdir, env=my_env) - print(ret) + activation_xml_path = os.path.join(self.deacsmprefs["path_to_account_data"], "activation.xml") - except: - return error_dialog(None, "Export failed", "Export failed.", det_msg=str(ret), show=True, show_copy_button=True) + container = None + try: + container = etree.parse(activation_xml_path) + except (FileNotFoundError, OSError) as e: + return error_dialog(None, "Export failed", "Export failed - Can't open activation.xml", show=True, show_copy_button=False) + + key_binary = None + try: + adeptNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + usernameXML = container.find(adeptNS("credentials")).find(adeptNS("privateLicenseKey")) + key_base64 = usernameXML.text + key_binary = base64.decodebytes(key_base64.encode())[26:] + except: + return error_dialog(None, "Export failed", "Export failed - Can't read key from activation.xml", show=True, show_copy_button=False) try: - new_key = glob.glob(os.path.join(verdir, "*.der"))[0] - shutil.move(new_key, filename) - info_dialog(None, "Done", "Key successfully exported", show=True, show_copy_button=False) - except IndexError: - return error_dialog(None, "Export failed", "Export failed.", show=True, show_copy_button=True) - - - + output_file = open(filename, "wb") + output_file.write(key_binary) + output_file.close() + except: + return error_dialog(None, "Export failed", "Export failed - Can't write key to file", show=True, show_copy_button=False) + info_dialog(None, "Done", "Key successfully exported", show=True, show_copy_button=False) def compile(self): diff --git a/register_ADE_account.py b/register_ADE_account.py new file mode 100644 index 0000000..9964914 --- /dev/null +++ b/register_ADE_account.py @@ -0,0 +1,596 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +This is an experimental Python version of libgourou. Right now it only supports part of the authorization +(and doesn't support fulfillment at all). All the encryption / decryption stuff works, but +I'm stuck at the XML node hashing / signing that's required for the last authorization step. +Also, I'm not sure if the nonce function is implemented correctly. + +Who knows, maybe there will someday be a full Python version of libgourou so it can be used in +Calibre on all operating systems without additional dependencies. +''' + + +# pyright: reportUndefinedVariable=false + +import os, pwd, hashlib, base64, locale, urllib.request, datetime +from datetime import datetime, timedelta +from Crypto import Random +from Crypto.PublicKey import RSA +from Crypto.Util.asn1 import DerSequence +from Crypto.Cipher import AES +from Crypto.Cipher import PKCS1_v1_5 +from binascii import a2b_base64 +from uuid import getnode +from lxml import etree + + +VAR_MAIL = "test@example.com" +VAR_PASS = "mypassword" +VAR_AUTH_SERVER = "adeactivate.adobe.com" +VAR_ACS_SERVER = "http://adeactivate.adobe.com/adept" +VAR_HOBBES_VERSION = "10.0.4" + +FILE_DEVICEKEY = "devicesalt" +FILE_DEVICEXML = "device.xml" +FILE_ACTIVATIONXML = "activation.xml" + +devkey_bytes = None +authkey_pub = None +authkey_priv = None +licensekey_pub = None +licensekey_priv = None + +user_uuid = None + +def createDeviceKeyFile(): + # Original implementation: Device::createDeviceKeyFile() + + global devkey_bytes + + DEVICE_KEY_SIZE = 16 + devkey = Random.get_random_bytes(DEVICE_KEY_SIZE) + devkey_bytes = devkey + + f = open(FILE_DEVICEKEY, "wb") + f.write(devkey) + f.close() + +def get_mac_address(): + mac1 = getnode() + mac2 = getnode() + if (mac1 != mac2) or ((mac1 >> 40) % 2): + return bytes([1, 2, 3, 4, 5, 0]) + + return mac1.to_bytes(6, byteorder='big') + + +def makeSerial(random: bool): + # Original implementation: std::string Device::makeSerial(bool random) + + sha_out = None + + if not random: + uid = os.getuid() + passwd = pwd.getpwuid(uid) + mac_address = get_mac_address() + + dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, passwd.pw_name, + mac_address[0], mac_address[1], mac_address[2], + mac_address[3], mac_address[4], mac_address[5]) + + sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower() + else: + sha_out = Random.get_random_bytes(20).hex().lower() + + return sha_out + +def makeFingerprint(serial: str): + # Original implementation: std::string Device::makeFingerprint(const std::string& serial) + # base64(sha1(serial + privateKey)) + + if (devkey_bytes is None): + print("devkey is None!") + exit() + + str_to_hash = serial + devkey_bytes.decode('latin-1') + hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest() + b64str = base64.b64encode(hashed_str) + + return b64str + + +def createDeviceFile(hobbes: str, randomSerial: bool): + # Original implementation: Device::createDeviceFile(const std::string& hobbes, bool randomSerial) + sysname = os.uname() + + serial = makeSerial(randomSerial) + fingerprint = makeFingerprint(serial) + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + root = etree.Element(etree.QName(NSMAP["adept"], "deviceInfo")) + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceClass")).text = "Desktop" + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceSerial")).text = serial + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceName")).text = sysname.nodename + etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceType")).text = "standalone" + + atr_ver = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) + atr_ver.set("name", "hobbes") + atr_ver.set("value", hobbes) + + atr_ver2 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) + atr_ver2.set("name", "clientOS") + atr_ver2.set("value", sysname.sysname + " " + sysname.release) + + atr_ver3 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) + atr_ver3.set("name", "clientLocale") + atr_ver3.set("value", locale.getdefaultlocale()[0]) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "fingerprint")).text = fingerprint + + f = open(FILE_DEVICEXML, "w") + f.write("\n") + f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + f.close() + + +def createDevice(): + createDeviceKeyFile() + createDeviceFile(VAR_HOBBES_VERSION, False) + +def sendHTTPRequest_getSimple(URL: str): + + headers = { + "Accept": "*/*", + "User-Agent": "book2png", + } + req = urllib.request.Request(url=URL, headers=headers) + handler = urllib.request.urlopen(req) + + content = handler.read() + + loc = None + try: + loc = req.headers.get("Location") + except: + pass + + if loc is not None: + return sendHTTPRequest_getSimple(loc) + + try: + ct = req.headers.get("Content-Type") + except: + ct = None + + if ct == "application/vnd.adobe.adept+xml": + print("Got adobe XML") + + return content + +def sendPOSTHTTPRequest(URL: str, document: bytes, type: str): + + headers = { + "Accept": "*/*", + "User-Agent": "book2png", + "Content-Type": type + } + req = urllib.request.Request(url=URL, headers=headers, data=document) + handler = urllib.request.urlopen(req) + + content = handler.read() + + loc = None + try: + loc = req.headers.get("Location") + except: + pass + + if loc is not None: + return sendPOSTHTTPRequest(loc, document, type) + + try: + ct = req.headers.get("Content-Type") + except: + ct = None + + if ct == "application/vnd.adobe.adept+xml": + print("Got adobe XML") + + return content + + +def sendHTTPRequest(URL: str): + return sendHTTPRequest_getSimple(URL) + +def sendRawRequest(URL: str): + return sendHTTPRequest(URL) + + +def sendRequestDocu(document: str, URL: str): + return sendPOSTHTTPRequest(URL, document.encode("latin-1"), "application/vnd.adobe.adept+xml") + + +def createUser(): + + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + + root = etree.Element("activationInfo") + root.set("xmlns", NSMAP["adept"]) + + etree.register_namespace("adept", NSMAP["adept"]) + + activationServiceInfo = etree.SubElement(root, etree.QName(NSMAP["adept"], "activationServiceInfo")) + + + activationURL = VAR_ACS_SERVER + "/ActivationServiceInfo" + response = sendRawRequest(activationURL) + + print(response) + + adobe_response_xml = etree.fromstring(response) + + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + authURL = adobe_response_xml.find("./%s" % (adNS("authURL"))).text + userInfoURL = adobe_response_xml.find("./%s" % (adNS("userInfoURL"))).text + certificate = adobe_response_xml.find("./%s" % (adNS("certificate"))).text + + if (authURL is None or userInfoURL is None or certificate is None): + print("Error: Unexpected reply from Adobe.") + exit() + + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authURL")).text = authURL + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "userInfoURL")).text = userInfoURL + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "activationURL")).text = VAR_ACS_SERVER + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "certificate")).text = certificate + + + authenticationURL = authURL + "/AuthenticationServiceInfo" + response2 = sendRawRequest(authenticationURL) + adobe_response_xml2 = etree.fromstring(response2) + authCert = adobe_response_xml2.find("./%s" % (adNS("certificate"))).text + etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authenticationCertificate")).text = authCert + + + f = open(FILE_ACTIVATIONXML, "w") + f.write("\n") + f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + f.close() + return + +def buildSignInRequest(adobeID: str, adobePassword: str, authenticationCertificate: str): + NSMAP = { "adept" : "http://ns.adobe.com/adept" } + etree.register_namespace("adept", NSMAP["adept"]) + + root = etree.Element(etree.QName(NSMAP["adept"], "signIn")) + root.set("method", "AdobeID") + + global devkey_bytes + deviceKey = devkey_bytes + _authenticationCertificate = base64.b64decode(authenticationCertificate) + + # Build buffer + + ar = bytearray(deviceKey) + ar.extend(bytearray(len(adobeID).to_bytes(1, 'big'))) + ar.extend(bytearray(adobeID.encode("latin-1"))) + ar.extend(bytearray(len(adobePassword).to_bytes(1, 'big'))) + ar.extend(bytearray(adobePassword.encode("latin-1"))) + + # Crypt code from https://stackoverflow.com/a/12921889/4991648 + cert = DerSequence() + cert.decode(_authenticationCertificate) + tbsCertificate = DerSequence() + tbsCertificate.decode(cert[0]) + subjectPublicKeyInfo = tbsCertificate[6] + + rsakey = RSA.importKey(subjectPublicKeyInfo) + cipherAC = PKCS1_v1_5.new(rsakey) + crypted_msg = cipherAC.encrypt(bytes(ar)) + + print(crypted_msg) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "signInData")).text = base64.b64encode(crypted_msg) + + # Generate Auth key and License Key + authkey = RSA.generate(1024, e=65537) + licensekey = RSA.generate(1024, e=65537) + + global authkey_pub, authkey_priv, licensekey_pub, licensekey_priv + + authkey_pub = authkey.publickey().exportKey("DER") + authkey_priv = authkey.exportKey("DER") + authkey_priv_enc = encrypt_with_device_key(authkey_priv) + + licensekey_pub = licensekey.publickey().exportKey("DER") + licensekey_priv = licensekey.exportKey("DER") + licensekey_priv_enc = encrypt_with_device_key(licensekey_priv) + + + etree.SubElement(root, etree.QName(NSMAP["adept"], "publicAuthKey")).text = base64.b64encode(authkey_pub) + etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateAuthKey")).text = base64.b64encode(authkey_priv_enc) + + etree.SubElement(root, etree.QName(NSMAP["adept"], "publicLicenseKey")).text = base64.b64encode(licensekey_pub) + etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateLicenseKey")).text = base64.b64encode(licensekey_priv_enc) + + print(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) + + return "\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1") + + +def signIn(username: str, passwd: str): + + + # Get authenticationCertificate + activationxml = etree.parse(FILE_ACTIVATIONXML) + print(activationxml) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + authenticationCertificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authenticationCertificate"))).text + + signInRequest = buildSignInRequest(username, passwd, authenticationCertificate) + + signInURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + "/SignInDirect" + + credentials = sendRequestDocu(signInRequest, signInURL) + print (credentials) + + try: + credentialsXML = etree.fromstring(credentials) + + if (credentialsXML.tag == adNS("error")): + err = credentialsXML.get("data") + if ("E_AUTH_FAILED" in err and "CUS05051" in err): + print("Invalid username or password!") + else: + print("Unknown Adobe error:") + print(credentials) + + exit() + elif (credentialsXML.tag == adNS("credentials")): + print("Login successful") + else: + print("Invalid main tag " + credentialsXML.tag) + exit() + + + except: + print("Invalid response to login request") + exit() + + # Got correct credentials + + private_key_data_encrypted = credentialsXML.find("./%s" % (adNS("encryptedPrivateLicenseKey"))).text + private_key_data_encrypted = base64.b64decode(private_key_data_encrypted) + private_key_data = decrypt_with_device_key(private_key_data_encrypted) + + + # Okay, now we got the credential response correct. Now "just" apply all these to the main activation.xml + + f = open(FILE_ACTIVATIONXML, "w") + + f.write("\n") + f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1").replace("", "")) + + # Yeah, that's ugly, but I didn't get etree to work with the different Namespaces ... + + f.write("\n") + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("user"))).text)) + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("username"))).get("method", "AdobeID"), credentialsXML.find("./%s" % (adNS("username"))).text)) + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("pkcs12"))).text)) + f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("licenseCertificate"))).text)) + f.write("%s\n" % (base64.b64encode(private_key_data).decode("latin-1"))) + f.write("%s\n" % (authenticationCertificate)) + f.write("\n") + f.write("\n") + + f.close() + + global user_uuid + user_uuid = credentialsXML.find("./%s" % (adNS("user"))).text + + + +def encrypt_with_device_key(data): + + global devkey_bytes + remain = 16 + if (len(data) % 16): + remain = 16 - (len(data) % 16) + + data += bytes([remain])*remain + + iv = Random.get_random_bytes(16) + cip = AES.new(devkey_bytes, AES.MODE_CBC, iv) + encrypted = cip.encrypt(data) + + res = iv + encrypted + return res + +def decrypt_with_device_key(data): + global devkey_bytes + + cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16]) + decrypted = cip.decrypt(data[16:]) + + # Remove padding + decrypted = decrypted[:-decrypted[-1]] + + return decrypted + + + + +def addNonce(): + + # Not sure if this code is correct yet. + + dt = datetime.now() + usec = dt.microsecond + sec = (dt - datetime(1970,1,1)).total_seconds() + + nonce320 = int(0x6f046000) + nonce321 = int(0x388a) + bigtime = int(sec * 1000) + + nonce320 += int((bigtime & 0xFFFFFFFF) + usec/1000) + nonce321 += int(((bigtime >> 32) & 0xFFFFFFFF)) + + final = bytearray(nonce320.to_bytes(4, 'little')) + final.extend(nonce321.to_bytes(4, 'little')) + tmp = 0 + final.extend(tmp.to_bytes(4, 'little')) + + ret = "" + + ret += "%s" % (base64.b64encode(final).decode("latin-1")) + + m10m = datetime.now() + timedelta(minutes=10) + m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ") + + ret += "%s" % (m10m_str) + + return ret + + + +def buildActivateReq(): + + devicexml = etree.parse(FILE_DEVICEXML) + adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) + + + version = None + clientOS = None + clientLocale = None + + ver = devicexml.findall("./%s" % (adNS("version"))) + + + for f in ver: + if f.get("name") == "hobbes": + version = f.get("value") + elif f.get("name") == "clientOS": + clientOS = f.get("value") + elif f.get("name") == "clientLocale": + clientLocale = f.get("value") + + if (version is None or clientOS is None or clientLocale is None): + print("err") + return + + + ret = "" + + ret += "" + ret += "" + ret += "%s" % (devicexml.find("./%s" % (adNS("fingerprint"))).text) + ret += "%s" % (devicexml.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % (clientOS) + ret += "%s" % (clientLocale) + ret += "%s" % (devicexml.find("./%s" % (adNS("deviceClass"))).text) + ret += "" + + + ret += "%s" % (version) + ret += "%s" % (clientOS) + ret += "%s" % (clientLocale) + ret += "%s" % (devicexml.find("./%s" % (adNS("deviceClass"))).text) + ret += "%s" % (devicexml.find("./%s" % (adNS("deviceType"))).text) + ret += "%s" % (devicexml.find("./%s" % (adNS("fingerprint"))).text) + + ret += "" + + ret += addNonce() + + ret += "%s" % (user_uuid) + + ret += "" + + return ret + + + +def activateDevice(): + + activate_req = buildActivateReq() + + print("activate") + print(activate_req) + + req_xml = etree.fromstring(activate_req) + + print(req_xml) + + #signature = signNode(req_xml) + + +''' + + void DRMProcessor::activateDevice() + { + pugi::xml_document activateReq; + + GOUROU_LOG(INFO, "Activate device"); + + buildActivateReq(activateReq); + + pugi::xml_node root = activateReq.select_node("adept:activate").node(); + + std::string signature = signNode(root); + + root = activateReq.select_node("adept:activate").node(); + appendTextElem(root, "adept:signature", signature); + + pugi::xml_document activationDoc; + user->readActivation(activationDoc); + + std::string activationURL = user->getProperty("//adept:activationURL"); + activationURL += "/Activate"; + + ByteArray reply = sendRequest(activateReq, activationURL); + + pugi::xml_document activationToken; + activationToken.load_buffer(reply.data(), reply.length()); + + root = activationDoc.select_node("activationInfo").node(); + root.append_copy(activationToken.first_child()); + user->updateActivationFile(activationDoc); + } + +''' + +# I've got no idea how this signing and hashing is supposed to work ... + + +def sign_node(node): + + sha_hash = hash_node(node) + + + +def hash_node(node): + + hash_ctx = hashlib.sha1() + hash_node_ctx(node, hash_ctx) + return hash_ctx.digest() + +def hash_node_ctx(node, hash_ctx): + pass + + + + + +def main(): + createDevice() + createUser() + + signIn(VAR_MAIL, VAR_PASS) + activateDevice() + +if __name__ == "__main__": + main() \ No newline at end of file