acsm-calibre-plugin/calibre-plugin/libadobe.py

555 lines
16 KiB
Python
Raw Normal View History

2021-09-25 20:24:03 +06:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Helper library with code needed for Adobe stuff.
'''
from uuid import getnode
import os, hashlib, base64
import urllib.request, ssl
2021-09-25 20:24:03 +06:00
from datetime import datetime, timedelta
from lxml import etree
import rsa
try:
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA
except ImportError:
# Debian (and Ubuntu) ship pycryptodome, but not in its compatible mode with pycrypto
# If `Crypto` can't be found, try under pycryptodome's own namespace
from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.PublicKey import RSA
from Cryptodome.Hash import SHA
2021-09-25 20:24:03 +06:00
from oscrypto import keys
from oscrypto.asymmetric import dump_certificate, dump_private_key, dump_public_key
# Base URL of Adobe's ADEPT service, once for plain HTTP and once for HTTPS.
VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept"
VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept"
2021-09-25 20:24:03 +06:00
# Paths of the three per-account files. They start out as bare file names and
# are re-pointed into an account folder by update_account_path().
FILE_DEVICEKEY = "devicesalt"
FILE_DEVICEXML = "device.xml"
FILE_ACTIVATIONXML = "activation.xml"
# Every ADE "version" we know about (and can potentially use if present in a
# config file), one row per version:
#   (config name, version string, Hobbes version, OS identifier, build ID)
#
# "Missing" versions:
# 1.7.1, 2.0, 3.0, 4.0, 4.0.1, 4.0.2, 4.5 to 4.5.9
# Build ID 185749 also exists, that's a different (older) variant of 4.5.10.
_VAR_VER_TABLE = [
    ("ADE 1.7.2", "ADE WIN 9,0,1131,27", "9.0.1131.27", "Windows Vista", 1131),
    ("ADE 2.0.1", "2.0.1.78765", "9.3.58046", "Windows Vista", 78765),
    ("ADE 3.0.1", "3.0.1.91394", "10.0.85385", "Windows 8", 91394),
    ("ADE 4.0.3", "4.0.3.123281", "12.0.123217", "Windows 8", 123281),
    ("ADE 4.5.10", "com.adobe.adobedigitaleditions.exe v4.5.10.186048",
     "12.5.4.186049", "Windows 8", 186048),
    ("ADE 4.5.11", "com.adobe.adobedigitaleditions.exe v4.5.11.187303",
     "12.5.4.187298", "Windows 8", 187303),
]

# The five parallel lists the rest of the code indexes into. They are derived
# from the table above, so they always have the same length.
VAR_VER_SUPP_CONFIG_NAMES = [row[0] for row in _VAR_VER_TABLE]
VAR_VER_SUPP_VERSIONS = [row[1] for row in _VAR_VER_TABLE]
VAR_VER_HOBBES_VERSIONS = [row[2] for row in _VAR_VER_TABLE]
VAR_VER_OS_IDENTIFIERS = [row[3] for row in _VAR_VER_TABLE]
VAR_VER_BUILD_IDS = [row[4] for row in _VAR_VER_TABLE]

# This is a list of versions that can be used for new authorizations:
VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE = [78765, 91394, 123281, 187303]

# This is a list of versions to be displayed in the version changer.
VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO = [1131, 78765, 91394, 123281, 187303]

# Versions >= this one are using HTTPS
# According to changelogs, this is implemented as of ADE 4.0.1 - no idea what build ID that is.
VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT = 123281

# Default build ID to use - ADE 2.0.1
VAR_VER_DEFAULT_BUILD_ID = 78765
def are_ade_version_lists_valid():
    '''
    Check that the five parallel ADE version lists all have the same length.

    These five lists MUST all have the same amount of elements.
    Otherwise that will cause all kinds of issues.

    Returns True when the lengths match. Otherwise prints an error message
    and returns False.
    '''
    version_lists = (
        VAR_VER_SUPP_CONFIG_NAMES,
        VAR_VER_SUPP_VERSIONS,
        VAR_VER_HOBBES_VERSIONS,
        VAR_VER_OS_IDENTIFIERS,
        VAR_VER_BUILD_IDS,
    )

    # All lengths equal <=> the set of lengths has exactly one member.
    if len({len(entries) for entries in version_lists}) != 1:
        print("Internal error in DeACSM: Mismatched version list lengths.")
        print("This should never happen, please open a bug report.")
        return False

    return True
2021-09-25 20:24:03 +06:00
# In-memory copy of the device key. None until createDeviceKeyFile() generates
# it or one of the crypto helpers reads it from FILE_DEVICEKEY on first use.
devkey_bytes = None
def get_devkey_path():
    '''Return the path currently configured for the device key file.'''
    # Reading a module-level name needs no "global" declaration.
    return FILE_DEVICEKEY
def get_device_path():
    '''Return the path currently configured for the device XML file.'''
    # Reading a module-level name needs no "global" declaration.
    return FILE_DEVICEXML
def get_activation_xml_path():
    '''Return the path currently configured for the activation XML file.'''
    # Reading a module-level name needs no "global" declaration.
    return FILE_ACTIVATIONXML
def update_account_path(folder_path: str):
    '''
    Point the three account file paths at files inside folder_path.

    Rebinds the module-level FILE_DEVICEKEY, FILE_DEVICEXML and
    FILE_ACTIVATIONXML; nothing is created or checked on disk.
    '''
    global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML

    # NOTE(review): the cached devkey_bytes is not touched here, so a key that
    # was already loaded from a previous folder stays in use - confirm that
    # callers never switch folders after the key has been read.
    FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML = (
        os.path.join(folder_path, file_name)
        for file_name in ("devicesalt", "device.xml", "activation.xml")
    )
def createDeviceKeyFile():
    '''
    Generate a fresh random 16-byte device key and store it.

    Original implementation: Device::createDeviceKeyFile()

    The key is kept in the module-level devkey_bytes and written to
    FILE_DEVICEKEY, overwriting any existing file.
    '''
    DEVICE_KEY_SIZE = 16

    global devkey_bytes
    devkey_bytes = Random.get_random_bytes(DEVICE_KEY_SIZE)

    # "with" guarantees the handle is closed even if the write fails.
    with open(FILE_DEVICEKEY, "wb") as f:
        f.write(devkey_bytes)
def get_mac_address():
    '''
    Return a 6-byte hardware address for this machine.

    uuid.getnode() is queried twice. If both answers agree and the lowest bit
    of the first octet is clear, that value is returned in big-endian order.
    Otherwise the fixed placeholder 01:02:03:04:05:00 is returned.
    '''
    first, second = getnode(), getnode()

    is_stable = first == second
    # Lowest bit of the most significant octet (the multicast bit).
    has_multicast_bit = bool((first >> 40) & 1)

    if is_stable and not has_multicast_bit:
        return first.to_bytes(6, byteorder='big')

    return bytes([1, 2, 3, 4, 5, 0])
def makeSerial(random: bool):
    '''
    Return a device serial as a 40-character lowercase hex string.

    Original implementation: std::string Device::makeSerial(bool random)

    If random is true, the serial is 20 random bytes in hex. Otherwise it is
    the SHA-1 of "<uid>:<username>:<mac as six hex octets>" followed by a NUL
    byte, so it stays the same for the same user on the same machine.

    It doesn't look like this implementation results in the same fingerprint
    Adobe is using in ADE. Given that Adobe only ever sees the SHA1 hash of
    this value, that probably doesn't matter.
    '''
    if random:
        return Random.get_random_bytes(20).hex().lower()

    try:
        # Linux
        uid = os.getuid()
        import pwd
        username = pwd.getpwuid(uid).pw_name.encode("utf-8").decode("latin-1")
    except Exception:
        # Windows (os.getuid / pwd are missing there), or the lookup failed.
        # "except Exception" rather than a bare except, so Ctrl-C and
        # SystemExit are not swallowed.
        uid = 1000
        username = os.getlogin().encode("utf-8").decode("latin-1")

    mac_address = get_mac_address()

    dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
        mac_address[0], mac_address[1], mac_address[2],
        mac_address[3], mac_address[4], mac_address[5])

    return hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower()
def makeFingerprint(serial: str):
    '''
    Return the device fingerprint for serial as base64-encoded bytes.

    Original implementation: std::string Device::makeFingerprint(const std::string& serial)

    base64(sha1(serial + privateKey)), where privateKey is the content of
    FILE_DEVICEKEY. The serial must be encodable as latin-1.
    Fingerprint must be 20 bytes or less.
    '''
    # "with" closes the key file even if reading fails.
    with open(FILE_DEVICEKEY, "rb") as f:
        key_bytes = f.read()

    # latin-1 maps every byte to one character and back, so this is the
    # serial's bytes followed by the raw key bytes.
    str_to_hash = serial + key_bytes.decode('latin-1')
    hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest()

    return base64.b64encode(hashed_str)
############################################## HTTP stuff:
def sendHTTPRequest_DL2FILE(URL: str, outputfile: str):
    '''
    Download URL with a GET request and stream the body into outputfile.

    Returns 200 after the file has been written. For any other status code
    that urlopen() hands back, nothing is written and that code is returned.
    Errors raised by urlopen() or by the file write propagate to the caller.
    '''
    headers = {
        "Accept": "*/*",
        "User-Agent": "book2png",
        # MacOS uses different User-Agent. Good thing we're emulating a Windows client.
    }
    req = urllib.request.Request(url=URL, headers=headers)
    chunksize = 16 * 1024

    # The context manager closes the connection on every exit path.
    with urllib.request.urlopen(req) as handler:
        ret_code = handler.getcode()

        # NOTE(review): "req" is the outgoing request, so this looks for a
        # "Location" entry among the request headers set above, not in the
        # server's response. Kept as-is; confirm whether handler.headers was
        # intended before changing it.
        loc = req.headers.get("Location")
        if loc is not None:
            # The original recursive call dropped outputfile (TypeError).
            return sendHTTPRequest_DL2FILE(loc, outputfile)

        if ret_code != 200:
            return ret_code

        with open(outputfile, "wb") as f:
            while True:
                chunk = handler.read(chunksize)
                if not chunk:
                    break
                f.write(chunk)

    return 200
2021-09-25 20:24:03 +06:00
def sendHTTPRequest_getSimple(URL: str):
    '''
    Fetch URL with a GET request and return the complete response body as bytes.

    TLS certificate and host name verification are disabled (see below).
    Errors raised by urlopen() propagate to the caller.
    '''
    headers = {
        "Accept": "*/*",
        "User-Agent": "book2png",
        # MacOS uses different User-Agent. Good thing we're emulating a Windows client.
    }

    # Ignore SSL:
    # It appears as if lots of book distributors have either invalid or expired certs ...
    # No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
    # Not the best solution, but it works.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    req = urllib.request.Request(url=URL, headers=headers)

    # The context manager closes the connection once the body has been read.
    with urllib.request.urlopen(req, context=ctx) as handler:
        content = handler.read()

    # NOTE(review): "req" is the outgoing request, so this looks for a
    # "Location" entry among the request headers set above, not in the
    # server's response. Kept as-is; confirm whether handler.headers was
    # intended before changing it.
    loc = req.headers.get("Location")
    if loc is not None:
        return sendHTTPRequest_getSimple(loc)

    return content
def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False):
    '''
    POST document to URL with Content-Type set to type.

    Returns the response body as bytes, or a (status code, body) tuple when
    returnRC is true. A 204 response with returnRC set yields (204, "").
    A status other than 200 is reported on stdout but the body is still
    returned. TLS certificate and host name verification are disabled.
    '''
    headers = {
        "Accept": "*/*",
        "User-Agent": "book2png",
        # MacOS uses different User-Agent. Good thing we're emulating a Windows client.
        "Content-Type": type
    }

    # Ignore SSL:
    # It appears as if lots of book distributors have either invalid or expired certs ...
    # No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
    # Not the best solution, but it works.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE

    req = urllib.request.Request(url=URL, headers=headers, data=document)

    # The context manager closes the connection on every exit path.
    with urllib.request.urlopen(req, context=ctx) as handler:
        ret_code = handler.getcode()

        if ret_code == 204 and returnRC:
            return 204, ""
        if ret_code != 200:
            print("Post request returned something other than 200 - returned %d" % (ret_code))

        content = handler.read()

    # NOTE(review): "req" is the outgoing request, so this looks for a
    # "Location" entry among the request headers set above, not in the
    # server's response. Kept as-is; confirm whether handler.headers was
    # intended before changing it.
    loc = req.headers.get("Location")
    if loc is not None:
        return sendPOSTHTTPRequest(loc, document, type, returnRC)

    if returnRC:
        return ret_code, content

    return content
def sendHTTPRequest(URL: str):
    '''Fetch URL via sendHTTPRequest_getSimple() and return what it returns.'''
    response_body = sendHTTPRequest_getSimple(URL)
    return response_body
def sendRequestDocu(document: str, URL: str):
    '''
    POST document (UTF-8 encoded) to URL as "application/vnd.adobe.adept+xml".

    Returns only the response body. Note the argument order: document first,
    then URL.
    '''
    payload = document.encode("utf-8")
    return sendPOSTHTTPRequest(URL, payload, "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document: str, URL: str):
    '''
    POST document (UTF-8 encoded) to URL as "application/vnd.adobe.adept+xml".

    Returns a (status code, response body) tuple. Note the argument order:
    document first, then URL.
    '''
    payload = document.encode("utf-8")
    return sendPOSTHTTPRequest(URL, payload, "application/vnd.adobe.adept+xml", True)
2021-09-25 20:24:03 +06:00
######### Encryption and signing ###################
def encrypt_with_device_key(data):
    '''
    Encrypt data with the device key using AES-CBC.

    data is padded to a multiple of 16 bytes; every padding byte holds the
    padding length, and a full 16-byte block is added when data is already
    aligned. Returns the random 16-byte IV followed by the ciphertext.

    The device key is read from FILE_DEVICEKEY on first use and cached in
    devkey_bytes.
    '''
    global devkey_bytes
    if devkey_bytes is None:
        # "with" closes the key file even if reading fails.
        with open(FILE_DEVICEKEY, "rb") as f:
            devkey_bytes = f.read()

    # 1..16 padding bytes; yields 16 when len(data) is already a multiple of 16.
    remain = 16 - (len(data) % 16)

    # Build a new object instead of "+=", which would modify a bytearray
    # passed in by the caller.
    data = data + bytes([remain]) * remain

    iv = Random.get_random_bytes(16)
    cip = AES.new(devkey_bytes, AES.MODE_CBC, iv)
    encrypted = cip.encrypt(data)

    return iv + encrypted
def decrypt_with_device_key(data):
    '''
    Decrypt data produced by encrypt_with_device_key().

    The first 16 bytes of data are used as the AES-CBC IV, the rest as the
    ciphertext. The last plaintext byte is taken as the padding length and
    that many bytes are cut off the end.

    The device key is read from FILE_DEVICEKEY on first use and cached in
    devkey_bytes.
    '''
    global devkey_bytes
    if devkey_bytes is None:
        # "with" closes the key file even if reading fails.
        with open(FILE_DEVICEKEY, "rb") as f:
            devkey_bytes = f.read()

    cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16])
    decrypted = cip.decrypt(data[16:])

    # Remove padding
    # NOTE(review): the padding bytes are not validated; a wrong key or
    # corrupted input gives garbage rather than an error here.
    decrypted = decrypted[:-decrypted[-1]]

    return decrypted
def addNonce():
    '''
    Return the "<adept:nonce>" and "<adept:expiration>" XML elements for a request.

    The nonce is base64 of 12 bytes: the current UTC time in milliseconds,
    shifted by 62167219200000 ms, as an 8-byte little-endian integer,
    followed by four zero bytes. The expiration is the same instant plus
    ten minutes, formatted as "YYYY-MM-DDTHH:MM:SSZ".
    '''
    dt = datetime.utcnow()

    # Whole milliseconds since the Unix epoch. The previous code added
    # microsecond/1000 on top of a value that already contained the
    # fractional second, counting the milliseconds twice.
    unix_ms = (dt - datetime(1970, 1, 1)) // timedelta(milliseconds=1)

    # Unixtime to gregorian timestamp
    Ntime = unix_ms + 62167219200000

    final = bytearray(Ntime.to_bytes(8, 'little'))

    # Something is fishy with this tmp value. It usually is 0 in ADE, but not always.
    # I haven't yet figured out what it means ...
    tmp = 0
    final.extend(tmp.to_bytes(4, 'little'))

    ret = ""
    ret += "<adept:nonce>%s</adept:nonce>" % (base64.b64encode(final).decode("utf-8"))

    m10m = dt + timedelta(minutes=10)
    m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ")
    ret += "<adept:expiration>%s</adept:expiration>" % (m10m_str)

    return ret
def get_cert_from_pkcs12(_pkcs12, _key):
    '''
    Return the certificate stored in a PKCS#12 blob, DER-encoded.

    _pkcs12 and _key are passed straight to oscrypto's keys.parse_pkcs12();
    only the second of the three values it returns is used.
    '''
    _unused_first, certificate, _unused_third = keys.parse_pkcs12(_pkcs12, _key)
    return dump_certificate(certificate, encoding="der")
def sign_node(node):
    '''
    Sign an XML node with the private key from the activation file.

    The node is hashed with hash_node(). The bare digest is padded with
    rsa.pkcs1._pad_for_signing() and run through the RSA private key found
    in the pkcs12 credential of FILE_ACTIVATIONXML, which is unlocked with
    the base64-encoded device key.

    Returns the signature as a base64 string, or None when the activation
    file cannot be parsed or holds no pkcs12 credential.
    '''
    sha_hash = hash_node(node).digest()

    global devkey_bytes
    global pkcs12

    if devkey_bytes is None:
        # "with" closes the key file even if reading fails.
        with open(FILE_DEVICEKEY, "rb") as f:
            devkey_bytes = f.read()

    try:
        activationxml = etree.parse(FILE_ACTIVATIONXML)
        adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
        pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
    except Exception:
        # Missing/unparsable file or missing element. "except Exception"
        # rather than a bare except, so Ctrl-C and SystemExit still propagate.
        return None

    my_pkcs12 = base64.b64decode(pkcs12)
    my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes))
    my_priv_key = dump_private_key(my_priv_key, None, "der")

    # Re-export through the Crypto RSA class to get a form that
    # rsa.PrivateKey.load_pkcs1() accepts.
    key = rsa.PrivateKey.load_pkcs1(RSA.importKey(my_priv_key).exportKey())
    keylen = rsa.pkcs1.common.byte_size(key.n)
    padded = rsa.pkcs1._pad_for_signing(sha_hash, keylen)
    payload = rsa.pkcs1.transform.bytes2int(padded)
    encrypted = key.blinded_encrypt(payload)
    block = rsa.pkcs1.transform.int2bytes(encrypted, keylen)

    return base64.b64encode(block).decode()
def hash_node(node):
    '''
    Hash an XML node and everything below it.

    Returns the SHA hash object (not the digest) after hash_node_ctx() has
    fed the node into it; call .digest() on the result to get the bytes.
    '''
    digest_ctx = SHA.new()
    hash_node_ctx(node, digest_ctx)
    return digest_ctx
# Marker bytes that hash_node_ctx() feeds into the hash to delimit the parts
# of an XML tree. hash_do_append_tag() ignores any value above 5.
(
    ASN_NONE,       # 0
    ASN_NS_TAG,     # 1, aka "BEGIN_ELEMENT"
    ASN_CHILD,      # 2, aka "END_ATTRIBUTES"
    ASN_END_TAG,    # 3, aka "END_ELEMENT"
    ASN_TEXT,       # 4, aka "TEXT_NODE"
    ASN_ATTRIBUTE,  # 5, aka "ATTRIBUTE"
) = range(6)
2021-09-25 20:24:03 +06:00
# Module-level debug switch. NOTE(review): nothing in the visible part of this
# file reads it - confirm whether it is still needed.
debug = False
def hash_node_ctx(node, hash_ctx):
    '''
    Feed node and, recursively, its children into hash_ctx.

    For each element the following is hashed, in this order: the
    BEGIN_ELEMENT tag, namespace and local name; every attribute (sorted) as
    ATTRIBUTE tag, namespace, local name and value; the END_ATTRIBUTES tag;
    the stripped element text in chunks of at most 0x7fff characters, each
    preceded by a TEXT_NODE tag; all child elements; the END_ELEMENT tag.

    "hmac" and "signature" elements in the Adobe ADEPT namespace are skipped
    entirely. Nothing is returned; hash_ctx is updated in place.
    '''
    qtag = etree.QName(node.tag)

    if qtag.localname in ("hmac", "signature"):
        if qtag.namespace == "http://ns.adobe.com/adept":
            # Adobe HMAC and signature are not hashed
            return
        # %-formatting instead of "+": the namespace is None for an element
        # without one, and str + None raises TypeError.
        print("Warning: Found hmac or signature node in unexpected namespace %s" % qtag.namespace)

    hash_do_append_tag(hash_ctx, ASN_NS_TAG)
    hash_do_append_string(hash_ctx, "" if qtag.namespace is None else qtag.namespace)
    hash_do_append_string(hash_ctx, qtag.localname)

    # Attributes need to be sorted
    attrKeys = node.keys()
    attrKeys.sort()
    # TODO Implement UTF-8 bytewise sorting:
    # "Attributes are sorted first by their namespaces and
    # then by their names; sorting is done bytewise on UTF-8
    # representations."

    for attribute in attrKeys:
        # Hash all the attributes
        hash_do_append_tag(hash_ctx, ASN_ATTRIBUTE)

        # Hash the attribute namespace (usually ""), then its (local) name and value.
        q_attribute = etree.QName(attribute)
        hash_do_append_string(hash_ctx, "" if q_attribute.namespace is None else q_attribute.namespace)
        hash_do_append_string(hash_ctx, q_attribute.localname)
        hash_do_append_string(hash_ctx, node.get(attribute))

    hash_do_append_tag(hash_ctx, ASN_CHILD)

    if node.text is not None:
        # If there's raw text, hash that, in chunks of at most 0x7fff
        # characters. We'll hardly ever see text nodes larger than 32k in this
        # application, but the spec wants them split.
        # NOTE(review): the limit is applied to characters, while
        # hash_do_append_string() prefixes the UTF-8 byte length - large
        # non-ASCII text may deserve a closer look.
        max_chunk = 0x7fff
        text = node.text.strip()
        textlen = len(text)

        for done in range(0, textlen, max_chunk):
            if textlen - done > max_chunk:
                print("Warning: Hashing text node larger than 32k.")
                print("This usually doesn't happen, and I'm not sure if this is implemented correctly.")
                print("If you run into issues, please open a bug report.")

            hash_do_append_tag(hash_ctx, ASN_TEXT)
            hash_do_append_string(hash_ctx, text[done:done + max_chunk])

    for child in node:
        # If there's child nodes, hash these as well.
        hash_node_ctx(child, hash_ctx)

    hash_do_append_tag(hash_ctx, ASN_END_TAG)
2021-10-26 01:00:33 +06:00
2021-09-25 20:24:03 +06:00
def hash_do_append_string(hash_ctx, string: str):
    '''
    Hash string as a 2-byte big-endian length followed by its UTF-8 bytes.

    The length counts encoded bytes, not characters.
    '''
    encoded = string.encode("utf-8")
    high_byte, low_byte = divmod(len(encoded), 256)
    hash_do_append_raw_bytes(hash_ctx, [high_byte, low_byte])
    hash_do_append_raw_bytes(hash_ctx, encoded)


def hash_do_append_tag(hash_ctx, tag: int):
    '''Hash a single marker byte; values above 5 are silently ignored.'''
    if tag <= 5:
        hash_do_append_raw_bytes(hash_ctx, [tag])


def hash_do_append_raw_bytes(hash_ctx, data: bytes):
    '''Feed data (bytes or a list of byte values) into hash_ctx.'''
    raw = bytearray(data)
    hash_ctx.update(raw)