Begin work on Python2 support

This commit is contained in:
Florian Bach 2022-01-16 17:43:29 +01:00
parent cec56cb9a2
commit 6d72506fad
13 changed files with 410 additions and 256 deletions

View File

@ -34,8 +34,16 @@ jobs:
run: |
# Require cryptography >= 3.1 because in 3.0 and below, the backend param in load_key_and_certificates was still required.
pip3 install freezegun lxml pycryptodome rsa oscrypto "cryptography>=3.1"
# Install Python2 stuff
curl https://bootstrap.pypa.io/pip/2.7/get-pip.py --output get-pip.py
python2 get-pip.py
pip2 install freezegun mock lxml pycryptodome "rsa<=4.3" oscrypto cryptography==3.1
- name: Run tests
- name: Run tests (Python 3)
run: |
cd tests && ./main.py
cd tests && python3 ./main.py && cd ..
- name: Run tests (Python 2)
run: |
cd tests && PYTHONWARNINGS=ignore python2 ./main.py && cd ..

View File

@ -3,7 +3,11 @@
[ ! -f calibre-plugin/asn1crypto.zip ] && ./package_modules.sh
[ ! -f calibre-plugin/oscrypto.zip ] && ./package_modules.sh
pushd calibre-plugin
rm -rf calibre-plugin-tmp || /bin/true
cp -r calibre-plugin calibre-plugin-tmp
pushd calibre-plugin-tmp
pushd keyextract
# Compile C programs:
@ -21,8 +25,24 @@ echo -n "2021-12-19-03" > module_id.txt
cp ../LICENSE LICENSE
cp ../README.md README.md
shopt -s globstar
echo "Injecting Python2 compat code ..."
for file in **/*.py;
do
#echo $file
# Inject Python2 compat code:
sed '/#@@CALIBRE_COMPAT_CODE@@/ {
r __calibre_compat_code.py
d
}' -i $file
done
# Create ZIP file from calibre-plugin folder.
zip -r ../calibre-plugin.zip *
popd
rm -rf calibre-plugin-tmp

View File

@ -0,0 +1,21 @@
#@@CALIBRE_COMPAT_CODE_START@@
import sys, os
# Compatibility code taken from noDRM's DeDRM fork.
# This fixes the weird import issues with Calibre 2,
# and it allows me to get rid of a ton of try-except blocks.
if "calibre" in sys.modules:
# Explicitly allow importing everything ...
if os.path.dirname(os.path.abspath(__file__)) not in sys.path:
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Bugfix for Calibre < 5:
if sys.version_info[0] == 2:
from calibre.utils.config import config_dir
if os.path.join(config_dir, "plugins", "DeACSM.zip") not in sys.path:
sys.path.insert(0, os.path.join(config_dir, "plugins", "DeACSM.zip"))
#@@CALIBRE_COMPAT_CODE_END@@

View File

@ -54,13 +54,15 @@ import os, shutil, traceback, sys, time, io, random
import zipfile
from lxml import etree
#@@CALIBRE_COMPAT_CODE@@
class DeACSM(FileTypePlugin):
name = PLUGIN_NAME
description = "ACSM Input Plugin - Takes an Adobe ACSM file and converts that into a useable EPUB or PDF file. Python reimplementation of libgourou by Grégory Soutadé"
supported_platforms = ['linux', 'osx', 'windows']
author = "Leseratte10"
version = PLUGIN_VERSION_TUPLE
minimum_calibre_version = (5, 0, 0)
minimum_calibre_version = (4, 0, 0)
file_types = set(['acsm'])
on_import = True
on_preprocess = True
@ -184,21 +186,9 @@ class DeACSM(FileTypePlugin):
# Okay, now all the modules are available, import the Adobe modules.
# Account:
try:
from calibre_plugins.deacsm.libadobe import createDeviceKeyFile, update_account_path
from calibre_plugins.deacsm.libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
except:
from libadobe import createDeviceKeyFile, update_account_path
from libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
# Fulfill:
try:
from calibre_plugins.deacsm.libadobe import sendHTTPRequest
from calibre_plugins.deacsm.libadobeFulfill import buildRights, fulfill
except:
from libadobe import sendHTTPRequest
from libadobeFulfill import buildRights, fulfill
from libadobe import createDeviceKeyFile, update_account_path, sendHTTPRequest
from libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
from libadobeFulfill import buildRights, fulfill
import calibre_plugins.deacsm.prefs as prefs # type: ignore
@ -245,20 +235,13 @@ class DeACSM(FileTypePlugin):
except:
return False
def download(self, replyData: str):
def download(self, replyData):
# type: (str) -> str
try:
from calibre_plugins.deacsm.libadobe import sendHTTPRequest_DL2FILE
from calibre_plugins.deacsm.libadobeFulfill import buildRights, fulfill
except:
from libadobe import sendHTTPRequest_DL2FILE
from libadobeFulfill import buildRights, fulfill
try:
from calibre_plugins.deacsm.libpdf import patch_drm_into_pdf
except:
from libpdf import patch_drm_into_pdf
from libadobe import sendHTTPRequest_DL2FILE
from libadobeFulfill import buildRights, fulfill
from libpdf import patch_drm_into_pdf
adobe_fulfill_response = etree.fromstring(replyData)
@ -344,7 +327,9 @@ class DeACSM(FileTypePlugin):
print("{0} v{1}: Error: Unsupported file type ...".format(PLUGIN_NAME, PLUGIN_VERSION))
return None
def run(self, path_to_ebook: str):
def run(self, path_to_ebook):
# type: (str) -> str
# This code gets called by Calibre with a path to the new book file.
# We need to check if it's an ACSM file
@ -363,12 +348,9 @@ class DeACSM(FileTypePlugin):
print("{0} v{1}: ADE auth is missing or broken ".format(PLUGIN_NAME, PLUGIN_VERSION))
return path_to_ebook
try:
from calibre_plugins.deacsm.libadobe import are_ade_version_lists_valid
from calibre_plugins.deacsm.libadobeFulfill import fulfill
except:
from libadobe import are_ade_version_lists_valid
from libadobeFulfill import fulfill
from libadobe import are_ade_version_lists_valid
from libadobeFulfill import fulfill
if not are_ade_version_lists_valid():
print("{0} v{1}: ADE version list mismatch, please open a bug report.".format(PLUGIN_NAME, PLUGIN_VERSION))

View File

@ -32,6 +32,13 @@ from calibre.utils.config import config_dir # type: ignore
from calibre.constants import isosx, iswindows, islinux # type: ignore
#@@CALIBRE_COMPAT_CODE@@
if sys.version_info[0] == 2:
class FileNotFoundError(Exception):
pass
class ConfigWidget(QWidget):
def __init__(self, plugin_path):
QWidget.__init__(self)
@ -171,15 +178,11 @@ class ConfigWidget(QWidget):
try:
from calibre_plugins.deacsm.libadobe import createDeviceKeyFile, update_account_path, are_ade_version_lists_valid
from calibre_plugins.deacsm.libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
from libadobe import createDeviceKeyFile, update_account_path, are_ade_version_lists_valid
from libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
except:
try:
from libadobe import createDeviceKeyFile, update_account_path, are_ade_version_lists_valid
from libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -242,15 +245,11 @@ class ConfigWidget(QWidget):
# So just open up a folder picker dialog and have the user select the eReader's root folder.
try:
from calibre_plugins.deacsm.libadobe import update_account_path, VAR_VER_HOBBES_VERSIONS
from calibre_plugins.deacsm.libadobeAccount import activateDevice, exportProxyAuth
from libadobe import update_account_path, VAR_VER_HOBBES_VERSIONS
from libadobeAccount import activateDevice, exportProxyAuth
except:
try:
from libadobe import update_account_path, VAR_VER_HOBBES_VERSIONS
from libadobeAccount import activateDevice, exportProxyAuth
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -402,7 +401,7 @@ class ConfigWidget(QWidget):
try:
containerdev = etree.parse(device_xml_path)
except (FileNotFoundError, OSError) as e:
except (IOError, FileNotFoundError, OSError) as e:
return error_dialog(None, "Failed", "Error while reading device.xml", show=True, show_copy_button=False)
try:
@ -509,14 +508,12 @@ class ConfigWidget(QWidget):
def get_account_info(self):
try:
from calibre_plugins.deacsm.libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
from libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
except:
try:
from libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
activation_xml_path = os.path.join(self.deacsmprefs["path_to_account_data"], "activation.xml")
device_xml_path = os.path.join(self.deacsmprefs["path_to_account_data"], "device.xml")
@ -525,7 +522,7 @@ class ConfigWidget(QWidget):
try:
container = etree.parse(activation_xml_path)
containerdev = etree.parse(device_xml_path)
except (FileNotFoundError, OSError) as e:
except (IOError, FileNotFoundError, OSError) as e:
return "Not authorized for any ADE ID", False, None
try:
@ -579,16 +576,12 @@ class ConfigWidget(QWidget):
def export_activation(self):
try:
from calibre_plugins.deacsm.libadobe import update_account_path
from calibre_plugins.deacsm.libadobeAccount import getAccountUUID
from libadobe import update_account_path
from libadobeAccount import getAccountUUID
except:
try:
from libadobe import update_account_path
from libadobeAccount import getAccountUUID
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
return False
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
return False
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -678,7 +671,7 @@ class ConfigWidget(QWidget):
return error_dialog(None, "Import failed", "The WINEPREFIX you entered doesn't seem to contain an authorized ADE.", show=True, show_copy_button=False)
from calibre_plugins.deacsm.libadobeImportAccount import importADEactivationLinuxWine
from libadobeImportAccount import importADEactivationLinuxWine
ret, msg = importADEactivationLinuxWine(text)
@ -715,7 +708,7 @@ class ConfigWidget(QWidget):
def import_activation_from_Win(self):
# This will try to import the activation from Adobe Digital Editions on Windows ...
from calibre_plugins.deacsm.libadobeImportAccount import importADEactivationWindows
from libadobeImportAccount import importADEactivationWindows
ret, msg = importADEactivationWindows()
@ -757,7 +750,7 @@ class ConfigWidget(QWidget):
info_dialog(None, "Importing from ADE", msg, show=True, show_copy_button=False)
from calibre_plugins.deacsm.libadobeImportAccount import importADEactivationMac
from libadobeImportAccount import importADEactivationMac
ret, msg = importADEactivationMac()
@ -862,24 +855,19 @@ class ConfigWidget(QWidget):
def switch_ade_version(self):
try:
from calibre_plugins.deacsm.libadobe import VAR_VER_HOBBES_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES
from calibre_plugins.deacsm.libadobe import VAR_VER_BUILD_IDS, VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO
from calibre_plugins.deacsm.libadobeAccount import changeDeviceVersion
from libadobe import VAR_VER_HOBBES_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES
from libadobe import VAR_VER_BUILD_IDS, VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO
from libadobeAccount import changeDeviceVersion
except:
try:
from libadobe import VAR_VER_HOBBES_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES
from libadobe import VAR_VER_BUILD_IDS, VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO
from libadobeAccount import changeDeviceVersion
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
device_xml_path = os.path.join(self.deacsmprefs["path_to_account_data"], "device.xml")
try:
containerdev = etree.parse(device_xml_path)
except (FileNotFoundError, OSError) as e:
except (IOError, FileNotFoundError, OSError) as e:
return error_dialog(None, "Failed", "Error while reading file", show=True, show_copy_button=False)
try:
@ -961,18 +949,14 @@ class ConfigWidget(QWidget):
def create_anon_auth(self):
try:
from calibre_plugins.deacsm.libadobe import createDeviceKeyFile, update_account_path, VAR_VER_SUPP_CONFIG_NAMES
from calibre_plugins.deacsm.libadobe import VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE, VAR_VER_BUILD_IDS, VAR_VER_DEFAULT_BUILD_ID
from calibre_plugins.deacsm.libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
from libadobe import createDeviceKeyFile, update_account_path, VAR_VER_SUPP_CONFIG_NAMES
from libadobe import VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE, VAR_VER_BUILD_IDS, VAR_VER_DEFAULT_BUILD_ID
from libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
except:
try:
from libadobe import createDeviceKeyFile, update_account_path, VAR_VER_SUPP_CONFIG_NAMES
from libadobe import VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE, VAR_VER_BUILD_IDS, VAR_VER_DEFAULT_BUILD_ID
from libadobeAccount import createDeviceFile, createUser, signIn, activateDevice
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -1059,15 +1043,11 @@ class ConfigWidget(QWidget):
def convert_anon_to_account(self):
try:
from calibre_plugins.deacsm.libadobe import createDeviceKeyFile, update_account_path
from calibre_plugins.deacsm.libadobeAccount import convertAnonAuthToAccount
from libadobe import createDeviceKeyFile, update_account_path
from libadobeAccount import convertAnonAuthToAccount
except:
try:
from libadobe import createDeviceKeyFile, update_account_path
from libadobeAccount import convertAnonAuthToAccount
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -1128,17 +1108,12 @@ class ConfigWidget(QWidget):
def link_account(self):
try:
from calibre_plugins.deacsm.libadobe import createDeviceKeyFile, update_account_path, VAR_VER_SUPP_CONFIG_NAMES
from calibre_plugins.deacsm.libadobe import VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE, VAR_VER_BUILD_IDS, VAR_VER_DEFAULT_BUILD_ID
from calibre_plugins.deacsm.libadobeAccount import createDeviceFile, getAuthMethodsAndCert, createUser, signIn, activateDevice
from libadobe import createDeviceKeyFile, update_account_path, VAR_VER_SUPP_CONFIG_NAMES
from libadobe import VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE, VAR_VER_BUILD_IDS, VAR_VER_DEFAULT_BUILD_ID
from libadobeAccount import createDeviceFile, getAuthMethodsAndCert, createUser, signIn, activateDevice
except:
try:
from libadobe import createDeviceKeyFile, update_account_path, VAR_VER_SUPP_CONFIG_NAMES
from libadobe import VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE, VAR_VER_BUILD_IDS, VAR_VER_DEFAULT_BUILD_ID
from libadobeAccount import createDeviceFile, getAuthMethodsAndCert, createUser, signIn, activateDevice
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -1247,15 +1222,11 @@ class ConfigWidget(QWidget):
def export_key(self):
try:
from calibre_plugins.deacsm.libadobe import update_account_path
from calibre_plugins.deacsm.libadobeAccount import exportAccountEncryptionKeyDER, getAccountUUID
from libadobe import update_account_path
from libadobeAccount import exportAccountEncryptionKeyDER, getAccountUUID
except:
try:
from libadobe import update_account_path
from libadobeAccount import exportAccountEncryptionKeyDER, getAccountUUID
except:
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing Account stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
update_account_path(self.deacsmprefs["path_to_account_data"])
@ -1421,13 +1392,10 @@ class RentedBooksDialog(QDialog):
try:
from calibre_plugins.deacsm.libadobeFulfill import tryReturnBook
from libadobeFulfill import tryReturnBook
except:
try:
from libadobeFulfill import tryReturnBook
except:
print("{0} v{1}: Error while importing book return stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
print("{0} v{1}: Error while importing book return stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
Ret_book = None
for book in self.parent.deacsmprefs["list_of_rented_books"]:

View File

@ -8,6 +8,8 @@ The original code used blinding and this one doesn't,
but we don't really care about side-channel attacks ...
'''
import sys
try:
from Cryptodome.PublicKey import RSA
except ImportError:
@ -16,6 +18,7 @@ except ImportError:
class CustomRSA:
@staticmethod
def encrypt_for_adobe_signature(signing_key, message):
key = RSA.importKey(signing_key)
keylen = CustomRSA.byte_size(key.n)
@ -23,12 +26,17 @@ class CustomRSA:
payload = CustomRSA.transform_bytes2int(padded)
encrypted = CustomRSA.normal_encrypt(key, payload)
block = CustomRSA.transform_int2bytes(encrypted, keylen)
return block
return bytearray(block)
def byte_size(number: int):
@staticmethod
def byte_size(number):
# type: (int) -> int
return (number.bit_length() + 7) // 8
def pad_message(message: bytes, target_len: int) -> bytes:
@staticmethod
def pad_message(message, target_len):
# type: (bytes, int) -> bytes
# Padding always uses 0xFF
# Returns: 00 01 PADDING 00 MESSAGE
@ -40,9 +48,13 @@ class CustomRSA:
padding_len = target_len - message_length - 3
return b"".join([b"\x00\x01", padding_len * b"\xff", b"\x00", message])
ret = bytearray(b"".join([b"\x00\x01", padding_len * b"\xff", b"\x00"]))
ret.extend(bytes(message))
def normal_encrypt(key, message: int):
return ret
@staticmethod
def normal_encrypt(key, message):
if message < 0 or message > key.n:
raise ValueError("Invalid message")
@ -50,15 +62,57 @@ class CustomRSA:
encrypted = pow(message, key.d, key.n)
return encrypted
def transform_bytes2int(raw_bytes: bytes):
return int.from_bytes(raw_bytes, "big", signed=False)
@staticmethod
def py2_int_to_bytes(value, length, big_endian = True):
result = []
for i in range(0, length):
result.append(value >> (i * 8) & 0xff)
if big_endian:
result.reverse()
return result
@staticmethod
def py2_bytes_to_int(bytes, big_endian = True):
# type: (bytes, bool) -> int
my_bytes = bytes
if not big_endian:
my_bytes.reverse()
result = 0
for b in my_bytes:
result = result * 256 + int(b)
return result
@staticmethod
def transform_bytes2int(raw_bytes):
# type: (bytes) -> int
if sys.version_info[0] >= 3:
return int.from_bytes(raw_bytes, "big", signed=False)
return CustomRSA.py2_bytes_to_int(raw_bytes, True)
@staticmethod
def transform_int2bytes(number, fill_size = 0):
# type: (int, int) -> bytes
def transform_int2bytes(number: int, fill_size: int = 0):
if number < 0:
raise ValueError("Negative number")
if fill_size > 0:
return number.to_bytes(fill_size, "big")
size = None
bytes_needed = max(1, CustomRSA.byte_size(number))
return number.to_bytes(bytes_needed, "big")
if fill_size > 0:
size = fill_size
else:
size = max(1, CustomRSA.byte_size(number))
if sys.version_info[0] >= 3:
return number.to_bytes(size, "big")
return CustomRSA.py2_int_to_bytes(number, size, True)

View File

@ -1,9 +1,9 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#@@CALIBRE_COMPAT_CODE@@
from re import VERBOSE
import sys
def unfuck(user):
# Wine uses a pretty nonstandard encoding in their registry file.
@ -22,10 +22,16 @@ def unfuck(user):
while i < len(user):
# Convert string of len 1 to a byte
char = user[i][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
char = ord(char)
if char == ord('\\'):
# Get next char:
i += 1
char = user[i][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
char = ord(char)
if (char == ord('a')):
user_new.append(0x07)
elif (char == ord('b')):
@ -46,6 +52,8 @@ def unfuck(user):
# Get next char
i += 1
char = user[i][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
char = ord(char)
if char not in hex_char_list:
user_new.append(ord('x'))
# This seems to be fallback code.
@ -57,16 +65,25 @@ def unfuck(user):
# Read up to 3 more chars
next = user[i + 1][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
next = ord(next)
if next in hex_char_list:
ival += chr(next)
i += 1
next = user[i + 1][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
next = ord(next)
if next in hex_char_list:
ival += chr(next)
i += 1
next = user[i + 1][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
next = ord(next)
if next in hex_char_list:
ival += chr(next)
i += 1
@ -83,11 +100,17 @@ def unfuck(user):
# Read up to 2 more chars
next = user[i + 1][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
next = ord(next)
if next >= ord('0') and next <= ord('9'):
octal = (octal * 8) + (next - ord('0'))
i += 1
next = user[i + 1][0].encode("latin-1")[0]
if sys.version_info[0] == 2:
next = ord(next)
if next >= ord('0') and next <= ord('9'):
octal = (octal * 8) + (next - ord('0'))
i += 1
@ -122,11 +145,8 @@ def GetMasterKey(path_to_wine_prefix):
except:
pass
try:
import cpuid
except:
import calibre_plugins.deacsm.cpuid as cpuid
import cpuid
import struct
try:

View File

@ -17,6 +17,8 @@ except ImportError:
else:
raise
#@@CALIBRE_COMPAT_CODE@@
def GetSystemDirectory():
from ctypes import windll, c_wchar_p, c_uint, create_unicode_buffer
MAX_PATH = 255
@ -126,11 +128,8 @@ def GetMasterKey():
# Get CPU vendor:
try:
import cpuid
except:
import calibre_plugins.deacsm.cpuid as cpuid
import cpuid
import struct
cpu = cpuid.CPUID()
_, b, c, d = cpu(0)

View File

@ -6,8 +6,15 @@ Helper library with code needed for Adobe stuff.
'''
from uuid import getnode
import os, hashlib, base64
import urllib.request, ssl
import sys, os, hashlib, base64
import ssl
try:
import urllib.request as ulib
import urllib.error as uliberror
except:
import urllib2 as ulib
import urllib2 as uliberror
from datetime import datetime, timedelta
from lxml import etree
@ -23,10 +30,11 @@ except ImportError:
from Crypto.Cipher import AES
from Crypto.Hash import SHA
try:
from customRSA import CustomRSA
except:
from calibre_plugins.deacsm.customRSA import CustomRSA
#@@CALIBRE_COMPAT_CODE@@
from customRSA import CustomRSA
from oscrypto import keys
from oscrypto.asymmetric import dump_certificate, dump_private_key
@ -115,7 +123,9 @@ def get_activation_xml_path():
return FILE_ACTIVATIONXML
def update_account_path(folder_path: str):
def update_account_path(folder_path):
# type: (str) -> None
global FILE_DEVICEKEY, FILE_DEVICEXML, FILE_ACTIVATIONXML
FILE_DEVICEKEY = os.path.join(folder_path, "devicesalt")
@ -134,17 +144,40 @@ def createDeviceKeyFile():
f.write(devkey_bytes)
f.close()
def int_to_bytes(value, length, big_endian = True):
# Helper function for Python2 only (big endian)
# Python3 uses int.to_bytes()
result = []
for i in range(0, length):
result.append(value >> (i * 8) & 0xff)
if big_endian:
result.reverse()
return result
def get_mac_address():
mac1 = getnode()
mac2 = getnode()
if (mac1 != mac2) or ((mac1 >> 40) % 2):
return bytes([1, 2, 3, 4, 5, 0])
if sys.version_info[0] >= 3:
return bytes([1, 2, 3, 4, 5, 0])
else:
return bytearray([1, 2, 3, 4, 5, 0])
return mac1.to_bytes(6, byteorder='big')
if sys.version_info[0] >= 3:
return mac1.to_bytes(6, byteorder='big')
return int_to_bytes(mac1, 6)
def makeSerial(random: bool):
def makeSerial(random):
# type: (bool) -> str
# Original implementation: std::string Device::makeSerial(bool random)
# It doesn't look like this implementation results in the same fingerprint Adobe is using in ADE.
@ -165,17 +198,26 @@ def makeSerial(random: bool):
mac_address = get_mac_address()
dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
mac_address[0], mac_address[1], mac_address[2],
mac_address[3], mac_address[4], mac_address[5])
if sys.version_info[0] >= 3:
dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
mac_address[0], mac_address[1], mac_address[2],
mac_address[3], mac_address[4], mac_address[5])
else:
dataToHash = "%d:%s:%02x:%02x:%02x:%02x:%02x:%02x\x00" % (uid, username,
mac_address[0], mac_address[1], mac_address[2],
mac_address[3], mac_address[4], mac_address[5])
sha_out = hashlib.sha1(dataToHash.encode('latin-1')).hexdigest().lower()
else:
sha_out = Random.get_random_bytes(20).hex().lower()
import binascii
sha_out = binascii.hexlify(Random.get_random_bytes(20)).lower()
return sha_out
def makeFingerprint(serial: str):
def makeFingerprint(serial):
# type: (str) -> str
# Original implementation: std::string Device::makeFingerprint(const std::string& serial)
# base64(sha1(serial + privateKey))
# Fingerprint must be 20 bytes or less.
@ -195,14 +237,16 @@ def makeFingerprint(serial: str):
############################################## HTTP stuff:
def sendHTTPRequest_DL2FILE(URL: str, outputfile: str):
def sendHTTPRequest_DL2FILE(URL, outputfile):
# type: (str, str) -> int
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
# MacOS uses different User-Agent. Good thing we're emulating a Windows client.
}
req = urllib.request.Request(url=URL, headers=headers)
handler = urllib.request.urlopen(req)
req = ulib.Request(url=URL, headers=headers)
handler = ulib.urlopen(req)
chunksize = 16 * 1024
@ -230,7 +274,8 @@ def sendHTTPRequest_DL2FILE(URL: str, outputfile: str):
return 200
def sendHTTPRequest_getSimple(URL: str):
def sendHTTPRequest_getSimple(URL):
# type: (str) -> str
headers = {
"Accept": "*/*",
@ -246,8 +291,8 @@ def sendHTTPRequest_getSimple(URL: str):
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url=URL, headers=headers)
handler = urllib.request.urlopen(req, context=ctx)
req = ulib.Request(url=URL, headers=headers)
handler = ulib.urlopen(req, context=ctx)
content = handler.read()
@ -262,7 +307,8 @@ def sendHTTPRequest_getSimple(URL: str):
return content
def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False):
def sendPOSTHTTPRequest(URL, document, type, returnRC = False):
# type: (str, bytes, str, bool) -> str
headers = {
"Accept": "*/*",
@ -279,10 +325,10 @@ def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False):
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url=URL, headers=headers, data=document)
req = ulib.Request(url=URL, headers=headers, data=document)
try:
handler = urllib.request.urlopen(req, context=ctx)
except urllib.error.HTTPError as err:
handler = ulib.urlopen(req, context=ctx)
except uliberror.HTTPError as err:
# This happens with HTTP 500 and related errors.
print("Post request caused HTTPError %d" % (err.code))
return err.code, "Post request caused HTTPException"
@ -311,14 +357,17 @@ def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False):
return content
def sendHTTPRequest(URL: str):
def sendHTTPRequest(URL):
# type: (str) -> str
return sendHTTPRequest_getSimple(URL)
def sendRequestDocu(document: str, URL: str):
def sendRequestDocu(document, URL):
# type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document: str, URL: str):
def sendRequestDocuRC(document, URL):
# type: (str, str) -> str
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True)
@ -328,6 +377,8 @@ def sendRequestDocuRC(document: str, URL: str):
def encrypt_with_device_key(data):
data = bytearray(data)
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
@ -338,7 +389,9 @@ def encrypt_with_device_key(data):
if (len(data) % 16):
remain = 16 - (len(data) % 16)
data += bytes([remain])*remain
for _ in range(remain):
data.append(remain)
iv = Random.get_random_bytes(16)
cip = AES.new(devkey_bytes, AES.MODE_CBC, iv)
@ -348,6 +401,11 @@ def encrypt_with_device_key(data):
return res
def decrypt_with_device_key(data):
if isinstance(data, str):
# Python2
data = bytes(data)
global devkey_bytes
if devkey_bytes is None:
f = open(FILE_DEVICEKEY, "rb")
@ -355,7 +413,7 @@ def decrypt_with_device_key(data):
f.close()
cip = AES.new(devkey_bytes, AES.MODE_CBC, data[:16])
decrypted = cip.decrypt(data[16:])
decrypted = bytearray(cip.decrypt(data[16:]))
# Remove padding
decrypted = decrypted[:-decrypted[-1]]
@ -380,12 +438,17 @@ def addNonce():
# Unixtime to gregorian timestamp
Ntime += 62167219200000
final = bytearray(Ntime.to_bytes(8, 'little'))
# Something is fishy with this tmp value. It usually is 0 in ADE, but not always.
# I haven't yet figured out what it means ...
tmp = 0
final.extend(tmp.to_bytes(4, 'little'))
if sys.version_info[0] >= 3:
final = bytearray(Ntime.to_bytes(8, 'little'))
final.extend(tmp.to_bytes(4, 'little'))
else:
final = bytearray(int_to_bytes(Ntime, 8, False))
final.extend(int_to_bytes(tmp, 4, True))
ret = ""
@ -552,9 +615,13 @@ def hash_node_ctx(node, hash_ctx):
def hash_do_append_string(hash_ctx, string: str):
def hash_do_append_string(hash_ctx, string):
# type: (SHA.SHA1Hash, str) -> None
str_bytes = bytes(string, encoding="utf-8")
if sys.version_info[0] >= 3:
str_bytes = bytes(string, encoding="utf-8")
else:
str_bytes = bytes(string)
length = len(str_bytes)
len_upper = int(length / 256)
@ -563,12 +630,14 @@ def hash_do_append_string(hash_ctx, string: str):
hash_do_append_raw_bytes(hash_ctx, [len_upper, len_lower])
hash_do_append_raw_bytes(hash_ctx, str_bytes)
def hash_do_append_tag(hash_ctx, tag: int):
def hash_do_append_tag(hash_ctx, tag):
# type: (SHA.SHA1Hash, int) -> None
if (tag > 5):
return
hash_do_append_raw_bytes(hash_ctx, [tag])
def hash_do_append_raw_bytes(hash_ctx, data: bytes):
def hash_do_append_raw_bytes(hash_ctx, data):
# type: (SHA.SHA1Hash, bytes) -> None
hash_ctx.update(bytearray(data))

View File

@ -12,25 +12,20 @@ except ImportError:
from Crypto.Util.asn1 import DerSequence
from Crypto.Cipher import PKCS1_v1_5
try:
from libadobe import addNonce, sign_node, sendRequestDocu, sendHTTPRequest
from libadobe import makeFingerprint, makeSerial, encrypt_with_device_key, decrypt_with_device_key
from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS
from libadobe import VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO, VAR_VER_SUPP_VERSIONS, VAR_ACS_SERVER_HTTP
from libadobe import VAR_ACS_SERVER_HTTPS, VAR_VER_BUILD_IDS, VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT, VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE
except:
from calibre_plugins.deacsm.libadobe import addNonce, sign_node, sendRequestDocu, sendHTTPRequest
from calibre_plugins.deacsm.libadobe import makeFingerprint, makeSerial, encrypt_with_device_key, decrypt_with_device_key
from calibre_plugins.deacsm.libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from calibre_plugins.deacsm.libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS
from calibre_plugins.deacsm.libadobe import VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO, VAR_VER_SUPP_VERSIONS, VAR_ACS_SERVER_HTTP
from calibre_plugins.deacsm.libadobe import VAR_ACS_SERVER_HTTPS, VAR_VER_BUILD_IDS, VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT, VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE
#@@CALIBRE_COMPAT_CODE@@
from libadobe import addNonce, sign_node, sendRequestDocu, sendHTTPRequest
from libadobe import makeFingerprint, makeSerial, encrypt_with_device_key, decrypt_with_device_key
from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS
from libadobe import VAR_VER_ALLOWED_BUILD_IDS_SWITCH_TO, VAR_VER_SUPP_VERSIONS, VAR_ACS_SERVER_HTTP
from libadobe import VAR_ACS_SERVER_HTTPS, VAR_VER_BUILD_IDS, VAR_VER_NEED_HTTPS_BUILD_ID_LIMIT, VAR_VER_ALLOWED_BUILD_IDS_AUTHORIZE
def createDeviceFile(randomSerial, useVersionIndex = 0):
# type: (bool, int) -> bool
def createDeviceFile(randomSerial: bool, useVersionIndex: int = 0):
# Original implementation: Device::createDeviceFile(const std::string& hobbes, bool randomSerial)
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
@ -142,7 +137,7 @@ def getAuthMethodsAndCert():
def createUser(useVersionIndex: int = 0, authCert = None):
def createUser(useVersionIndex = 0, authCert = None):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False, "Invalid Version index", [[], []]
@ -215,12 +210,11 @@ def createUser(useVersionIndex: int = 0, authCert = None):
return True, "Done"
def encryptLoginCredentials(username: str, password: str, authenticationCertificate: str):
def encryptLoginCredentials(username, password, authenticationCertificate):
# type: (str, str, str) -> bytes
try:
from calibre_plugins.deacsm.libadobe import devkey_bytes as devkey_adobe
except:
from libadobe import devkey_bytes as devkey_adobe
from libadobe import devkey_bytes as devkey_adobe
import struct
if devkey_adobe is not None:
devkey_bytes = devkey_adobe
@ -234,9 +228,9 @@ def encryptLoginCredentials(username: str, password: str, authenticationCertific
# Build buffer <devkey_bytes> <len username> <username> <len password> <password>
ar = bytearray(devkey_bytes)
ar.extend(bytearray(len(username).to_bytes(1, 'big')))
ar.extend(bytearray(struct.pack("B", len(username))))
ar.extend(bytearray(username.encode("latin-1")))
ar.extend(bytearray(len(password).to_bytes(1, 'big')))
ar.extend(bytearray(struct.pack("B", len(password))))
ar.extend(bytearray(password.encode("latin-1")))
# Crypt code from https://stackoverflow.com/a/12921889/4991648
@ -253,7 +247,9 @@ def encryptLoginCredentials(username: str, password: str, authenticationCertific
return crypted_msg
def buildSignInRequestForAnonAuthConvert(username: str, password: str, authenticationCertificate: str):
def buildSignInRequestForAnonAuthConvert(username, password, authenticationCertificate):
# type: (str, str, str) -> str
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
@ -283,7 +279,9 @@ def buildSignInRequestForAnonAuthConvert(username: str, password: str, authentic
return "<?xml version=\"1.0\"?>\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")
def buildSignInRequest(type: str, username: str, password: str, authenticationCertificate: str):
def buildSignInRequest(type, username, password, authenticationCertificate):
# type: (str, str, str, str) -> str
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
@ -316,7 +314,8 @@ def buildSignInRequest(type: str, username: str, password: str, authenticationCe
return "<?xml version=\"1.0\"?>\n" + etree.tostring(root, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("latin-1")
def convertAnonAuthToAccount(username: str, passwd: str):
def convertAnonAuthToAccount(username, passwd):
# If you have an anonymous authorization, you can convert that to an AdobeID.
# Important: You can only do this ONCE for each AdobeID.
# The AdobeID you are using for this must not be connected to any ADE install.
@ -402,7 +401,7 @@ def convertAnonAuthToAccount(username: str, passwd: str):
def signIn(account_type: str, username: str, passwd: str):
def signIn(account_type, username, passwd):
# Get authenticationCertificate
@ -556,7 +555,7 @@ def exportProxyAuth(act_xml_path, activationToken):
def buildActivateReqProxy(useVersionIndex: int = 0, proxyData = None):
def buildActivateReqProxy(useVersionIndex = 0, proxyData = None):
if proxyData is None:
return False
@ -652,7 +651,7 @@ def buildActivateReqProxy(useVersionIndex: int = 0, proxyData = None):
return True, ret
def buildActivateReq(useVersionIndex: int = 0):
def buildActivateReq(useVersionIndex = 0):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False
@ -727,7 +726,7 @@ def buildActivateReq(useVersionIndex: int = 0):
# Call this function to change from ADE2 to ADE3 and vice versa.
def changeDeviceVersion(useVersionIndex: int = 0):
def changeDeviceVersion(useVersionIndex = 0):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False, "Invalid Version index"
@ -771,7 +770,7 @@ def changeDeviceVersion(useVersionIndex: int = 0):
def activateDevice(useVersionIndex: int = 0, proxyData = None):
def activateDevice(useVersionIndex = 0, proxyData = None):
if useVersionIndex >= len(VAR_VER_SUPP_CONFIG_NAMES):
return False, "Invalid Version index"
@ -881,7 +880,8 @@ def getAccountUUID():
return None
def exportAccountEncryptionKeyDER(output_file: str):
def exportAccountEncryptionKeyDER(output_file):
# type: (str) -> bool
try:
activationxml = etree.parse(get_activation_xml_path())
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

View File

@ -1,16 +1,13 @@
from lxml import etree
import base64
try:
from libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
from libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER
except:
from calibre_plugins.deacsm.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from calibre_plugins.deacsm.libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from calibre_plugins.deacsm.libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
from calibre_plugins.deacsm.libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER
#@@CALIBRE_COMPAT_CODE@@
from libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_SUPP_VERSIONS, VAR_VER_SUPP_CONFIG_NAMES, VAR_VER_HOBBES_VERSIONS
from libadobe import VAR_VER_BUILD_IDS, VAR_VER_USE_DIFFERENT_NOTIFICATION_XML_ORDER
def buildFulfillRequest(acsm):
@ -108,7 +105,8 @@ def buildFulfillRequest(acsm):
def buildInitLicenseServiceRequest(authURL: str):
def buildInitLicenseServiceRequest(authURL):
# type: (str) -> str
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
@ -175,7 +173,9 @@ def buildAuthRequest():
return ret
def doOperatorAuth(operatorURL: str):
def doOperatorAuth(operatorURL):
# type: (str) -> str
auth_req = buildAuthRequest()
authURL = operatorURL
@ -212,7 +212,8 @@ def doOperatorAuth(operatorURL: str):
def operatorAuth(operatorURL: str):
def operatorAuth(operatorURL):
# type: (str) -> str
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
@ -749,7 +750,8 @@ def performFulfillmentNotification(fulfillmentResultToken, forceOptional = False
def fetchLicenseServiceCertificate(licenseURL: str, operatorURL: str):
def fetchLicenseServiceCertificate(licenseURL, operatorURL):
# Check if we already have a cert for this URL:
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }

View File

@ -8,6 +8,10 @@ except ImportError:
# Some distros still ship this as Crypto
from Crypto.Cipher import AES as _AES
#@@CALIBRE_COMPAT_CODE@@
class AES(object):
def __init__(self, key, iv):
self._aes = _AES.new(key, _AES.MODE_CBC, iv)
@ -15,12 +19,8 @@ class AES(object):
return self._aes.decrypt(data)
try:
from libadobe import makeSerial, get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS, VAR_VER_DEFAULT_BUILD_ID, VAR_VER_BUILD_IDS
except:
from calibre_plugins.deacsm.libadobe import makeSerial, get_devkey_path, get_device_path, get_activation_xml_path
from calibre_plugins.deacsm.libadobe import VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS, VAR_VER_DEFAULT_BUILD_ID, VAR_VER_BUILD_IDS
from libadobe import makeSerial, get_devkey_path, get_device_path, get_activation_xml_path
from libadobe import VAR_VER_HOBBES_VERSIONS, VAR_VER_OS_IDENTIFIERS, VAR_VER_DEFAULT_BUILD_ID, VAR_VER_BUILD_IDS
def importADEactivationLinuxWine(wine_prefix_path, buildIDtoEmulate=VAR_VER_DEFAULT_BUILD_ID):

View File

@ -7,7 +7,10 @@ sys.path.append("../calibre-plugin")
import unittest
import base64
from freezegun import freeze_time
from unittest.mock import patch
if sys.version_info[0] >= 3:
from unittest.mock import patch
else:
from mock import patch
from lxml import etree
@ -38,9 +41,9 @@ class TestAdobe(unittest.TestCase):
def test_checkIfVersionListsAreValid(self):
'''
Check if version lists are sane
'''Check if version lists are sane'''
'''
These four lists must all have the same amount of elements.
Also, the default build ID must be valid, and all the IDs
available for authorization or switching must be valid, too.
@ -67,7 +70,7 @@ class TestAdobe(unittest.TestCase):
def test_fingerprintGeneration(self):
'''Check if fingerprint generation works'''
libadobe.devkey_bytes = bytes([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
libadobe.devkey_bytes = bytearray([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
self.assertEqual(libadobe.makeFingerprint("f0081bce3f771bdeeb26fcb4b2011fed77edff7b"), b"FgLMNXxv1BZPqMOM6IUnfaG4Qj8=", "Wrong fingerprint")
self.assertEqual(libadobe.makeFingerprint("HelloWorld123"), b"hpp223C1kfLDOoyxo8WR7KhcXB8=", "Wrong fingerprint")
@ -79,14 +82,14 @@ class TestAdobe(unittest.TestCase):
# Overwrite the get_random_bytes function that's used to get a random IV
# Forcing hard-coded IV ...
random.get_random_bytes._mock_side_effect = lambda rndlen: bytes([0xc2, 0x3b, 0x0f, 0xde, 0xf2, 0x4a, 0xc3, 0x03,
random.get_random_bytes._mock_side_effect = lambda rndlen: bytearray([0xc2, 0x3b, 0x0f, 0xde, 0xf2, 0x4a, 0xc3, 0x03,
0xae, 0xc8, 0x70, 0xd4, 0x46, 0x6c, 0x8b, 0xb0])
libadobe.devkey_bytes = bytes([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
libadobe.devkey_bytes = bytearray([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
mock_data = b"Test message"
mock_result = libadobe.encrypt_with_device_key(mock_data)
expected_result = bytes([0xc2, 0x3b, 0x0f, 0xde, 0xf2, 0x4a, 0xc3, 0x03, 0xae, 0xc8, 0x70, 0xd4,
expected_result = bytearray([0xc2, 0x3b, 0x0f, 0xde, 0xf2, 0x4a, 0xc3, 0x03, 0xae, 0xc8, 0x70, 0xd4,
0x46, 0x6c, 0x8b, 0xb0, 0x23, 0x5a, 0xd3, 0x1b, 0x4e, 0x2b, 0x12, 0x79,
0x85, 0x63, 0x2d, 0x01, 0xa4, 0xe8, 0x29, 0x22])
@ -95,9 +98,9 @@ class TestAdobe(unittest.TestCase):
def test_deviceKeyDecryption(self):
'''Check if decryption with the device key works'''
libadobe.devkey_bytes = bytes([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
libadobe.devkey_bytes = bytearray([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
mock_data = bytes([0xc2, 0x3b, 0x0f, 0xde, 0xf2, 0x4a, 0xc3, 0x03, 0xae, 0xc8, 0x70, 0xd4,
mock_data = bytearray([0xc2, 0x3b, 0x0f, 0xde, 0xf2, 0x4a, 0xc3, 0x03, 0xae, 0xc8, 0x70, 0xd4,
0x46, 0x6c, 0x8b, 0xb0, 0x23, 0x5a, 0xd3, 0x1b, 0x4e, 0x2b, 0x12, 0x79,
0x85, 0x63, 0x2d, 0x01, 0xa4, 0xe8, 0x29, 0x22])
@ -175,7 +178,7 @@ class TestAdobe(unittest.TestCase):
# in case we need to go back to the old method.
mock_signing_key = "MIICdAIBADANBgkqhkiG9w0BAQEFAASCAl4wggJaAgEAAoGBALluuPvdDpr4L0j3eIGy3VxhgRcEKU3++qwbdvLXI99/izW9kfELFFJtq5d4ktIIUIvHsWkW0jblGi+bQ4sQXCeIvtOgqVHMSvRpW78lnGEkdD4Y1qhbcVGw7OGpWlhp8qCJKVCGbrkML7BSwFvQqqvg4vMU8O1uALfJvicKN3YfAgMBAAECf3uEg+Hr+DrstHhZF40zJPHKG3FkFd3HerXbOawMH5Q6CKTuKDGmOYQD+StFIlMArQJh8fxTVM3gSqgPkyyiesw0OuECU985FaLbUWxuCQzBcitnhl+VSv19oEPHTJWu0nYabasfT4oPjf8eiWR/ymJ9DZrjMWWy4Xf/S+/nFYUCQQDIZ1pc9nZsCB4QiBl5agTXoMcKavxFHPKxI/mHfRCHYjNyirziBJ+Dc/N40zKvldNBjO43KjLhUZs/BxdAJo09AkEA7OAdsg6SmviVV8xk0vuTmgLxhD7aZ9vpV4KF5+TH2DbximFoOP3YRObXV862wAjCpa84v43ok7Imtsu3NKQ+iwJAc0mx3GUU/1U0JoKFVSm+m2Ws27tsYT4kB/AQLvetuJSv0CcsPkI2meLsoAev0v84Ry+SIz4tgx31V672mzsSaQJBAJET1rw2Vq5Zr8Y9ZkceVFGQmfGAOW5A71Jsm6zin0+anyc874NwXaQdqiiab61/8A9gGSahOKA1DacJcCTqr28CQGm4mn3rOQFf+nniajIobATjNHaZJ76Xnc6rtoreK6+ZjO9wYF+797X/bhiV11Fpakvyrz6+t7bAd0PPQ2taTDg="
payload_bytes = bytes([0x34, 0x52, 0xe3, 0xd1, 0x1c, 0xdd, 0x70, 0xeb, 0x90, 0x32, 0x3f, 0x29, 0x1c, 0x06, 0xaf, 0xaf, 0xe1, 0x0e, 0x09, 0x8a])
payload_bytes = bytearray([0x34, 0x52, 0xe3, 0xd1, 0x1c, 0xdd, 0x70, 0xeb, 0x90, 0x32, 0x3f, 0x29, 0x1c, 0x06, 0xaf, 0xaf, 0xe1, 0x0e, 0x09, 0x8a])
try:
import rsa
@ -184,7 +187,10 @@ class TestAdobe(unittest.TestCase):
key = rsa.PrivateKey.load_pkcs1(RSA.importKey(base64.b64decode(mock_signing_key)).exportKey())
keylen = rsa.pkcs1.common.byte_size(key.n)
padded = rsa.pkcs1._pad_for_signing(payload_bytes, keylen)
if sys.version_info[0] >= 3:
padded = rsa.pkcs1._pad_for_signing(bytes(payload_bytes), keylen)
else:
padded = rsa.pkcs1._pad_for_signing(bytes(payload_bytes), keylen)
payload = rsa.pkcs1.transform.bytes2int(padded)
encrypted = key.blinded_encrypt(payload)
block = rsa.pkcs1.transform.int2bytes(encrypted, keylen)
@ -200,7 +206,7 @@ class TestAdobe(unittest.TestCase):
'''Check if the builtin CustomRSA library signs correctly'''
mock_signing_key = "MIICdAIBADANBgkqhkiG9w0BAQEFAASCAl4wggJaAgEAAoGBALluuPvdDpr4L0j3eIGy3VxhgRcEKU3++qwbdvLXI99/izW9kfELFFJtq5d4ktIIUIvHsWkW0jblGi+bQ4sQXCeIvtOgqVHMSvRpW78lnGEkdD4Y1qhbcVGw7OGpWlhp8qCJKVCGbrkML7BSwFvQqqvg4vMU8O1uALfJvicKN3YfAgMBAAECf3uEg+Hr+DrstHhZF40zJPHKG3FkFd3HerXbOawMH5Q6CKTuKDGmOYQD+StFIlMArQJh8fxTVM3gSqgPkyyiesw0OuECU985FaLbUWxuCQzBcitnhl+VSv19oEPHTJWu0nYabasfT4oPjf8eiWR/ymJ9DZrjMWWy4Xf/S+/nFYUCQQDIZ1pc9nZsCB4QiBl5agTXoMcKavxFHPKxI/mHfRCHYjNyirziBJ+Dc/N40zKvldNBjO43KjLhUZs/BxdAJo09AkEA7OAdsg6SmviVV8xk0vuTmgLxhD7aZ9vpV4KF5+TH2DbximFoOP3YRObXV862wAjCpa84v43ok7Imtsu3NKQ+iwJAc0mx3GUU/1U0JoKFVSm+m2Ws27tsYT4kB/AQLvetuJSv0CcsPkI2meLsoAev0v84Ry+SIz4tgx31V672mzsSaQJBAJET1rw2Vq5Zr8Y9ZkceVFGQmfGAOW5A71Jsm6zin0+anyc874NwXaQdqiiab61/8A9gGSahOKA1DacJcCTqr28CQGm4mn3rOQFf+nniajIobATjNHaZJ76Xnc6rtoreK6+ZjO9wYF+797X/bhiV11Fpakvyrz6+t7bAd0PPQ2taTDg="
payload_bytes = bytes([0x34, 0x52, 0xe3, 0xd1, 0x1c, 0xdd, 0x70, 0xeb, 0x90, 0x32, 0x3f, 0x29, 0x1c, 0x06, 0xaf, 0xaf, 0xe1, 0x0e, 0x09, 0x8a])
payload_bytes = bytearray([0x34, 0x52, 0xe3, 0xd1, 0x1c, 0xdd, 0x70, 0xeb, 0x90, 0x32, 0x3f, 0x29, 0x1c, 0x06, 0xaf, 0xaf, 0xe1, 0x0e, 0x09, 0x8a])
block = CustomRSA.encrypt_for_adobe_signature(base64.b64decode(mock_signing_key), payload_bytes)
signature = base64.b64encode(block).decode()
@ -287,7 +293,7 @@ class TestAdobe(unittest.TestCase):
user = "username"
passwd = "unit-test-password"
libadobe.devkey_bytes = bytes([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
libadobe.devkey_bytes = bytearray([0xf8, 0x7a, 0xfc, 0x8c, 0x75, 0x25, 0xdc, 0x4b, 0x83, 0xec, 0x0c, 0xe2, 0xab, 0x4b, 0xef, 0x51])
encrypted = libadobeAccount.encryptLoginCredentials(user, passwd, mock_auth_certificate)
# Okay, now try to decrypt this again:
@ -296,13 +302,18 @@ class TestAdobe(unittest.TestCase):
cipher_engine = PKCS1_v1_5.new(pkey)
msg = cipher_engine.decrypt(encrypted, bytes([0x00] * 16))
import struct
expected_msg = bytearray(libadobe.devkey_bytes)
expected_msg.extend(bytearray(len(user).to_bytes(1, 'big')))
expected_msg.extend(bytearray(struct.pack("B", len(user))))
expected_msg.extend(bytearray(user.encode("latin-1")))
expected_msg.extend(bytearray(len(passwd).to_bytes(1, 'big')))
expected_msg.extend(bytearray(struct.pack("B", len(passwd))))
expected_msg.extend(bytearray(passwd.encode("latin-1")))
self.assertEqual(msg.hex(), expected_msg.hex(), "devkey encryption returned invalid result")
if sys.version_info[0] >= 3:
self.assertEqual(msg.hex(), expected_msg.hex(), "devkey encryption returned invalid result")
else:
self.assertEqual(msg, expected_msg, "devkey encryption returned invalid result")
@ -322,7 +333,7 @@ class TestOther(unittest.TestCase):
'''Check if Wine username decoder is working properly'''
self.assertEqual(fix_wine_username(r'"1234"'), b'1234', "Wine username mismatch")
self.assertEqual(fix_wine_username(r'"a\x00e931"'), b'a\xe931', "Wine username mismatch")
self.assertEqual(fix_wine_username(r'"a\x00e931"'), b'a\xe931', "Wine username mismatch with UTF-8")
def test_pdf_trimEncrypt(self):
'''Check if PDF encryption string trimming code is working properly'''