#!/usr/bin/env python3 # -*- coding: utf-8 -*- ''' This is an experimental Python version of libgourou. Right now it only supports part of the authorization (and doesn't support fulfillment at all). All the encryption / decryption stuff works, but I'm stuck at the XML node hashing / signing that's required for the last authorization step. Also, I'm not sure if the nonce function is implemented correctly. Who knows, maybe there will someday be a full Python version of libgourou so it can be used in Calibre on all operating systems without additional dependencies. ''' # pyright: reportUndefinedVariable=false import os, pwd, hashlib, base64, locale, urllib.request, datetime from datetime import datetime, timedelta from Crypto import Random from Crypto.PublicKey import RSA from Crypto.Util.asn1 import DerSequence from Crypto.Cipher import AES from Crypto.Cipher import PKCS1_v1_5 from binascii import a2b_base64 from uuid import getnode from lxml import etree VAR_MAIL = "test@example.com" VAR_PASS = "mypassword" VAR_AUTH_SERVER = "adeactivate.adobe.com" VAR_ACS_SERVER = "http://adeactivate.adobe.com/adept" VAR_HOBBES_VERSION = "10.0.4" FILE_DEVICEKEY = "devicesalt" FILE_DEVICEXML = "device.xml" FILE_ACTIVATIONXML = "activation.xml" devkey_bytes = None authkey_pub = None authkey_priv = None licensekey_pub = None licensekey_priv = None user_uuid = None def createDeviceKeyFile(): # Original implementation: Device::createDeviceKeyFile() global devkey_bytes DEVICE_KEY_SIZE = 16 devkey = Random.get_random_bytes(DEVICE_KEY_SIZE) devkey_bytes = devkey f = open(FILE_DEVICEKEY, "wb") f.write(devkey) f.close() def get_mac_address(): mac1 = getnode() mac2 = getnode() if (mac1 != mac2) or ((mac1 >> 40) % 2): return bytes([1, 2, 3, 4, 5, 0]) return mac1.to_bytes(6, byteorder='big') def makeSerial(random: bool): # Original implementation: std::string Device::makeSerial(bool random) sha_out = None if not random: uid = os.getuid() passwd = pwd.getpwuid(uid) mac_address = get_mac_address() dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, passwd.pw_name, mac_address[0], mac_address[1], mac_address[2], mac_address[3], mac_address[4], mac_address[5]) sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower() else: sha_out = Random.get_random_bytes(20).hex().lower() return sha_out def makeFingerprint(serial: str): # Original implementation: std::string Device::makeFingerprint(const std::string& serial) # base64(sha1(serial + privateKey)) if (devkey_bytes is None): print("devkey is None!") exit() str_to_hash = serial + devkey_bytes.decode('latin-1') hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest() b64str = base64.b64encode(hashed_str) return b64str def createDeviceFile(hobbes: str, randomSerial: bool): # Original implementation: Device::createDeviceFile(const std::string& hobbes, bool randomSerial) sysname = os.uname() serial = makeSerial(randomSerial) fingerprint = makeFingerprint(serial) NSMAP = { "adept" : "http://ns.adobe.com/adept" } etree.register_namespace("adept", NSMAP["adept"]) root = etree.Element(etree.QName(NSMAP["adept"], "deviceInfo")) etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceClass")).text = "Desktop" etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceSerial")).text = serial etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceName")).text = sysname.nodename etree.SubElement(root, etree.QName(NSMAP["adept"], "deviceType")).text = "standalone" atr_ver = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) atr_ver.set("name", "hobbes") atr_ver.set("value", hobbes) atr_ver2 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) atr_ver2.set("name", "clientOS") atr_ver2.set("value", sysname.sysname + " " + sysname.release) atr_ver3 = etree.SubElement(root, etree.QName(NSMAP["adept"], "version")) atr_ver3.set("name", "clientLocale") atr_ver3.set("value", locale.getdefaultlocale()[0]) etree.SubElement(root, etree.QName(NSMAP["adept"], "fingerprint")).text = fingerprint f = open(FILE_DEVICEXML, "w") f.write("\n") f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) f.close() def createDevice(): createDeviceKeyFile() createDeviceFile(VAR_HOBBES_VERSION, False) def sendHTTPRequest_getSimple(URL: str): headers = { "Accept": "*/*", "User-Agent": "book2png", } req = urllib.request.Request(url=URL, headers=headers) handler = urllib.request.urlopen(req) content = handler.read() loc = None try: loc = req.headers.get("Location") except: pass if loc is not None: return sendHTTPRequest_getSimple(loc) try: ct = req.headers.get("Content-Type") except: ct = None if ct == "application/vnd.adobe.adept+xml": print("Got adobe XML") return content def sendPOSTHTTPRequest(URL: str, document: bytes, type: str): headers = { "Accept": "*/*", "User-Agent": "book2png", "Content-Type": type } req = urllib.request.Request(url=URL, headers=headers, data=document) handler = urllib.request.urlopen(req) content = handler.read() loc = None try: loc = req.headers.get("Location") except: pass if loc is not None: return sendPOSTHTTPRequest(loc, document, type) try: ct = req.headers.get("Content-Type") except: ct = None if ct == "application/vnd.adobe.adept+xml": print("Got adobe XML") return content def sendHTTPRequest(URL: str): return sendHTTPRequest_getSimple(URL) def sendRawRequest(URL: str): return sendHTTPRequest(URL) def sendRequestDocu(document: str, URL: str): return sendPOSTHTTPRequest(URL, document.encode("latin-1"), "application/vnd.adobe.adept+xml") def createUser(): NSMAP = { "adept" : "http://ns.adobe.com/adept" } root = etree.Element("activationInfo") root.set("xmlns", NSMAP["adept"]) etree.register_namespace("adept", NSMAP["adept"]) activationServiceInfo = etree.SubElement(root, etree.QName(NSMAP["adept"], "activationServiceInfo")) activationURL = VAR_ACS_SERVER + "/ActivationServiceInfo" response = sendRawRequest(activationURL) print(response) adobe_response_xml = etree.fromstring(response) adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) authURL = adobe_response_xml.find("./%s" % (adNS("authURL"))).text userInfoURL = adobe_response_xml.find("./%s" % (adNS("userInfoURL"))).text certificate = adobe_response_xml.find("./%s" % (adNS("certificate"))).text if (authURL is None or userInfoURL is None or certificate is None): print("Error: Unexpected reply from Adobe.") exit() etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authURL")).text = authURL etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "userInfoURL")).text = userInfoURL etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "activationURL")).text = VAR_ACS_SERVER etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "certificate")).text = certificate authenticationURL = authURL + "/AuthenticationServiceInfo" response2 = sendRawRequest(authenticationURL) adobe_response_xml2 = etree.fromstring(response2) authCert = adobe_response_xml2.find("./%s" % (adNS("certificate"))).text etree.SubElement(activationServiceInfo, etree.QName(NSMAP["adept"], "authenticationCertificate")).text = authCert f = open(FILE_ACTIVATIONXML, "w") f.write("\n") f.write(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) f.close() return def buildSignInRequest(adobeID: str, adobePassword: str, authenticationCertificate: str): NSMAP = { "adept" : "http://ns.adobe.com/adept" } etree.register_namespace("adept", NSMAP["adept"]) root = etree.Element(etree.QName(NSMAP["adept"], "signIn")) root.set("method", "AdobeID") global devkey_bytes deviceKey = devkey_bytes _authenticationCertificate = base64.b64decode(authenticationCertificate) # Build buffer ar = bytearray(deviceKey) ar.extend(bytearray(len(adobeID).to_bytes(1, 'big'))) ar.extend(bytearray(adobeID.encode("latin-1"))) ar.extend(bytearray(len(adobePassword).to_bytes(1, 'big'))) ar.extend(bytearray(adobePassword.encode("latin-1"))) # Crypt code from https://stackoverflow.com/a/12921889/4991648 cert = DerSequence() cert.decode(_authenticationCertificate) tbsCertificate = DerSequence() tbsCertificate.decode(cert[0]) subjectPublicKeyInfo = tbsCertificate[6] rsakey = RSA.importKey(subjectPublicKeyInfo) cipherAC = PKCS1_v1_5.new(rsakey) crypted_msg = cipherAC.encrypt(bytes(ar)) print(crypted_msg) etree.SubElement(root, etree.QName(NSMAP["adept"], "signInData")).text = base64.b64encode(crypted_msg) # Generate Auth key and License Key authkey = RSA.generate(1024, e=65537) licensekey = RSA.generate(1024, e=65537) global authkey_pub, authkey_priv, licensekey_pub, licensekey_priv authkey_pub = authkey.publickey().exportKey("DER") authkey_priv = authkey.exportKey("DER") authkey_priv_enc = encrypt_with_device_key(authkey_priv) licensekey_pub = licensekey.publickey().exportKey("DER") licensekey_priv = licensekey.exportKey("DER") licensekey_priv_enc = encrypt_with_device_key(licensekey_priv) etree.SubElement(root, etree.QName(NSMAP["adept"], "publicAuthKey")).text = base64.b64encode(authkey_pub) etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateAuthKey")).text = base64.b64encode(authkey_priv_enc) etree.SubElement(root, etree.QName(NSMAP["adept"], "publicLicenseKey")).text = base64.b64encode(licensekey_pub) etree.SubElement(root, etree.QName(NSMAP["adept"], "encryptedPrivateLicenseKey")).text = base64.b64encode(licensekey_priv_enc) print(etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")) return "\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1") def signIn(username: str, passwd: str): # Get authenticationCertificate activationxml = etree.parse(FILE_ACTIVATIONXML) print(activationxml) adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) authenticationCertificate = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authenticationCertificate"))).text signInRequest = buildSignInRequest(username, passwd, authenticationCertificate) signInURL = activationxml.find("./%s/%s" % (adNS("activationServiceInfo"), adNS("authURL"))).text + "/SignInDirect" credentials = sendRequestDocu(signInRequest, signInURL) print (credentials) try: credentialsXML = etree.fromstring(credentials) if (credentialsXML.tag == adNS("error")): err = credentialsXML.get("data") if ("E_AUTH_FAILED" in err and "CUS05051" in err): print("Invalid username or password!") else: print("Unknown Adobe error:") print(credentials) exit() elif (credentialsXML.tag == adNS("credentials")): print("Login successful") else: print("Invalid main tag " + credentialsXML.tag) exit() except: print("Invalid response to login request") exit() # Got correct credentials private_key_data_encrypted = credentialsXML.find("./%s" % (adNS("encryptedPrivateLicenseKey"))).text private_key_data_encrypted = base64.b64decode(private_key_data_encrypted) private_key_data = decrypt_with_device_key(private_key_data_encrypted) # Okay, now we got the credential response correct. Now "just" apply all these to the main activation.xml f = open(FILE_ACTIVATIONXML, "w") f.write("\n") f.write(etree.tostring(activationxml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1").replace("", "")) # Yeah, that's ugly, but I didn't get etree to work with the different Namespaces ... f.write("\n") f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("user"))).text)) f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("username"))).get("method", "AdobeID"), credentialsXML.find("./%s" % (adNS("username"))).text)) f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("pkcs12"))).text)) f.write("%s\n" % (credentialsXML.find("./%s" % (adNS("licenseCertificate"))).text)) f.write("%s\n" % (base64.b64encode(private_key_data).decode("latin-1"))) f.write("%s\n" % (authenticationCertificate)) f.write("\n") f.write("\n") f.close() global user_uuid user_uuid = credentialsXML.find("./%s" % (adNS("user"))).text def encrypt_with_device_key(data): global devkey_bytes remain = 16 if (len(data) % 16): remain = 16 - (len(data) % 16) data += bytes([remain])*remain iv = Random.get_random_bytes(16) cip = AES.new(devkey_bytes, AES.MODE_CBC, iv) encrypted = cip.encrypt(data) res = iv + encrypted return res def decrypt_with_device_key(data): global devkey_bytes cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16]) decrypted = cip.decrypt(data[16:]) # Remove padding decrypted = decrypted[:-decrypted[-1]] return decrypted def addNonce(): # Not sure if this code is correct yet. dt = datetime.now() usec = dt.microsecond sec = (dt - datetime(1970,1,1)).total_seconds() nonce320 = int(0x6f046000) nonce321 = int(0x388a) bigtime = int(sec * 1000) nonce320 += int((bigtime & 0xFFFFFFFF) + usec/1000) nonce321 += int(((bigtime >> 32) & 0xFFFFFFFF)) final = bytearray(nonce320.to_bytes(4, 'little')) final.extend(nonce321.to_bytes(4, 'little')) tmp = 0 final.extend(tmp.to_bytes(4, 'little')) ret = "" ret += "%s" % (base64.b64encode(final).decode("latin-1")) m10m = datetime.now() + timedelta(minutes=10) m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ") ret += "%s" % (m10m_str) return ret def buildActivateReq(): devicexml = etree.parse(FILE_DEVICEXML) adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag) version = None clientOS = None clientLocale = None ver = devicexml.findall("./%s" % (adNS("version"))) for f in ver: if f.get("name") == "hobbes": version = f.get("value") elif f.get("name") == "clientOS": clientOS = f.get("value") elif f.get("name") == "clientLocale": clientLocale = f.get("value") if (version is None or clientOS is None or clientLocale is None): print("err") return ret = "" ret += "" ret += "" ret += "%s" % (devicexml.find("./%s" % (adNS("fingerprint"))).text) ret += "%s" % (devicexml.find("./%s" % (adNS("deviceType"))).text) ret += "%s" % (clientOS) ret += "%s" % (clientLocale) ret += "%s" % (devicexml.find("./%s" % (adNS("deviceClass"))).text) ret += "" ret += "%s" % (version) ret += "%s" % (clientOS) ret += "%s" % (clientLocale) ret += "%s" % (devicexml.find("./%s" % (adNS("deviceClass"))).text) ret += "%s" % (devicexml.find("./%s" % (adNS("deviceType"))).text) ret += "%s" % (devicexml.find("./%s" % (adNS("fingerprint"))).text) ret += "" ret += addNonce() ret += "%s" % (user_uuid) ret += "" return ret def activateDevice(): activate_req = buildActivateReq() print("activate") print(activate_req) req_xml = etree.fromstring(activate_req) print(req_xml) #signature = signNode(req_xml) ''' void DRMProcessor::activateDevice() { pugi::xml_document activateReq; GOUROU_LOG(INFO, "Activate device"); buildActivateReq(activateReq); pugi::xml_node root = activateReq.select_node("adept:activate").node(); std::string signature = signNode(root); root = activateReq.select_node("adept:activate").node(); appendTextElem(root, "adept:signature", signature); pugi::xml_document activationDoc; user->readActivation(activationDoc); std::string activationURL = user->getProperty("//adept:activationURL"); activationURL += "/Activate"; ByteArray reply = sendRequest(activateReq, activationURL); pugi::xml_document activationToken; activationToken.load_buffer(reply.data(), reply.length()); root = activationDoc.select_node("activationInfo").node(); root.append_copy(activationToken.first_child()); user->updateActivationFile(activationDoc); } ''' # I've got no idea how this signing and hashing is supposed to work ... def sign_node(node): sha_hash = hash_node(node) def hash_node(node): hash_ctx = hashlib.sha1() hash_node_ctx(node, hash_ctx) return hash_ctx.digest() def hash_node_ctx(node, hash_ctx): pass def main(): createDevice() createUser() signIn(VAR_MAIL, VAR_PASS) activateDevice() if __name__ == "__main__": main()