DeDRM_tools/DeDRM_plugin/topazextract.py

493 lines
16 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
2013-10-03 00:59:40 +06:00
# -*- coding: utf-8 -*-
from __future__ import print_function
2013-10-03 00:59:40 +06:00
# topazextract.py
# Mostly written by some_updates based on code from many others
# Changelog
# 4.9 - moved unicode_argv call inside main for Windows DeDRM compatibility
# 5.0 - Fixed potential unicode problem with command line interface
# 6.0 - Added Python 3 compatibility for calibre 5.0
2013-10-03 00:59:40 +06:00
__version__ = '6.0'
2013-10-03 00:59:40 +06:00
import sys
import os, csv, getopt
#@@CALIBRE_COMPAT_CODE@@
2013-10-03 00:59:40 +06:00
import zlib, zipfile, tempfile, shutil
import traceback
from struct import pack
from struct import unpack
2023-08-04 00:45:06 +06:00
from .alfcrypto import Topaz_Cipher
from .utilities import SafeUnbuffered
2013-10-03 00:59:40 +06:00
2023-08-04 00:45:06 +06:00
from .argv_utils import unicode_argv
2013-10-03 00:59:40 +06:00
# Global switch: when True, the header and metadata parsers print the
# record counts, tags and key/value pairs they read.
debug = False
import kgenpids
2013-10-03 00:59:40 +06:00
class DrmException(Exception):
    """Raised for Topaz parse errors and for failed key/record decryption."""
# Recursive zip creation support routine
def zipUpDir(myzip, tdir, localname):
    """Add the directory ``tdir/localname`` to ``myzip``, recursing into subdirectories.

    myzip     -- an open, writable zipfile.ZipFile
    tdir      -- root directory on disk
    localname -- path relative to ``tdir``; also used as the path prefix
                 inside the archive ("" means ``tdir`` itself)

    Only regular files and directories are handled; anything else is skipped.
    """
    source_dir = os.path.join(tdir, localname) if localname != "" else tdir
    for entry in os.listdir(source_dir):
        archive_name = os.path.join(localname, entry)
        disk_path = os.path.join(source_dir, entry)
        if os.path.isfile(disk_path):
            myzip.write(disk_path, archive_name)
        elif os.path.isdir(disk_path):
            zipUpDir(myzip, tdir, archive_name)
#
# Utility routines
#
# Get a 7 bit encoded number from file
def bookReadEncodedNumber(fo):
    """Read one variable-length integer from the binary file object ``fo``.

    Encoding, as implemented below:
      * an optional leading 0xFF byte marks the number as negative;
      * each following byte contributes its low 7 bits, most significant
        group first; a set high bit (>= 0x80) means another byte follows.

    Returns the decoded int. Reading past the end of the file makes
    ``ord()`` raise TypeError, since ``fo.read(1)`` then returns b''.
    """
    def next_byte():
        return ord(fo.read(1))

    value = next_byte()
    negative = value == 0xFF
    if negative:
        value = next_byte()

    if value >= 0x80:
        accumulated = value & 0x7F
        while value >= 0x80:
            value = next_byte()
            accumulated = (accumulated << 7) + (value & 0x7F)
        value = accumulated

    return -value if negative else value
# Get a length prefixed string from file
def bookReadString(fo):
    """Read a length-prefixed byte string from ``fo``.

    The length is an encoded number (see bookReadEncodedNumber); that many
    bytes are then read and returned as ``bytes``. ``struct.unpack`` raises
    struct.error if fewer bytes than announced are available.
    """
    size = bookReadEncodedNumber(fo)
    payload = fo.read(size)
    (text,) = unpack('%ds' % size, payload)
    return text
#
# crypto routines
#
# Context initialisation for the Topaz Crypto
def topazCryptoInit(key):
    """Return a cipher context for ``key``, built by Topaz_Cipher.ctx_init."""
    cipher = Topaz_Cipher()
    return cipher.ctx_init(key)
    # Pure-Python reference kept from the original source (presumably the
    # algorithm Topaz_Cipher.ctx_init implements -- not verified here):
    # ctx1 = 0x0CAFFE19E
    # for keyChar in key:
    #     keyByte = ord(keyChar)
    #     ctx2 = ctx1
    #     ctx1 = ((((ctx1 >>2) * (ctx1 >>7))&0xFFFFFFFF) ^ (keyByte * keyByte * 0x0F902007)& 0xFFFFFFFF )
    # return [ctx1,ctx2]
# Decrypt data with the context prepared by topazCryptoInit()
def topazCryptoDecrypt(data, ctx):
    """Decrypt ``data`` with context ``ctx`` via Topaz_Cipher.decrypt and return the result."""
    cipher = Topaz_Cipher()
    return cipher.decrypt(data, ctx)
    # Pure-Python reference kept from the original source (presumably the
    # algorithm Topaz_Cipher.decrypt implements -- not verified here):
    # ctx1 = ctx[0]
    # ctx2 = ctx[1]
    # plainText = ""
    # for dataChar in data:
    #     dataByte = ord(dataChar)
    #     m = (dataByte ^ ((ctx1 >> 3) &0xFF) ^ ((ctx2<<3) & 0xFF)) &0xFF
    #     ctx2 = ctx1
    #     ctx1 = (((ctx1 >> 2) * (ctx1 >> 7)) &0xFFFFFFFF) ^((m * m * 0x0F902007) &0xFFFFFFFF)
    #     plainText += chr(m)
    # return plainText
# Decrypt data with the PID
def decryptRecord(data,PID):
    """Decrypt ``data`` using a fresh cipher context derived from ``PID``."""
    return topazCryptoDecrypt(data, topazCryptoInit(PID))
# Try to decrypt a dkey record (contains the bookPID)
def decryptDkeyRecord(data,PID):
    """Decrypt one dkey record with ``PID`` and return the 8-byte book key.

    The decrypted record is unpacked as:
        b'PID' | length byte (8) | 8-byte PID | length byte (8) | 8-byte key | b'pid'

    Raises DrmException if the magic markers, the length bytes or the
    embedded PID do not match. struct.error propagates if the decrypted
    record does not have the size the format requires.
    """
    plain = decryptRecord(data, PID)
    # The cipher may hand back text; normalise to bytes before unpacking.
    if isinstance(plain, str):
        plain = plain.encode('latin-1')

    head, pid_len, embedded_pid, key_len, book_key, tail = unpack('3sB8sB8s3s', plain)

    if not (head == b'PID' and tail == b'pid'):
        raise DrmException("Didn't find PID magic numbers in record")
    if not (pid_len == 8 and key_len == 8):
        raise DrmException("Record didn't contain correct length fields")
    if embedded_pid != PID:
        raise DrmException("Record didn't contain PID")
    return book_key
# Decrypt all dkey records (contain the book PID)
def decryptDkeyRecords(data,PID):
    """Try ``PID`` against every dkey record in ``data``; return the keys that decrypt.

    ``data`` is indexed as a bytes-like sequence: the first byte is the number
    of records, and each record is a length byte followed by that many bytes.
    Records that fail with DrmException are skipped.

    Raises DrmException("BookKey Not Found") when no record yields a key.
    """
    record_count = data[0]
    remaining = data[1:]
    found = []
    for _ in range(record_count):
        size = remaining[0]
        try:
            found.append(decryptDkeyRecord(remaining[1:size + 1], PID))
        except DrmException:
            # Wrong PID for this record: move on to the next one.
            pass
        remaining = remaining[size + 1:]
    if not found:
        raise DrmException("BookKey Not Found")
    return found
class TopazBook:
    """Parser and extractor for a Topaz ebook file (magic ``b'TPZ0'``).

    Constructing an instance opens the file, creates a temporary working
    directory (``self.outdir``) and parses the header table and metadata.
    Call ``cleanup()`` when done to close the file and remove that directory.

    Attributes:
        fo                -- the open binary file object
        outdir            -- temporary directory receiving extracted pieces
        bookPayloadOffset -- file offset where the payload starts
        bookHeaderRecords -- dict: tag (bytes) ->
                             [[offset, decompressedLength, compressedLength], ...]
        bookMetadata      -- dict: key (bytes) -> value (bytes)
        bookKey           -- key used to decrypt encrypted records, or None
    """

    def __init__(self, filename):
        """Open ``filename`` and parse its headers and metadata.

        Raises DrmException if the file does not start with b'TPZ0' (this
        includes files shorter than four bytes) or if the headers are
        malformed. On any failure the file handle and the temporary
        directory are released before the exception propagates, because the
        caller never receives an object it could call cleanup() on.
        """
        self.fo = open(filename, 'rb')
        self.outdir = None
        self.bookPayloadOffset = 0
        self.bookHeaderRecords = {}
        self.bookMetadata = {}
        self.bookKey = None
        try:
            self.outdir = tempfile.mkdtemp()
            if self.fo.read(4) != b'TPZ0':
                raise DrmException("Parse Error : Invalid Header, not a Topaz file")
            self.parseTopazHeaders()
            self.parseMetadata()
        except Exception:
            self.cleanup()
            raise

    def parseTopazHeaders(self):
        """Read the header table into ``bookHeaderRecords`` and record the payload offset.

        Raises DrmException if a header record does not start with 0x63 or
        the table is not terminated by 0x64.
        """
        def bookReadHeaderRecordData():
            # Read and return the data of one header record at the current book file position
            # [[offset,decompressedLength,compressedLength],...]
            nbValues = bookReadEncodedNumber(self.fo)
            if debug: print("%d records in header " % nbValues, end=' ')
            # The three numbers are read in file order: offset, decompressed
            # length, compressed length.
            return [
                [
                    bookReadEncodedNumber(self.fo),
                    bookReadEncodedNumber(self.fo),
                    bookReadEncodedNumber(self.fo),
                ]
                for _ in range(nbValues)
            ]

        def parseTopazHeaderRecord():
            # Read and parse one header record at the current book file position and return the associated data
            # [[offset,decompressedLength,compressedLength],...]
            if ord(self.fo.read(1)) != 0x63:
                raise DrmException("Parse Error : Invalid Header")
            tag = bookReadString(self.fo)
            record = bookReadHeaderRecordData()
            return [tag, record]

        nbRecords = bookReadEncodedNumber(self.fo)
        if debug: print("Headers: %d" % nbRecords)
        for _ in range(nbRecords):
            tag, record = parseTopazHeaderRecord()
            if debug: print(tag, ": ", record)
            self.bookHeaderRecords[tag] = record
        if ord(self.fo.read(1)) != 0x64:
            raise DrmException("Parse Error : Invalid Header")
        self.bookPayloadOffset = self.fo.tell()

    def parseMetadata(self):
        """Parse the metadata record from the payload into ``bookMetadata`` and return it.

        Raises DrmException if the record at the metadata offset is not
        tagged b'metadata'. A missing b'metadata' header entry raises KeyError.
        """
        self.fo.seek(self.bookPayloadOffset + self.bookHeaderRecords[b'metadata'][0][0])
        tag = bookReadString(self.fo)
        if tag != b'metadata':
            raise DrmException("Parse Error : Record Names Don't Match")
        # A flags byte precedes the record count; it is read to advance the
        # file position but its value is not used.
        self.fo.read(1)
        nbRecords = ord(self.fo.read(1))
        if debug: print("Metadata Records: %d" % nbRecords)
        for _ in range(nbRecords):
            keyval = bookReadString(self.fo)
            content = bookReadString(self.fo)
            if debug: print(keyval)
            if debug: print(content)
            self.bookMetadata[keyval] = content
        return self.bookMetadata

    def getPIDMetaInfo(self):
        """Return ``(keysRecord, keysRecordRecord)`` for PID generation.

        ``keysRecord`` is the b'keys' metadata value (b'' if absent), a
        comma-separated list of metadata keys. ``keysRecordRecord`` is the
        concatenation of the metadata values those keys name (missing keys
        contribute b'').
        """
        keysRecord = self.bookMetadata.get(b'keys', b'')
        keysRecordRecord = b''
        if keysRecord != b'':
            keysRecordRecord = b''.join(
                self.bookMetadata.get(keyval, b'') for keyval in keysRecord.split(b',')
            )
        return keysRecord, keysRecordRecord

    def getBookTitle(self):
        """Return the b'Title' metadata value decoded as UTF-8 ('' if absent)."""
        return self.bookMetadata.get(b'Title', b'').decode('utf-8')

    def setBookKey(self, key):
        """Store the key used to decrypt encrypted payload records."""
        self.bookKey = key

    def getBookPayloadRecord(self, name, index):
        """Return payload record ``index`` of section ``name``, decrypted and decompressed as needed.

        name  -- section tag (bytes), a key of ``bookHeaderRecords``
        index -- position of the record within that section

        Raises DrmException if the record is not listed in the headers, if
        the tag or index stored in the payload does not match, or if the
        record is encrypted and no book key has been set.
        """
        try:
            recordOffset = self.bookHeaderRecords[name][index][0]
        except (KeyError, IndexError, TypeError):
            raise DrmException("Parse Error : Invalid Record, record not found")

        self.fo.seek(self.bookPayloadOffset + recordOffset)
        tag = bookReadString(self.fo)
        if tag != name:
            raise DrmException("Parse Error : Invalid Record, record name doesn't match")

        # A negative stored index flags an encrypted record; the real index
        # is recovered as -(stored) - 1.
        recordIndex = bookReadEncodedNumber(self.fo)
        encrypted = recordIndex < 0
        if encrypted:
            recordIndex = -recordIndex - 1
        if recordIndex != index:
            raise DrmException("Parse Error : Invalid Record, index doesn't match")

        # A positive compressed length means the record is stored compressed
        # and that many bytes are on disk; otherwise the decompressed length
        # is the on-disk size.
        _, decompressedLength, compressedLength = self.bookHeaderRecords[name][index]
        compressed = compressedLength > 0
        record = self.fo.read(compressedLength if compressed else decompressedLength)

        if encrypted:
            if not self.bookKey:
                raise DrmException("Error: Attempt to decrypt without bookKey")
            ctx = topazCryptoInit(self.bookKey)
            record = topazCryptoDecrypt(record, ctx)
        if compressed:
            # The cipher may hand back text; zlib needs bytes.
            if isinstance(record, str):
                record = bytes(record, 'latin-1')
            record = zlib.decompress(record)
        return record

    def _extractAndGenerate(self, successMessage, raw=0, fixedimage=True):
        """Extract all sections into ``outdir`` and run genbook on them.

        Prints ``successMessage`` when genbook.generateBook returns 0 and
        returns genbook's return value.
        """
        self.createBookDirectory()
        self.extractFiles()
        print("Successfully Extracted Topaz contents")
        import genbook
        rv = genbook.generateBook(self.outdir, raw, fixedimage)
        if rv == 0:
            print(successMessage)
        return rv

    def processBook(self, pidlst):
        """Find the book key using ``pidlst``, extract the book and generate output.

        If the dkey record cannot be read, the book is assumed to be
        unencrypted and is extracted without a key. Otherwise each PID
        (truncated to 8 characters) is tried in turn.

        Returns the value returned by genbook.generateBook.
        Raises DrmException if no PID in ``pidlst`` decrypts a dkey record.
        """
        try:
            keydata = self.getBookPayloadRecord(b'dkey', 0)
        except DrmException:
            print("no dkey record found, book may not be encrypted")
            print("attempting to extract files without a book key")
            return self._extractAndGenerate("Book Successfully generated.")

        # try each pid to decode the file
        bookKey = None
        for pid in pidlst:
            # use 8 digit pids here
            pid = pid[0:8]
            if isinstance(pid, str):
                pid = pid.encode('latin-1')
            print("Trying: {0}".format(pid))
            try:
                bookKeys = decryptDkeyRecords(keydata, pid)
            except DrmException:
                continue
            bookKey = bookKeys[0]
            print("Book Key Found! ({0})".format(bookKey.hex()))
            break

        if not bookKey:
            raise DrmException("No key found in {0:d} keys tried. Read the FAQs at noDRM's repository: https://github.com/noDRM/DeDRM_tools/blob/master/FAQs.md".format(len(pidlst)))

        self.setBookKey(bookKey)
        return self._extractAndGenerate("Book Successfully generated")

    def createBookDirectory(self):
        """Create ``outdir`` and its img/color_img/page/glyphs subdirectories if missing."""
        outdir = self.outdir
        wanted = [outdir] + [
            os.path.join(outdir, sub) for sub in ("img", "color_img", "page", "glyphs")
        ]
        for path in wanted:
            if not os.path.exists(path):
                os.makedirs(path)

    def extractFiles(self):
        """Write every payload record except b'dkey' to a file under ``outdir``.

        Files are named ``<section><index:04d><ext>``; img and color records
        get ``.jpg``, everything else ``.dat``. The img, color, page and
        glyphs sections go to their own subdirectories. Empty records are
        not written.
        """
        outdir = self.outdir
        subdirs = {
            b'img': "img",
            b'color': "color_img",
            b'page': "page",
            b'glyphs': "glyphs",
        }
        for name in self.bookHeaderRecords:
            if name == b'dkey':
                continue
            ext = ".jpg" if name in (b'img', b'color') else ".dat"
            section = name.decode('utf-8')
            destdir = os.path.join(outdir, subdirs[name]) if name in subdirs else outdir
            print("Processing Section: {0}\n. . .".format(section), end=' ')
            for index in range(len(self.bookHeaderRecords[name])):
                fname = "{0}{1:04d}{2}".format(section, index, ext)
                outputFile = os.path.join(destdir, fname)
                print(".", end=' ')
                record = self.getBookPayloadRecord(name, index)
                if isinstance(record, str):
                    record = bytes(record, 'latin-1')
                if record != b'':
                    # Close each output file promptly instead of relying on
                    # garbage collection.
                    with open(outputFile, 'wb') as out:
                        out.write(record)
            print(" ")

    def getFile(self, zipname):
        """Write the generated HTML book (html, opf, optional cover, css, img/) to ``zipname``."""
        with zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED, False) as htmlzip:
            htmlzip.write(os.path.join(self.outdir, "book.html"), "book.html")
            htmlzip.write(os.path.join(self.outdir, "book.opf"), "book.opf")
            cover = os.path.join(self.outdir, "cover.jpg")
            if os.path.isfile(cover):
                htmlzip.write(cover, "cover.jpg")
            htmlzip.write(os.path.join(self.outdir, "style.css"), "style.css")
            zipUpDir(htmlzip, self.outdir, "img")

    def getBookType(self):
        """Return the book type name, "Topaz"."""
        return "Topaz"

    def getBookExtension(self):
        """Return the extension used for the decrypted output, ".htmlz"."""
        return ".htmlz"

    def getSVGZip(self, zipname):
        """Write the generated SVG rendition (index_svg.xhtml, svg/, img/) to ``zipname``."""
        with zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED, False) as svgzip:
            svgzip.write(os.path.join(self.outdir, "index_svg.xhtml"), "index_svg.xhtml")
            zipUpDir(svgzip, self.outdir, "svg")
            zipUpDir(svgzip, self.outdir, "img")

    def cleanup(self):
        """Close the book file and remove the temporary working directory.

        Safe to call more than once: closing an already closed file is a
        no-op and directory removal ignores errors.
        """
        self.fo.close()
        if self.outdir and os.path.isdir(self.outdir):
            shutil.rmtree(self.outdir, True)
def usage(progname):
    """Print the command-line help text, using ``progname`` as the program name."""
    help_lines = (
        "Removes DRM protection from Topaz ebooks and extracts the contents",
        "Usage:",
        " {0} [-k <kindle.k4i>] [-p <comma separated PIDs>] [-s <comma separated Kindle serial numbers>] <infile> <outdir>".format(progname),
    )
    for line in help_lines:
        print(line)
2013-10-03 00:59:40 +06:00
# Main
def cli_main():
    """Command-line entry point: decrypt one Topaz book and write its archives.

    Options (see usage()):
        -k <file>   Kindle key database file (may be repeated)
        -p <pids>   comma separated PIDs
        -s <sns>    comma separated Kindle serial numbers (spaces are removed)
    Positional arguments: <infile> <outdir>.

    Returns 0 on success and 1 on any usage error or failure. Failures while
    opening, decrypting or packaging the book are reported with a traceback
    on stdout, and the book's temporary files are always cleaned up.
    """
    argv=unicode_argv("topazextract.py")
    progname = os.path.basename(argv[0])
    print("TopazExtract v{0}.".format(__version__))

    try:
        opts, args = getopt.getopt(argv[1:], "k:p:s:x")
    except getopt.GetoptError as err:
        print("Error in options or arguments: {0}".format(err.args[0]))
        usage(progname)
        return 1
    if len(args)<2:
        usage(progname)
        return 1

    infile = args[0]
    outdir = args[1]
    if not os.path.isfile(infile):
        print("Input File {0} Does Not Exist.".format(infile))
        return 1

    if not os.path.exists(outdir):
        print("Output Directory {0} Does Not Exist.".format(outdir))
        return 1

    kDatabaseFiles = []
    serials = []
    pids = []

    for o, a in opts:
        if o == '-k':
            if a is None:
                raise DrmException("Invalid parameter for -k")
            kDatabaseFiles.append(a)
        if o == '-p':
            if a is None:
                raise DrmException("Invalid parameter for -p")
            pids = a.split(',')
        if o == '-s':
            if a is None:
                raise DrmException("Invalid parameter for -s")
            serials = [serial.replace(" ","") for serial in a.split(',')]

    bookname = os.path.splitext(os.path.basename(infile))[0]

    # Opening the book and collecting PIDs can fail as well (for example on a
    # file that is not a Topaz book), so they are inside the guarded region.
    tb = None
    try:
        tb = TopazBook(infile)
        title = tb.getBookTitle()
        print("Processing Book: {0}".format(title))
        md1, md2 = tb.getPIDMetaInfo()
        pids.extend(kgenpids.getPidList(md1, md2, serials, kDatabaseFiles))

        print("Decrypting Book")
        tb.processBook(pids)

        print(" Creating HTML ZIP Archive")
        zipname = os.path.join(outdir, bookname + "_nodrm.htmlz")
        tb.getFile(zipname)

        print(" Creating SVG ZIP Archive")
        zipname = os.path.join(outdir, bookname + "_SVG.zip")
        tb.getSVGZip(zipname)
    except Exception:
        # DrmException and every other error are reported the same way.
        print("Decryption failed\n{0}".format(traceback.format_exc()))
        return 1
    finally:
        # removing internal temporary directory of pieces
        if tb is not None:
            try:
                tb.cleanup()
            except Exception:
                # Best effort: a cleanup problem must not change the result.
                pass

    return 0
# Script entry point: wrap stdout/stderr in SafeUnbuffered, then run the
# command-line interface and use its return value as the process exit status.
if __name__ == '__main__':
    sys.stdout=SafeUnbuffered(sys.stdout)
    sys.stderr=SafeUnbuffered(sys.stderr)
    sys.exit(cli_main())