"""Build, sign and send Adobe ADEPT fulfillment requests for ACSM files.

NOTE(review): the text this module was recovered from had lost every
``<...>`` span.  That removed the XML markup from the request string
literals and swallowed two stretches of code: everything between the reply
check in doOperatorAuth() and the activation-file write that follows it, and
everything after the reply check in fulfill().  The markup and those two
stretches below are reconstructions and must be compared against a pristine
copy of the file before this is relied on.
"""

from lxml import etree
import base64

try:
    from libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendHTTPRequest
    from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
except ImportError:
    # Fall back to the package-qualified module path.
    from calibre_plugins.deacsm.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendHTTPRequest
    from calibre_plugins.deacsm.libadobe import get_devkey_path, get_device_path, get_activation_xml_path


_ADEPT_NS = "http://ns.adobe.com/adept"
_DC_NS = "http://purl.org/dc/elements/1.1/"
_NSMAP = {"adept": _ADEPT_NS}
_XML_DECLARATION = '<?xml version="1.0"?>'


def _adNS(tag):
    """Return *tag* qualified with the ADEPT namespace in lxml's ``{uri}tag`` form."""
    return "{%s}%s" % (_ADEPT_NS, tag)


def _dcNS(tag):
    """Return *tag* qualified with the Dublin Core namespace in ``{uri}tag`` form."""
    return "{%s}%s" % (_DC_NS, tag)


def _to_text(node):
    """Serialize an lxml node or tree to pretty-printed text without an XML declaration."""
    return etree.tostring(
        node, encoding="utf-8", pretty_print=True, xml_declaration=False
    ).decode("utf-8")


def _save_activation_xml(activationxml):
    """Write *activationxml* back to the activation file, preceded by an XML declaration."""
    # The declaration names no encoding, so readers assume UTF-8; write it as such
    # instead of relying on the platform default.
    with open(get_activation_xml_path(), "w", encoding="utf-8") as f:
        f.write(_XML_DECLARATION + "\n")
        f.write(_to_text(activationxml))


def buildFulfillRequest(acsm):
    """Return the unsigned fulfillment request document for *acsm* as a string.

    User and device identifiers are read from the activation and device XML
    files; *acsm* (an lxml tree) is embedded verbatim.  A missing element in
    either file raises AttributeError, as in the original code.
    """
    activationxml = etree.parse(get_activation_xml_path())
    devicexml = etree.parse(get_device_path())

    user_uuid = activationxml.find("./%s/%s" % (_adNS("credentials"), _adNS("user"))).text
    device_uuid = activationxml.find("./%s/%s" % (_adNS("activationToken"), _adNS("device"))).text
    device_type = devicexml.find("./%s" % _adNS("deviceType")).text
    device_class = devicexml.find("./%s" % _adNS("deviceClass")).text
    fingerprint = devicexml.find("./%s" % _adNS("fingerprint")).text

    # The device file carries several version entries as name/value attribute
    # pairs; entries that are absent stay None and are formatted as "None".
    versions = {"hobbes": None, "clientOS": None, "clientLocale": None}
    for entry in devicexml.findall("./%s" % _adNS("version")):
        name = entry.get("name")
        if name in versions:
            versions[name] = entry.get("value")

    # NOTE(review): element names are reconstructed (markup was stripped from
    # the recovered literals); only the values and line structure are original.
    parts = [
        _XML_DECLARATION + "\n",
        '<adept:fulfill xmlns:adept="%s">\n' % _ADEPT_NS,
        "<adept:user>%s</adept:user>\n" % user_uuid,
        "<adept:device>%s</adept:device>\n" % device_uuid,
        "<adept:deviceType>%s</adept:deviceType>\n" % device_type,
        _to_text(acsm),
        "<adept:targetDevice>\n",
        "<adept:softwareVersion>%s</adept:softwareVersion>\n" % versions["hobbes"],
        "<adept:clientOS>%s</adept:clientOS>\n" % versions["clientOS"],
        "<adept:clientLocale>%s</adept:clientLocale>\n" % versions["clientLocale"],
        "<adept:clientVersion>%s</adept:clientVersion>\n" % device_class,
        "<adept:deviceType>%s</adept:deviceType>\n" % device_type,
        "<adept:fingerprint>%s</adept:fingerprint>\n" % fingerprint,
        "<adept:activationToken>\n",
        "<adept:user>%s</adept:user>\n" % user_uuid,
        "<adept:device>%s</adept:device>\n" % device_uuid,
        "</adept:activationToken>\n",
        "</adept:targetDevice>\n",
        "</adept:fulfill>\n",
    ]
    return "".join(parts)


def buildInitLicenseServiceRequest(authURL: str):
    """Return the signed license-service request for *authURL* as a string.

    Returns None when sign_node() yields no signature.
    """
    etree.register_namespace("adept", _ADEPT_NS)

    activationxml = etree.parse(get_activation_xml_path())
    user_uuid = activationxml.find("./%s/%s" % (_adNS("credentials"), _adNS("user"))).text

    # NOTE(review): element names are reconstructed (markup was stripped from
    # the recovered literals).
    ret = "".join([
        _XML_DECLARATION,
        '<adept:licenseServiceRequest xmlns:adept="%s" identity="user">' % _ADEPT_NS,
        "<adept:operatorURL>%s</adept:operatorURL>" % authURL,
        addNonce(),
        "<adept:user>%s</adept:user>" % user_uuid,
        "</adept:licenseServiceRequest>",
    ])

    req_xml = etree.fromstring(ret)
    signature = sign_node(req_xml)
    if signature is None:
        return None

    etree.SubElement(req_xml, etree.QName(_ADEPT_NS, "signature")).text = signature
    return _XML_DECLARATION + "\n" + _to_text(req_xml)


def buildAuthRequest():
    """Return the operator authentication request document as a string.

    The user certificate is extracted from the activation file's PKCS#12 blob
    via get_cert_from_pkcs12(), using the base64-encoded device key.
    """
    activationxml = etree.parse(get_activation_xml_path())
    credentials = "./%s/%%s" % _adNS("credentials")

    user_pkcs12 = base64.b64decode(activationxml.find(credentials % _adNS("pkcs12")).text)
    with open(get_devkey_path(), "rb") as f:
        devkey_bytes = f.read()
    my_cert = get_cert_from_pkcs12(user_pkcs12, base64.b64encode(devkey_bytes))

    # NOTE(review): element names are reconstructed (markup was stripped from
    # the recovered literals).
    parts = [
        _XML_DECLARATION + "\n",
        '<adept:credentials xmlns:adept="%s">\n' % _ADEPT_NS,
        "<adept:user>%s</adept:user>\n"
        % activationxml.find(credentials % _adNS("user")).text,
        "<adept:certificate>%s</adept:certificate>\n"
        % base64.b64encode(my_cert).decode("utf-8"),
        "<adept:licenseCertificate>%s</adept:licenseCertificate>\n"
        % activationxml.find(credentials % _adNS("licenseCertificate")).text,
        "<adept:authenticationCertificate>%s</adept:authenticationCertificate>\n"
        % activationxml.find(credentials % _adNS("authenticationCertificate")).text,
        "</adept:credentials>",
    ]
    return "".join(parts)


def doOperatorAuth(operatorURL: str):
    """Authenticate against the operator at *operatorURL*.

    Returns None on success, or an error message string on failure.
    """
    auth_req = buildAuthRequest()

    # Only a trailing "/Fulfill" is removed; the original str.replace() would
    # also have removed occurrences elsewhere in the URL.
    authURL = operatorURL
    if authURL.endswith("/Fulfill"):
        authURL = authURL[:-len("/Fulfill")]

    replyData = sendRequestDocu(auth_req, authURL + "/Auth").decode("utf-8")

    # NOTE(review): everything from this check to the end of the function is
    # reconstructed; the recovered text breaks off inside the condition.
    # Confirm the marker strings, messages and the InitLicenseService step.
    if "<success" not in replyData:
        return "ERROR: Operator responded with %s\n" % replyData

    activationxml = etree.parse(get_activation_xml_path())
    activationURL = activationxml.find(
        "./%s/%s" % (_adNS("activationToken"), _adNS("activationURL"))
    ).text

    init_request = buildInitLicenseServiceRequest(authURL)
    if init_request is None:
        return "Creating license request failed!"

    resp = sendRequestDocu(init_request, activationURL + "/InitLicenseService").decode("utf-8")
    if "<error" in resp:
        return "Looks like that failed: %s" % resp
    if "<success" not in resp:
        return "Useless response: %s" % resp
    return None


def operatorAuth(operatorURL: str):
    """Ensure the user is authenticated with *operatorURL*, recording it on success.

    Returns None on success (including when the URL is already recorded), or
    an error message string.

    NOTE(review): fulfill() calls this function, but its definition did not
    survive in the recovered text; only the final activation-file write and
    ``return None`` did.  The rest of the body is reconstructed - confirm the
    ``operatorURLList`` bookkeeping against a pristine copy.
    """
    etree.register_namespace("adept", _ADEPT_NS)
    activationxml = etree.parse(get_activation_xml_path())

    known = activationxml.findall("./%s/%s" % (_adNS("operatorURLList"), _adNS("operatorURL")))
    if any(node.text is not None and node.text.strip() == operatorURL for node in known):
        return None

    ret = doOperatorAuth(operatorURL)
    if ret is not None:
        return "doOperatorAuth error: %s" % ret

    url_list = activationxml.find("./%s" % _adNS("operatorURLList"))
    if url_list is None:
        user_uuid = activationxml.find("./%s/%s" % (_adNS("credentials"), _adNS("user"))).text
        url_list = etree.SubElement(
            activationxml.getroot(), etree.QName(_ADEPT_NS, "operatorURLList"), nsmap=_NSMAP
        )
        etree.SubElement(url_list, etree.QName(_ADEPT_NS, "user")).text = user_uuid
    etree.SubElement(url_list, etree.QName(_ADEPT_NS, "operatorURL")).text = operatorURL

    _save_activation_xml(activationxml)
    return None


def buildRights(license_token_node):
    """Return a rights document for *license_token_node* as a string.

    The certificate is taken from the activation file's licenseServiceInfo
    entry whose licenseURL equals the token's.  Returns None when the token
    has no licenseURL, when no entry matches, or when an entry is malformed.
    """
    url_node = license_token_node.find("./%s" % _adNS("licenseURL"))
    if url_node is None:
        return None
    lic_token_url = url_node.text

    activationxml = etree.parse(get_activation_xml_path())
    services = activationxml.findall(
        "./%s/%s" % (_adNS("licenseServices"), _adNS("licenseServiceInfo"))
    )

    certificate_line = None
    try:
        for member in services:
            if member.find("./%s" % _adNS("licenseURL")).text == lic_token_url:
                certificate_line = (
                    "<adept:certificate>%s</adept:certificate>\n"
                    % member.find("./%s" % _adNS("certificate")).text
                )
                break
    except AttributeError:
        # An entry lacks licenseURL or certificate: treat as "no usable cert".
        return None
    if certificate_line is None:
        return None

    # NOTE(review): element names are reconstructed (markup was stripped from
    # the recovered literals).
    parts = [
        _XML_DECLARATION + "\n",
        '<adept:rights xmlns:adept="%s">\n' % _ADEPT_NS,
        _to_text(license_token_node),
        "<adept:licenseServiceInfo>\n",
        "<adept:licenseURL>%s</adept:licenseURL>\n" % lic_token_url,
        certificate_line,
        "</adept:licenseServiceInfo>\n",
        "</adept:rights>\n",
    ]
    return "".join(parts)


def fulfill(acsm_file):
    """Fulfill the ACSM file at *acsm_file*.

    Returns a ``(success, message)`` tuple.  On failure *message* describes
    the error.
    """
    try:
        activationxml = etree.parse(get_activation_xml_path())
        pkcs12 = activationxml.find("./%s/%s" % (_adNS("credentials"), _adNS("pkcs12"))).text
    except Exception:
        return False, "Activation not found or invalid"
    if not pkcs12:
        return False, "Activation missing"

    try:
        acsmxml = etree.parse(acsm_file)
    except Exception:
        return False, "ACSM not found or invalid"

    mimetype = acsmxml.find(
        "./%s/%s/%s" % (_adNS("resourceItemInfo"), _adNS("metadata"), _dcNS("format"))
    ).text
    if mimetype not in ("application/pdf", "application/epub+zip"):
        print("Weird mimetype: %s" % (mimetype))
        print("Continuing anyways ...")

    fulfill_request_xml = etree.fromstring(buildFulfillRequest(acsmxml))

    # Sign the request:
    signature = sign_node(fulfill_request_xml)
    if signature is None:
        return False, "Signing failed!"
    etree.SubElement(fulfill_request_xml, etree.QName(_ADEPT_NS, "signature")).text = signature

    # Get operator URL:
    operator_node = acsmxml.find("./%s" % _adNS("operatorURL"))
    operatorURL = None
    if operator_node is not None and operator_node.text is not None:
        operatorURL = operator_node.text.strip()
    if not operatorURL:
        return False, "OperatorURL missing in ACSM"

    fulfillURL = operatorURL + "/Fulfill"

    ret = operatorAuth(fulfillURL)
    if ret is not None:
        return False, "operatorAuth error: %s" % ret

    fulfill_req_signed = _XML_DECLARATION + "\n" + _to_text(fulfill_request_xml)
    replyData = sendRequestDocu(fulfill_req_signed, fulfillURL).decode("utf-8")

    # NOTE(review): the recovered text breaks off inside this check and
    # resumes with a write of the activation XML followed by
    # `return True, "Done"`.  fulfill() never modifies that tree, and writing
    # its stale copy here would overwrite what operatorAuth() just saved, so
    # that tail presumably belongs to a later function lost to the same
    # truncation.  The error message and the success return value below are
    # reconstructions - confirm against a pristine copy.
    if "<error" in replyData:
        return False, "Looks like there's been an error during Fulfillment: %s" % replyData

    return True, replyData