From bc1038eb7dc39c48393eae8f073073c00a8ed27c Mon Sep 17 00:00:00 2001 From: Paul Date: Tue, 29 Nov 2022 19:32:19 +0100 Subject: [PATCH 1/4] Implement externalAccountBinding https://datatracker.ietf.org/doc/html/rfc8555#section-7.3.4 Renamed `account` variable to the more appropriate: `response`. That is what this variable holds. --- README.md | 15 +++++++++++++++ acme_tiny.py | 24 +++++++++++++----------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 40a4377a..3c14f5a4 100644 --- a/README.md +++ b/README.md @@ -122,6 +122,21 @@ and read your private account key and CSR. python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ > ./signed_chain.crt ``` +If your ACME CA mandates externalAccountBinding (eAB), provide those parameters like so: + +``` +# Run the script on your server +python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ --eabkid 'PAtzxcSFQMQSdm9SLJTxCt0hwvvl5yNKPfnWBWqPk8o' --eabhmackey 'ZndUSkZvVldvMEFiRzQ5VWNCdERtNkNBNnBTcTl4czNKVEVxdUZiaEdpZXZNUVJBVmRuSFREcDJYX2s3X0NxTA' > ./signed_chain.crt +``` + +Some ACME CA mandate a contact at registration: + +``` +# Run the script on your server +python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /var/www/challenges/ --contact aaa@bbb.com --eabkid 'PAtzxcSFQMQSdm9SLJTxCt0hwvvl5yNKPfnWBWqPk8o' --eabhmackey 'ZndUSkZvVldvMEFiRzQ5VWNCdERtNkNBNnBTcTl4czNKVEVxdUZiaEdpZXZNUVJBVmRuSFREcDJYX2s3X0NxTA' > ./signed_chain.crt +``` + + ### Step 5: Install the certificate The signed https certificate chain that is output by this script can be used along diff --git a/acme_tiny.py b/acme_tiny.py index d992d02d..32459c5d 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # Copyright Daniel Roesler, under MIT license, see LICENSE at github.com/diafygi/acme-tiny -import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging +import argparse, subprocess, json, os, sys, base64, binascii, time, hashlib, re, copy, textwrap, logging, hmac try: from urllib.request import urlopen, Request # Python 3 except ImportError: # pragma: no cover @@ -13,7 +13,7 @@ LOGGER.addHandler(logging.StreamHandler()) LOGGER.setLevel(logging.INFO) -def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None): +def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, eabkid=None, eabhmackey=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables # helper functions - base64 encode for jose spec @@ -108,11 +108,14 @@ def _poll_until_not(url, pending_statuses, err_msg): # create account, update contact details (if any), and set the global key identifier log.info("Registering account...") reg_payload = {"termsOfServiceAgreed": True} if contact is None else {"termsOfServiceAgreed": True, "contact": contact} - account, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering") + if eabkid and eabhmackey: # https://datatracker.ietf.org/doc/html/rfc8555#section-7.3.4 + log.info("Building externalAccountBinding...") + reg_payload['externalAccountBinding'] = {"protected": _b64(json.dumps({"alg": "HS256", "kid": eabkid, "url": directory['newAccount']}).encode('utf-8')), "payload": _b64(json.dumps(jwk).encode('utf-8')), "signature": _b64(hmac.new(base64.urlsafe_b64decode(eabhmackey.strip() + '=='), (_b64(json.dumps({"alg": "HS256", "kid": eabkid, "url": directory['newAccount']}).encode('utf-8')) + "." + _b64(json.dumps(jwk).encode('utf-8'))).encode('utf-8'), digestmod=hashlib.sha256).digest())} + response, code, acct_headers = _send_signed_request(directory['newAccount'], reg_payload, "Error registering") log.info("{0} Account ID: {1}".format("Registered!" if code == 201 else "Already registered!", acct_headers['Location'])) - if contact is not None: - account, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details") - log.info("Updated contact details:\n{0}".format("\n".join(account['contact']))) + if contact and code == 200: # 200 == already reg --> update + response, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details") + log.info("Updated contact details:\n{0}".format("\n".join(response['contact']))) # create a new order log.info("Creating new order...") @@ -175,10 +178,8 @@ def main(argv=None): description=textwrap.dedent("""\ This script automates the process of getting a signed TLS certificate from Let's Encrypt using the ACME protocol. It will need to be run on your server and have access to your private account key, so PLEASE READ THROUGH IT! - It's only ~200 lines, so it won't take long. - - Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt - """) + It's only ~200 lines, so it won't take long.\n\n + Example Usage: python acme_tiny.py --account-key ./account.key --csr ./domain.csr --acme-dir /usr/share/nginx/html/.well-known/acme-challenge/ > signed_chain.crt""") ) parser.add_argument("--account-key", required=True, help="path to your Let's Encrypt account private key") parser.add_argument("--csr", required=True, help="path to your certificate signing request") @@ -189,10 +190,11 @@ def main(argv=None): parser.add_argument("--ca", default=DEFAULT_CA, help="DEPRECATED! USE --directory-url INSTEAD!") parser.add_argument("--contact", metavar="CONTACT", default=None, nargs="*", help="Contact details (e.g. mailto:aaa@bbb.com) for your account-key") parser.add_argument("--check-port", metavar="PORT", default=None, help="what port to use when self-checking the challenge file, default is port 80") + parser.add_argument("--eabkid", metavar="KID", default=None, help="Key Identifier for External Account Binding"), parser.add_argument("--eabhmackey", metavar="HMAC", default=None, help="HMAC key for External Account Binding") args = parser.parse_args(argv) LOGGER.setLevel(args.quiet or LOGGER.level) - signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port) + signed_crt = get_crt(args.account_key, args.csr, args.acme_dir, log=LOGGER, CA=args.ca, disable_check=args.disable_check, directory_url=args.directory_url, contact=args.contact, check_port=args.check_port, eabkid=args.eabkid, eabhmackey=args.eabhmackey) sys.stdout.write(signed_crt) if __name__ == "__main__": # pragma: no cover From 455f1fa94ff179597deebee5fc0243764ed55d4b Mon Sep 17 00:00:00 2001 From: Paul Date: Tue, 29 Nov 2022 19:54:20 +0100 Subject: [PATCH 2/4] Implement Changes of Terms of Service https://datatracker.ietf.org/doc/html/rfc8555#section-7.3.3 (which will never happen on Letsencrypt, but may on other CAs) --- acme_tiny.py | 1 + 1 file changed, 1 insertion(+) diff --git a/acme_tiny.py b/acme_tiny.py index 32459c5d..ab8f6893 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -116,6 +116,7 @@ def _poll_until_not(url, pending_statuses, err_msg): if contact and code == 200: # 200 == already reg --> update response, _, _ = _send_signed_request(acct_headers['Location'], {"contact": contact}, "Error updating contact details") log.info("Updated contact details:\n{0}".format("\n".join(response['contact']))) + log.info("You must agree to updated TOS:\n", response['instance']) if code == 403 and response['type'] == 'urn:ietf:params:acme:error:userActionRequired' else 0 # https://datatracker.ietf.org/doc/html/rfc8555#section-7.3.3 : #userActionRequired only for TOS in RFC8555 # create a new order log.info("Creating new order...") From 3d4b6ceb429c1b65c9a2b377dbf9e58ab554ae82 Mon Sep 17 00:00:00 2001 From: Paul Date: Tue, 29 Nov 2022 19:42:13 +0100 Subject: [PATCH 3/4] Scavenge some extra lines --- acme_tiny.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/acme_tiny.py b/acme_tiny.py index ab8f6893..c51c3699 100755 --- a/acme_tiny.py +++ b/acme_tiny.py @@ -16,20 +16,17 @@ def get_crt(account_key, csr, acme_dir, log=LOGGER, CA=DEFAULT_CA, disable_check=False, directory_url=DEFAULT_DIRECTORY_URL, contact=None, check_port=None, eabkid=None, eabhmackey=None): directory, acct_headers, alg, jwk = None, None, None, None # global variables - # helper functions - base64 encode for jose spec - def _b64(b): + def _b64(b): # helper function - base64 encode for jose spec return base64.urlsafe_b64encode(b).decode('utf8').replace("=", "") - # helper function - run external commands - def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): + def _cmd(cmd_list, stdin=None, cmd_input=None, err_msg="Command Line Error"): # helper function - run external commands proc = subprocess.Popen(cmd_list, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.PIPE) out, err = proc.communicate(cmd_input) if proc.returncode != 0: raise IOError("{0}\n{1}".format(err_msg, err)) return out - # helper function - make request and automatically parse json response - def _do_request(url, data=None, err_msg="Error", depth=0): + def _do_request(url, data=None, err_msg="Error", depth=0): # helper function - make request and automatically parse json response try: resp = urlopen(Request(url, data=data, headers={"Content-Type": "application/jose+json", "User-Agent": "acme-tiny"})) resp_data, code, headers = resp.read().decode("utf8"), resp.getcode(), resp.headers @@ -46,8 +43,7 @@ def _do_request(url, data=None, err_msg="Error", depth=0): raise ValueError("{0}:\nUrl: {1}\nData: {2}\nResponse Code: {3}\nResponse: {4}".format(err_msg, url, data, code, resp_data)) return resp_data, code, headers - # helper function - make signed requests - def _send_signed_request(url, payload, err_msg, depth=0): + def _send_signed_request(url, payload, err_msg, depth=0): # helper function - make signed requests payload64 = "" if payload is None else _b64(json.dumps(payload).encode('utf8')) new_nonce = _do_request(directory['newNonce'])[2]['Replay-Nonce'] protected = {"url": url, "alg": alg, "nonce": new_nonce} @@ -61,8 +57,7 @@ def _send_signed_request(url, payload, err_msg, depth=0): except IndexError: # retry bad nonces (they raise IndexError) return _send_signed_request(url, payload, err_msg, depth=(depth + 1)) - # helper function - poll until complete - def _poll_until_not(url, pending_statuses, err_msg): + def _poll_until_not(url, pending_statuses, err_msg): # helper function - poll until complete result, t0 = None, time.time() while result is None or result['status'] in pending_statuses: assert (time.time() - t0 < 3600), "Polling timeout" # 1 hour timeout From 9f1b80f54a6e657db3c0c0125c89a30b23187699 Mon Sep 17 00:00:00 2001 From: Paul Date: Thu, 7 Oct 2021 20:46:43 +0200 Subject: [PATCH 4/4] eAB test cases --- tests/test_module.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/test_module.py b/tests/test_module.py index 25461fe3..afdc948e 100644 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -224,8 +224,8 @@ def test_account_key_domain(self): self.assertIsInstance(result, ValueError) self.assertIn(self.account_key_error, result.args[0]) - def test_contact(self): - """ Make sure optional contact details can be set """ + def test_contact_update(self): + """ Make sure optional contact details can be updated """ # add a logging handler that captures the info log output log_output = StringIO() debug_handler = logging.StreamHandler(log_output) @@ -233,6 +233,14 @@ def test_contact(self): # call acme_tiny with new contact details old_stdout = sys.stdout sys.stdout = StringIO() + result = acme_tiny.main([ + "--account-key", self.KEYS['account_key'].name, + "--csr", self.KEYS['domain_csr'].name, + "--acme-dir", self.tempdir, + "--directory-url", self.DIR_URL, + "--check-port", self.check_port, + "--contact", "mailto:devteam2@gethttpsforfree.com", "mailto:daboss@gethttpsforfree.com", + ]) result = acme_tiny.main([ "--account-key", self.KEYS['account_key'].name, "--csr", self.KEYS['domain_csr'].name,