You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
acsm-calibre-plugin/calibre-plugin/libadobe.py

660 lines
18 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Helper library with code needed for Adobe stuff.
'''
from uuid import getnode
import sys, os, hashlib, base64
import ssl
try:
import urllib.request as ulib
import urllib.error as uliberror
except:
import urllib2 as ulib
import urllib2 as uliberror
from datetime import datetime, timedelta
from lxml import etree
try:
from Cryptodome import Random
from Cryptodome.Cipher import AES
from Cryptodome.Hash import SHA
except ImportError:
# Some distros still ship Crypto
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Hash import SHA
#@@CALIBRE_COMPAT_CODE@@
from customRSA import CustomRSA
from oscrypto import keys
from oscrypto.asymmetric import dump_certificate, dump_private_key
VAR_ACS_SERVER_HTTP = "http://adeactivate.adobe.com/adept"
VAR_ACS_SERVER_HTTPS = "https://adeactivate.adobe.com/adept"
FILE_DEVICEKEY = "devicesalt"
FILE_DEVICEXML = "device.xml"
FILE_ACTIVATIONXML = "activation.xml"
# Lists of different ADE "versions" we know about
VAR_VER_SUPP_CONFIG_NAMES = [ "ADE 1.7.2", "ADE 2.0.1", "ADE 3.0.1", "ADE 4.0.3", "ADE 4.5.10", "ADE 4.5.11" ]
VAR_VER_SUPP_VERSIONS = [ "ADE WIN 9,0,1131,27", "2.0.1.78765", "3.0.1.91394", "4.0.3.123281",
"com.adobe.adobedigitaleditions.exe v4.5.10.186048",
"com.adobe.adobedigitaleditions.exe v4.5.11.187303" ]
VAR_VER_HOBBES_VERSIONS = [ "9.0.1131.27", "9.3.58046", "10.0.85385", "12.0.123217", "12.5.4.186049", "12.5.4.187298" ]
VAR_VER_OS_IDENTIFIERS = [ "Windows Vista", "Windows Vista", "Windows 8", "Windows 8", "Windows 8", "Windows 8" ]
# "Missing" versions:
# 1.7.1, 2.0, 3.0, 4.0, 4.0.1, 4.0.2, 4.5 to 4.5.9
# 4.5.7.179634
# This is a list of ALL versions we know (and can potentially use if present in a config file).
# Must have the same length / size as the four lists above.
VAR_VER_BUILD_IDS = [ 1131, 78765, 91394, 123281, 186048, 187303 ]
# Build ID 185749 also exists, that's a different (older) variant of 4.5.10.
# This is a list of versions that can be used for new authorizations:
VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE = [ 78765, 91394, 123281, 187303 ]
# This is a list of versions to be displayed in the version changer.
VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO = [ 1131, 78765, 91394, 123281, 187303 ]
# Versions >= this one are using HTTPS
# According to changelogs, this is implemented as of ADE 4.0.1 - no idea what build ID that is.
VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT = 123281
# Versions >= this are using a different order for the XML elements in a FulfillmentNotification.
# This doesn't matter for fulfillment at all, but I want to emulate ADE as accurately as possible.
# Implemented as of ADE 4.0.0, no idea what exact build number that is.
VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER = 123281
# Default build ID to use - ADE 2.0.1
VAR_VER_DEFAULT_BUILD_ID = 78765
def are_ade_version_lists_valid():
# These five lists MUST all have the same amount of elements.
# Otherwise that will cause all kinds of issues.
fail = False
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_SUPP_VERSIONS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_HOBBES_VERSIONS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_OS_IDENTIFIERS):
fail = True
if len(VAR_VER_SUPP_CONFIG_NAMES) != len(VAR_VER_BUILD_IDS):
fail = True
if fail:
print("Internal error in ACSM Input: Mismatched version list lenghts.")
print("This should never happen, please open a bug report.")
return False
return True
devkey_bytes = None
def get_devkey_path():
global FILE_DEVICEKEY
return FILE_DEVICEKEY
def get_device_path():
global FILE_DEVICEXML
return FILE_DEVICEXML
def get_activation_xml_path():
global FILE_ACTIVATIONXML
return FILE_ACTIVATIONXML
def update_account_path(folder_path):
# type: (str) -> None
global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML
FILE_DEVICEKEY = os.path.join(folder_path, "devicesalt")
FILE_DEVICEXML = os.path.join(folder_path, "device.xml")
FILE_ACTIVATIONXML = os.path.join(folder_path, "activation.xml")
def createDeviceKeyFile():
# Original implementation: Device::createDeviceKeyFile()
DEVICE_KEY_SIZE = 16
global devkey_bytes
devkey_bytes = Random.get_random_bytes(DEVICE_KEY_SIZE)
f = open(FILE_DEVICEKEY, "wb")
f.write(devkey_bytes)
f.close()
def int_to_bytes(value, length, big_endian = True):
# Helper function for Python2 only (big endian)
# Python3 uses int.to_bytes()
result = []
for i in range(0, length):
result.append(value >> (i * 8) & 0xff)
if big_endian:
result.reverse()
return result
def get_mac_address():
mac1 = getnode()
mac2 = getnode()
if (mac1 != mac2) or ((mac1 >> 40) % 2):
if sys.version_info[0] >= 3:
return bytes([1, 2, 3, 4, 5, 0])
else:
return bytearray([1, 2, 3, 4, 5, 0])
if sys.version_info[0] >= 3:
return mac1.to_bytes(6, byteorder='big')
return int_to_bytes(mac1, 6)
def makeSerial(random):
# type: (bool) -> str
# Original implementation: std::string Device::makeSerial(bool random)
# It doesn't look like this implementation results in the same fingerprint Adobe is using in ADE.
# Given that Adobe only ever sees the SHA1 hash of this value, that probably doesn't matter.
sha_out = None
if not random:
try:
# Linux
uid = os.getuid()
import pwd
username = pwd.getpwuid(uid).pw_name.encode("utf-8").decode("latin-1")
except:
# Windows
uid = 1000
try:
username = os.getlogin().encode("utf-8").decode("latin-1")
except:
import getpass
username = getpass.getuser().encode("utf-8").decode("latin-1")
mac_address = get_mac_address()
dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
mac_address[0], mac_address[1], mac_address[2],
mac_address[3], mac_address[4], mac_address[5])
sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower()
else:
import binascii
sha_out = binascii.hexlify(Random.get_random_bytes(20)).lower()
return sha_out
def makeFingerprint(serial):
# type: (str) -> str
# Original implementation: std::string Device::makeFingerprint(const std::string& serial)
# base64(sha1(serial + privateKey))
# Fingerprint must be 20 bytes or less.
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
str_to_hash = serial + devkey_bytes.decode('latin-1')
hashed_str = hashlib.sha1(str_to_hash.encode('latin-1')).digest()
b64str = base64.b64encode(hashed_str)
return b64str
############################################## HTTP stuff:
def sendHTTPRequest_DL2FILE(URL, outputfile):
# type: (str, str) -> int
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
}
req = ulib.Request(url=URL, headers=headers)
handler = ulib.urlopen(req)
chunksize = 16 * 1024
ret_code = handler.getcode()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendHTTPRequest_DL2FILE(loc)
if ret_code != 200:
return ret_code
with open(outputfile, "wb") as f:
while True:
chunk = handler.read(chunksize)
if not chunk:
break
f.write(chunk)
return 200
def sendHTTPRequest_getSimple(URL):
# type: (str) -> str
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
}
# Ignore SSL:
# It appears as if lots of book distributors have either invalid or expired certs ...
# No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
# Not the best solution, but it works.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = ulib.Request(url=URL, headers=headers)
handler = ulib.urlopen(req, context=ctx)
content = handler.read()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendHTTPRequest_getSimple(loc)
return content
def sendPOSTHTTPRequest(URL, document, type, returnRC = False):
# type: (str, bytes, str, bool) -> str
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
"Content-Type": type
}
# Ignore SSL:
# It appears as if lots of book distributors have either invalid or expired certs ...
# No idea how Adobe handles that (pinning?), but we can just ignore SSL errors and continue anyways.
# Not the best solution, but it works.
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
# Make sure URL has a protocol
# Some vendors (see issue #22) apparently don't include "http://" in some of their URLs.
# Python returns an error when it encounters such a URL, so just add that prefix if it's not present.
if not "://" in URL:
print("Provider is using malformed URL %s, fixing." % (URL))
URL = "http://" + URL
req = ulib.Request(url=URL, headers=headers, data=document)
try:
handler = ulib.urlopen(req, context=ctx)
except uliberror.HTTPError as err:
# This happens with HTTP 500 and related errors.
print("Post request caused HTTPError %d" % (err.code))
if returnRC:
return err.code, "Post request caused HTTPException"
else:
return None
except uliberror.URLError as err:
# This happens if the hostname cannot be resolved.
print("Post request failed with URLError")
if returnRC:
return 900, "Post request failed with URLError"
else:
return None
ret_code = handler.getcode()
if (ret_code == 204 and returnRC):
return 204, ""
if (ret_code != 200):
print("Post request returned something other than 200 - returned %d" % (ret_code))
content = handler.read()
loc = None
try:
loc = req.headers.get("Location")
except:
pass
if loc is not None:
return sendPOSTHTTPRequest(loc, document, type, returnRC)
if returnRC:
return ret_code, content
return content
def sendHTTPRequest(URL):
# type: (str) -> str
return sendHTTPRequest_getSimple(URL)
def sendRequestDocu(document, URL):
# type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document, URL):
# type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True)
######### Encryption and signing ###################
def encrypt_with_device_key(data):
data = bytearray(data)
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
remain = 16
if (len(data) % 16):
remain = 16 - (len(data) % 16)
for _ in range(remain):
data.append(remain)
data = bytes(data)
iv = Random.get_random_bytes(16)
cip = AES.new(devkey_bytes, AES.MODE_CBC, iv)
encrypted = cip.encrypt(data)
res = iv + encrypted
return res
def decrypt_with_device_key(data):
if isinstance(data, str):
# Python2
data = bytes(data)
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16])
decrypted = bytearray(cip.decrypt(data[16:]))
# Remove padding
decrypted = decrypted[:-decrypted[-1]]
return decrypted
def addNonce():
# TODO: Update nonce calculation
# Currently, the plugin always uses the current time, and the counter (tmp) is always 0.
# What Adobe does instead is save the current time on program start, then increase tmp
# every time a Nonce is needed.
dt = datetime.utcnow()
sec = (dt - datetime(1970,1,1)).total_seconds()
Ntime = int(sec * 1000)
# Ntime is now milliseconds since 1970
# Unixtime to gregorian timestamp
Ntime += 62167219200000
# Something is fishy with this tmp value. It usually is 0 in ADE, but not always.
# I haven't yet figured out what it means ...
tmp = 0
if sys.version_info[0] >= 3:
final = bytearray(Ntime.to_bytes(8, 'little'))
final.extend(tmp.to_bytes(4, 'little'))
else:
final = bytearray(int_to_bytes(Ntime, 8, False))
final.extend(int_to_bytes(tmp, 4, True))
ret = ""
ret += "<adept:nonce>%s</adept:nonce>" % (base64.b64encode(final).decode("utf-8"))
m10m = dt + timedelta(minutes=10)
m10m_str = m10m.strftime("%Y-%m-%dT%H:%M:%SZ")
ret += "<adept:expiration>%s</adept:expiration>" % (m10m_str)
return ret
def get_cert_from_pkcs12(_pkcs12, _key):
_, cert, _ = keys.parse_pkcs12(_pkcs12, _key)
return dump_certificate(cert, encoding="der")
def sign_node(node):
sha_hash = hash_node(node)
sha_hash = sha_hash.digest()
# print("Hash is " + sha_hash.hex())
global devkey_bytes
global pkcs12
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
devkey_bytes = f.read()
f.close()
# Get private key
try:
activationxml = etree.parse(FILE_ACTIVATIONXML)
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
pkcs12 = activationxml.find("./%s/%s" % (adNS("credentials"), adNS("pkcs12"))).text
except:
return None
my_pkcs12 = base64.b64decode(pkcs12)
my_priv_key, _, _ = keys.parse_pkcs12(my_pkcs12, base64.b64encode(devkey_bytes))
my_priv_key = dump_private_key(my_priv_key, None, "der")
# textbook RSA with that private key
block = CustomRSA.encrypt_for_adobe_signature(my_priv_key, sha_hash)
signature = base64.b64encode(block).decode()
# Debug
# print("sig is %s\n" % block.hex())
return signature
def hash_node(node):
hash_ctx = SHA.new()
hash_node_ctx(node, hash_ctx)
return hash_ctx
ASN_NONE = 0
ASN_NS_TAG = 1 # aka "BEGIN_ELEMENT"
ASN_CHILD = 2 # aka "END_ATTRIBUTES"
ASN_END_TAG = 3 # aka "END_ELEMENT"
ASN_TEXT = 4 # aka "TEXT_NODE"
ASN_ATTRIBUTE = 5 # aka "ATTRIBUTE"
debug = False
def hash_node_ctx(node, hash_ctx):
qtag = etree.QName(node.tag)
if (qtag.localname == "hmac" or qtag.localname == "signature"):
if (qtag.namespace == "http://ns.adobe.com/adept"):
# Adobe HMAC and signature are not hashed
return
else:
print("Warning: Found hmac or signature node in unexpected namespace " + qtag.namespace)
hash_do_append_tag(hash_ctx, ASN_NS_TAG)
if qtag.namespace is None:
hash_do_append_string(hash_ctx, "")
else:
hash_do_append_string(hash_ctx, qtag.namespace)
hash_do_append_string(hash_ctx, qtag.localname)
attrKeys = node.keys()
# Attributes need to be sorted
attrKeys.sort()
# TODO Implement UTF-8 bytewise sorting:
# "Attributes are sorted first by their namespaces and
# then by their names; sorting is done bytewise on UTF-8
# representations."
for attribute in attrKeys:
# Hash all the attributes
hash_do_append_tag(hash_ctx, ASN_ATTRIBUTE)
# Check for element namespace and hash that, if present:
q_attribute = etree.QName(attribute)
# Hash element namespace (usually "")
# If namespace is none, use "". Else, use namespace.
hash_do_append_string(hash_ctx, "" if q_attribute.namespace is None else q_attribute.namespace)
# Hash (local) name and value
hash_do_append_string(hash_ctx, q_attribute.localname)
hash_do_append_string(hash_ctx, node.get(attribute))
hash_do_append_tag(hash_ctx, ASN_CHILD)
if (node.text is not None):
# If there's raw text, hash that.
# This code block used to just be the following:
# hash_do_append_tag(hash_ctx, ASN_TEXT)
# hash_do_append_string(hash_ctx, node.text.strip())
# though that only works with text nodes < 0x7fff.
# While I doubt we'll ever encounter text nodes larger than 32k in
# this application, I want to implement the spec correctly.
# So there's a loop going over the text, hashing 32k chunks.
text = node.text.strip()
textlen = len(text)
if textlen > 0:
done = 0
remaining = 0
while True:
remaining = textlen - done
if remaining > 0x7fff:
print("Warning: Hashing text node larger than 32k.")
print("This usually doesn't happen, and I'm not sure if this is implemented correctly.")
print("If you run into issues, please open a bug report.")
remaining = 0x7fff
hash_do_append_tag(hash_ctx, ASN_TEXT)
hash_do_append_string(hash_ctx, text[done:done+remaining])
done += remaining
if done >= textlen:
break
for child in node:
# If there's child nodes, hash these as well.
hash_node_ctx(child, hash_ctx)
hash_do_append_tag(hash_ctx, ASN_END_TAG)
def hash_do_append_string(hash_ctx, string):
# type: (SHA.SHA1Hash, str) -> None
if sys.version_info[0] >= 3:
str_bytes = bytes(string, encoding="utf-8")
else:
str_bytes = bytes(string)
length = len(str_bytes)
len_upper = int(length / 256)
len_lower = int(length & 0xFF)
hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower])
hash_do_append_raw_bytes(hash_ctx, str_bytes)
def hash_do_append_tag(hash_ctx, tag):
# type: (SHA.SHA1Hash, int) -> None
if (tag > 5):
return
hash_do_append_raw_bytes(hash_ctx, [tag])
def hash_do_append_raw_bytes(hash_ctx, data):
# type: (SHA.SHA1Hash, bytes) -> None
hash_ctx.update(bytearray(data))