v0.0.9: Add support for FulfillmentNotification and LoanReturn

This commit is contained in:
Florian Bach 2021-10-04 15:36:25 +02:00
parent 36b28765dd
commit 8e8ca43c7e
7 changed files with 510 additions and 12 deletions

View File

@ -21,6 +21,14 @@ IMPORTANT:
- Support for PDFs might be unreliable. You will need to apply pull request #1689 (including my additional bugfix in the comments of that PR) to the DeDRM plugin in order to remove the DRM from PDF files. If you still encounter an issue with a PDF file created by this tool even with these bugfixes, please report a bug (in this repository, not in the DeDRM one) and attach the corrupted PDF.
- This software is not approved by Adobe. I am not responsible if Adobe detects that you're using nonstandard software and bans your account. Do not complain to me if Adobe bans your main ADE account - you have been warned.
## Returning books
If a book is marked as returnable (like a library book), you can "return" it to the library using this plugin.
Just open the plugin settings, click "Show loaned books" (the option is only visible if you have at least one loaned book that's been downloaded with this plugin), select the book, then click the arrow button to return. Or click the "X" button to just remove the loan record from the list without returning the book.
This makes the book available for someone else again, but it does not automatically get deleted from your Calibre library - you are responsible for doing that after returning a book.
Note: You can only return books that you downloaded with version 0.0.9 (or newer) of this plugin. You cannot return books downloaded with ADE or with earlier versions of this plugin.
## Standalone version
@ -38,5 +46,4 @@ There's a bunch of features that could still be added, but most of them aren't i
- Support for anonymous Adobe IDs
- Support for un-authorizing a machine
- Support for returning loan books
- ...

View File

@ -13,10 +13,11 @@
# v0.0.6: First PDF support, allow importing previously exported activation data.
# v0.0.7: More PDF logging, PDF reading in latin-1, MacOS locale bugfix
# v0.0.8: More PDF bugfixes, support unlimited PDF file sizes, tell Calibre ACSMs are books.
# v0.0.9: Add FulfillmentNotification support, add LoanReturn support.
from calibre.customize import FileTypePlugin # type: ignore
__version__ = '0.0.8'
__version__ = '0.0.9'
PLUGIN_NAME = "DeACSM"
PLUGIN_VERSION_TUPLE = tuple([int(x) for x in __version__.split(".")])
@ -308,7 +309,12 @@ class DeACSM(FileTypePlugin):
traceback.print_exc()
success, replyData = fulfill(path_to_ebook)
import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.DeACSM_Prefs()
success, replyData = fulfill(path_to_ebook, deacsmprefs["notify_fulfillment"])
if (success is False):
print("{0} v{1}: Hey, that didn't work: \n".format(PLUGIN_NAME, PLUGIN_VERSION) + replyData)
else:

View File

@ -7,11 +7,15 @@ import os, base64, traceback
from lxml import etree
import time, datetime
from PyQt5.Qt import (Qt, QWidget, QHBoxLayout, QVBoxLayout, QLabel, QLineEdit,
QGroupBox, QPushButton, QListWidget, QListWidgetItem, QInputDialog,
QLineEdit, QAbstractItemView, QIcon, QDialog, QDialogButtonBox, QUrl)
from PyQt5 import QtCore
from PyQt5 import Qt as QtGui
from zipfile import ZipFile
@ -36,6 +40,10 @@ class ConfigWidget(QWidget):
self.tempdeacsmprefs = {}
self.tempdeacsmprefs['path_to_account_data'] = self.deacsmprefs['path_to_account_data']
self.tempdeacsmprefs['notify_fulfillment'] = self.deacsmprefs['notify_fulfillment']
self.tempdeacsmprefs['list_of_rented_books'] = self.deacsmprefs['list_of_rented_books']
# Start Qt Gui dialog layout
layout = QVBoxLayout(self)
@ -76,6 +84,21 @@ class ConfigWidget(QWidget):
self.button_export_activation.setEnabled(activated)
ua_group_box_layout.addWidget(self.button_export_activation)
self.button_rented_books = QtGui.QPushButton(self)
self.button_rented_books.setText(_("Show loaned books"))
self.button_rented_books.clicked.connect(self.show_rented_books)
self.button_rented_books.setEnabled(activated)
ua_group_box_layout.addWidget(self.button_rented_books)
if (len(self.deacsmprefs["list_of_rented_books"]) == 0):
self.button_rented_books.setEnabled(False)
self.chkNotifyFulfillment = QtGui.QCheckBox("Notify ACS server after successful fulfillment")
self.chkNotifyFulfillment.setToolTip("Default: True\n\nIf this is enabled, the ACS server will receive a notification once the ACSM has successfully been converted. \nThis is not strictly necessary, but it is what ADE does, so it's probably safer to just do it as well.\nAlso, it is required to be able to return loaned books.")
self.chkNotifyFulfillment.setChecked(self.tempdeacsmprefs["notify_fulfillment"])
layout.addWidget(self.chkNotifyFulfillment)
try:
from calibre_plugins.deacsm.libadobe import VAR_HOBBES_VERSION, createDeviceKeyFile, update_account_path
@ -288,6 +311,7 @@ class ConfigWidget(QWidget):
def save_settings(self):
#self.deacsmprefs.set('path_to_account_data', self.txtboxUA.text())
self.deacsmprefs.set('notify_fulfillment', self.chkNotifyFulfillment.isChecked())
self.deacsmprefs.writeprefs()
def load_resource(self, name):
@ -296,3 +320,176 @@ class ConfigWidget(QWidget):
return zf.read(name).decode('utf-8')
return ""
def show_rented_books(self):
d = RentedBooksDialog(self, self.deacsmprefs["list_of_rented_books"])
d.exec_()
class RentedBooksDialog(QDialog):
def __init__(self, parent, booklist):
QDialog.__init__(self,parent)
self.parent = parent
self.setWindowTitle("DeACSM: Manage loaned Books")
# Start Qt Gui dialog layout
layout = QVBoxLayout(self)
self.setLayout(layout)
keys_group_box = QGroupBox("List of loaned books", self)
layout.addWidget(keys_group_box)
keys_group_box_layout = QHBoxLayout()
keys_group_box.setLayout(keys_group_box_layout)
self.listy = QListWidget(self)
self.listy.setToolTip("List of loaned books")
self.listy.setSelectionMode(QAbstractItemView.SingleSelection)
self.populate_list()
keys_group_box_layout.addWidget(self.listy)
button_layout = QVBoxLayout()
keys_group_box_layout.addLayout(button_layout)
self._add_key_button = QtGui.QToolButton(self)
self._add_key_button.setIcon(QIcon(I('view-refresh.png')))
self._add_key_button.setToolTip("Return book to library")
self._add_key_button.clicked.connect(self.return_book)
button_layout.addWidget(self._add_key_button)
self._delete_key_button = QtGui.QToolButton(self)
self._delete_key_button.setToolTip(_("Delete book entry from list"))
self._delete_key_button.setIcon(QIcon(I('list_remove.png')))
self._delete_key_button.clicked.connect(self.delete_book_entry)
button_layout.addWidget(self._delete_key_button)
self.lblAccInfo = QtGui.QLabel(self)
self.lblAccInfo.setText("Click the arrow button to return a loaned book to the library.\nClick the red X to delete the loan record without returning the book.")
layout.addWidget(self.lblAccInfo)
self.button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
self.button_box.accepted.connect(self.accept)
self.button_box.rejected.connect(self.reject)
layout.addWidget(self.button_box)
self.resize(self.sizeHint())
def td_format(self, td_object):
seconds = int(td_object.total_seconds())
periods = [
('y', 60*60*24*365),
('M', 60*60*24*30),
('d', 60*60*24),
('h', 60*60),
('m', 60),
('s', 1)
]
strings=[]
tick = 0
for period_name, period_seconds in periods:
if seconds > period_seconds:
period_value , seconds = divmod(seconds, period_seconds)
strings.append("%s%s" % (period_value, period_name))
tick += 1
if tick >= 2:
break
return " ".join(strings)
def populate_list(self):
self.listy.clear()
overdue_books = []
for book in self.parent.deacsmprefs["list_of_rented_books"]:
try:
book_time_stamp = book["validUntil"]
timestamp = datetime.datetime.strptime(book_time_stamp, "%Y-%m-%dT%H:%M:%SZ")
currenttime = datetime.datetime.utcnow()
except:
print("Invalid book timestamp")
continue
if (timestamp <= currenttime):
# Book is overdue, no need to return. Delete from list.
overdue_books.append(book)
continue
else:
info = "(" + self.td_format(timestamp - currenttime)
info += " remaining)"
item = QListWidgetItem(book["book_name"] + " " + info)
item.setData(QtCore.Qt.UserRole, book["loanID"])
self.listy.addItem(item)
for book in overdue_books:
self.parent.deacsmprefs["list_of_rented_books"].remove(book)
self.parent.deacsmprefs.writeprefs()
def return_book(self):
if not self.listy.currentItem():
return
userdata = str(self.listy.currentItem().data(QtCore.Qt.UserRole))
print("Returning book %s (ID %s)" % (self.listy.currentItem().text(), userdata))
try:
from calibre_plugins.deacsm.libadobeFulfill import tryReturnBook
except:
try:
from libadobeFulfill import tryReturnBook
except:
print("{0} v{1}: Error while importing book return stuff".format(PLUGIN_NAME, PLUGIN_VERSION))
traceback.print_exc()
Ret_book = None
for book in self.parent.deacsmprefs["list_of_rented_books"]:
if book["loanID"] == userdata:
Ret_book = book
break
if Ret_book is None:
return
ret, msg = tryReturnBook(Ret_book)
if (ret):
print("Book successfully returned:")
print(msg)
self.delete_book_entry(nomsg=True)
self.populate_list()
return info_dialog(None, "Done", "Book successfully returned", show=True, show_copy_button=False)
else:
print("Book return failed:")
print(msg)
return error_dialog(None, "Error", "Book return failed", det_msg=msg, show=True, show_copy_button=False)
def delete_book_entry(self, nomsg = False):
if not self.listy.currentItem():
return
userdata = str(self.listy.currentItem().data(QtCore.Qt.UserRole))
print("Deleting book entry %s (ID %s)" % (self.listy.currentItem().text(), userdata))
success = False
for book in self.parent.deacsmprefs["list_of_rented_books"]:
if book["loanID"] == userdata:
self.parent.deacsmprefs["list_of_rented_books"].remove(book)
success = True
break
self.populate_list()
if success and not nomsg:
return info_dialog(None, "Done", "Book entry deleted without returning.", show=True, show_copy_button=False)
if not nomsg:
return error_dialog(None, "Error", "Error while deleting book entry", show=True, show_copy_button=False)

View File

@ -8,7 +8,7 @@ Helper library with code needed for Adobe stuff.
from Crypto import Random
from uuid import getnode
import os, hashlib, base64
import urllib.request
import urllib.request, ssl
from Crypto.Cipher import AES
from datetime import datetime, timedelta
@ -177,15 +177,27 @@ def sendHTTPRequest_getSimple(URL: str):
return content
def sendPOSTHTTPRequest(URL: str, document: bytes, type: str):
def sendPOSTHTTPRequest(URL: str, document: bytes, type: str, returnRC = False):
headers = {
"Accept": "*/*",
"User-Agent": "book2png",
"Content-Type": type
}
# Ignore SSL:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url=URL, headers=headers, data=document)
handler = urllib.request.urlopen(req)
handler = urllib.request.urlopen(req, context=ctx)
ret_code = handler.getcode()
if (ret_code == 204 and returnRC):
return 204, ""
if (ret_code != 200):
print("Post request returned something other than 200 - returned %d" % (ret_code))
content = handler.read()
@ -196,8 +208,11 @@ def sendPOSTHTTPRequest(URL: str, document: bytes, type: str):
pass
if loc is not None:
return sendPOSTHTTPRequest(loc, document, type)
return sendPOSTHTTPRequest(loc, document, type, returnRC)
if returnRC:
return ret_code, content
return content
@ -206,7 +221,11 @@ def sendHTTPRequest(URL: str):
def sendRequestDocu(document: str, URL: str):
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml")
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", False)
def sendRequestDocuRC(document: str, URL: str):
return sendPOSTHTTPRequest(URL, document.encode("utf-8"), "application/vnd.adobe.adept+xml", True)
######### Encryption and signing ###################

View File

@ -2,10 +2,10 @@ from lxml import etree
import base64
try:
from libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendHTTPRequest
from libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from libadobe import get_devkey_path, get_device_path, get_activation_xml_path
except:
from calibre_plugins.deacsm.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendHTTPRequest
from calibre_plugins.deacsm.libadobe import addNonce, sign_node, get_cert_from_pkcs12, sendRequestDocu, sendRequestDocuRC, sendHTTPRequest
from calibre_plugins.deacsm.libadobe import get_devkey_path, get_device_path, get_activation_xml_path
@ -248,7 +248,7 @@ def buildRights(license_token_node):
return ret
def fulfill(acsm_file):
def fulfill(acsm_file, do_notify = False):
# Get pkcs12:
pkcs12 = None
@ -341,6 +341,8 @@ def fulfill(acsm_file):
else:
return False, "Looks like there's been an error during Fulfillment: %s" % replyData
# Print fulfillmentResult
#print(replyData)
adobe_fulfill_response = etree.fromstring(replyData)
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
@ -348,6 +350,26 @@ def fulfill(acsm_file):
licenseURL = adobe_fulfill_response.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("licenseURL"))).text
if do_notify:
print("Notifying server ...")
success, response = performFulfillmentNotification(adobe_fulfill_response)
else:
print("Not notifying any server since that was disabled.")
is_returnable = False
try:
is_returnable_tx = adobe_fulfill_response.find("./%s/%s" % (adNS("fulfillmentResult"), adNS("returnable"))).text
if is_returnable_tx.lower() == "true":
is_returnable = True
except:
pass
if (is_returnable and do_notify):
# Only support loan returning if we also notified ACS.
# Otherwise the server gets confused and we don't want that.
updateLoanReturnData(adobe_fulfill_response)
success, response = fetchLicenseServiceCertificate(licenseURL, operatorURL)
if success is False:
@ -357,6 +379,244 @@ def fulfill(acsm_file):
def updateLoanReturnData(fulfillmentResultToken):
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
dcNS = lambda tag: '{%s}%s' % ('http://purl.org/dc/elements/1.1/', tag)
try:
loanToken = fulfillmentResultToken.find("./%s" % (adNS("loanToken")))
if (loanToken is None):
print("Loan token not found")
return False
except:
print("Loan token error")
return False
try:
loanID = loanToken.findall("./%s" % (adNS("loan")))[0].text
if (loanID is None):
print("Loan ID not found")
return False
operatorURL = loanToken.find("./%s" % (adNS("operatorURL"))).text
except:
print("Loan ID error")
return False
book_name = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("metadata"), dcNS("title"))).text
userUUID = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text
deviceUUID = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text
loanid = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("fulfillment"))).text
permissions = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("permissions")))
display = permissions.findall("./%s" % (adNS("display")))[0]
try:
dsp_until = display.find("./%s" % (adNS("until"))).text
except:
print("error with DSP")
return False
if (dsp_until is None):
print("No validUntil thing")
return False
# "userUUID" is the user UUID
# "deviceUUID" is the device UUID
# "loanID" is the loan ID
# "validUntil" is how long it's valid
try:
import calibre_plugins.deacsm.prefs as prefs # type: ignore
deacsmprefs = prefs.DeACSM_Prefs()
except:
print("Exception while reading config file")
return False
# Add all necessary information for a book return to the JSON array.
# The config widget can then read this and present a list of not-yet-returned
# books, and can then return them.
# Also, the config widget is responsible for cleaning up that list.
deacsmprefs["list_of_rented_books"].append({
"book_name": book_name,
"user": userUUID,
"device": deviceUUID,
"loanID": loanid,
"operatorURL": operatorURL,
"validUntil": dsp_until
})
deacsmprefs.writeprefs()
return True
def tryReturnBook(bookData):
try:
user = bookData["user"]
device = bookData["device"]
loanID = bookData["loanID"]
operatorURL = bookData["operatorURL"]
except:
print("Invalid book data!")
return False, "Invalid book data"
req_data = "<?xml version=\"1.0\"?>"
req_data += "<adept:loanReturn xmlns:adept=\"http://ns.adobe.com/adept\">"
req_data += "<adept:user>%s</adept:user>" % (user)
req_data += "<adept:device>%s</adept:device>" % (device)
req_data += "<adept:loan>%s</adept:loan>" % (loanID)
req_data += addNonce()
req_data += "</adept:loanReturn>"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
full_text_xml = etree.fromstring(req_data)
signature = sign_node(full_text_xml)
if (signature is None):
print("SIGN ERROR!")
return False, "Sign error"
etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
print("Would notify server %s:" % (operatorURL + "/LoanReturn"))
doc_send = "<?xml version=\"1.0\"?>\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
# print(doc_send)
retval = sendRequestDocu(doc_send, operatorURL + "/LoanReturn").decode("utf-8")
if "<error" in retval:
print("Loan return failed: %s" % (retval))
return False, retval
elif "<envelope" in retval:
print("Loan return successful")
return performFulfillmentNotification(etree.fromstring(retval), True, user=user, device=device)
else:
print("Invalid loan return response: %s" % (retval))
return False, retval
def performFulfillmentNotification(fulfillmentResultToken, forceOptional = False, user = None, device = None):
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)
reset = False
try:
notifiers = fulfillmentResultToken.findall("./%s/%s" % (adNS("fulfillmentResult"), adNS("notify")))
except:
reset = True
if len(notifiers) == 0:
try:
notifiers = fulfillmentResultToken.findall("./%s" % (adNS("notify")))
except:
reset = True
if len(notifiers) == 0:
try:
notifiers = fulfillmentResultToken.findall("./%s/%s" % (adNS("envelope"), adNS("notify")))
except:
reset = True
if len(notifiers) == 0:
print("<notify> tag not found. Guess nobody wants to be notified.")
#print(etree.tostring(fulfillmentResultToken, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8"))
return True, ""
errmsg = ""
errmsg_crit = ""
for element in notifiers:
url = element.find("./%s" % (adNS("notifyURL"))).text
body = element.find("./%s" % (adNS("body")))
critical = True
if element.get("critical", "yes") == "no":
critial = False
print("Notifying optional server %s" % (url))
else:
print("Notifying server %s" % (url))
if (user is None):
user = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("user"))).text
if (device is None):
device = fulfillmentResultToken.find("./%s/%s/%s/%s" % (adNS("fulfillmentResult"), adNS("resourceItemInfo"), adNS("licenseToken"), adNS("device"))).text
full_text = "<adept:notification xmlns:adept=\"http://ns.adobe.com/adept\">"
full_text += "<adept:user>%s</adept:user>" % user
full_text += "<adept:device>%s</adept:device>" % device
full_text += etree.tostring(body, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
full_text += addNonce()
full_text += "</adept:notification>"
NSMAP = { "adept" : "http://ns.adobe.com/adept" }
etree.register_namespace("adept", NSMAP["adept"])
full_text_xml = etree.fromstring(full_text)
signature = sign_node(full_text_xml)
if (signature is None):
print("SIGN ERROR!")
continue
etree.SubElement(full_text_xml, etree.QName(NSMAP["adept"], "signature")).text = signature
doc_send = "<?xml version=\"1.0\"?>\n" + etree.tostring(full_text_xml, encoding="utf-8", pretty_print=True, xml_declaration=False).decode("utf-8")
# Debug: Print notify request
#print(doc_send)
code, msg = sendRequestDocuRC(doc_send, url)
try:
msg = msg.decode("utf-8")
except:
pass
if "<error" in msg:
print("Fulfillment notification error: %s" % (msg))
errmsg += "ERROR\n" + url + "\n" + msg + "\n\n"
if critical:
errmsg_crit += "ERROR\n" + url + "\n" + msg + "\n\n"
elif "<success" in msg:
print("Fulfillment notification successful.")
elif code == 204:
print("Fulfillment notification successful (204).")
else:
print("Weird Fulfillment Notification response: %s" % (msg))
errmsg += "ERROR\n" + url + "\n" + msg + "\n\n"
if critical:
errmsg_crit += "ERROR\n" + url + "\n" + msg + "\n\n"
if errmsg_crit == "":
return True, ""
return False, errmsg
def fetchLicenseServiceCertificate(licenseURL: str, operatorURL: str):
# Check if we already have a cert for this URL:
adNS = lambda tag: '{%s}%s' % ('http://ns.adobe.com/adept', tag)

View File

@ -142,7 +142,7 @@ def patch_drm_into_pdf(filename_in, adept_license_string, filename_out, ebx_book
trailer_idx += 1
trailer = line + "\n" + trailer
print ("LINE: " + line)
#print ("LINE: " + line)
if (trailer_idx > 10):
print("Took more than 10 attempts to find startxref ...")

View File

@ -18,6 +18,15 @@ class DeACSM_Prefs():
self.deacsmprefs.defaults['configured'] = False
self.deacsmprefs.defaults['notify_fulfillment'] = True
self.deacsmprefs.defaults['list_of_rented_books'] = []
if self.deacsmprefs['list_of_rented_books'] == []:
self.deacsmprefs['list_of_rented_books'] = []
self.pluginsdir = os.path.join(config_dir,"plugins")
if not os.path.exists(self.pluginsdir):
os.mkdir(self.pluginsdir)