From 815269b2b71803058a5c597470c4d9961d7558dc Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Mon, 14 Aug 2023 13:44:00 +0200 Subject: [PATCH 01/14] Support 'metadata update' in 'sign_metadata' task Implement support for distributed asynchronous root metadata signing in the course of a "metadata update" event. Other than the already supported "bootstrap" signing event, signatures added to root during "metadata update" must validate with keys from trusted OR new root, and meet the signature threshold of trusted AND new root. *Related changes:* - Ignore obsolete "rolename" in sign_metadata payload. We only support root, and check the type when loading "ROOT_SIGNING". - Refactor `_validate_{signature, threshold}` helpers to accept an optional delegator (e.g. trusted root). - Add local `_result` helper to return a "sign metadata"-specific task result. Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 192 +++++++++----------- 1 file changed, 89 insertions(+), 103 deletions(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index e2065673..b351a747 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1262,20 +1262,25 @@ def metadata_rotation( return self.metadata_update(payload, update_state) @staticmethod - def _validate_signature(metadata: Metadata, signature: Signature) -> bool: + def _validate_signature( + metadata: Metadata, + signature: Signature, + delegator: Optional[Metadata] = None, + ) -> bool: """ - Validate signature over metadata using appropriate delegator. - - NOTE: In "metadata update" signing event, the public key and - authorization info is retrieved from "trusted root" + Validate signature over metadata using appropriate delegator. + If no delegator is passed, the metadata itself is used as delegator. """ + if delegator is None: + delegator = metadata + keyid = signature.keyid - if keyid not in metadata.signed.roles[Root.type].keyids: + if keyid not in delegator.signed.roles[Root.type].keyids: logging.info(f"signature '{keyid}' not authorized") return False - key = metadata.signed.keys.get(signature.keyid) + key = delegator.signed.keys.get(signature.keyid) if not key: logging.info(f"no key for signature '{keyid}'") return False @@ -1292,25 +1297,19 @@ def _validate_signature(metadata: Metadata, signature: Signature) -> bool: return True @staticmethod - def _validate_threshold(metadata: Metadata) -> bool: + def _validate_threshold( + metadata: Metadata, delegator: Optional[Metadata] = None + ) -> bool: """ Validate signature threshold using appropriate delegator(s). + If no delegator is passed, the metadata itself is used as delegator. - NOTE: In "metadata update" signing event, the threshold for: - - root is validated with the passed metadata AND the trusted root; - - top-level targets is validated with the trusted root; - - delegated targets is validated with the delegating targets; - as delegator. """ + if delegator is None: + delegator = metadata try: - # TODO: `verify_delegate` does not tell us if there are any - # superfluous valid or invalid signatures. Is this something we - # want to know, e.g. to detect mistakes? To detect superfluous - # signatures if verify_delegate succeeds, would be easy: `assert - # len(signatures) == threshold`. Anything, else would require a - # custom `verify_delegate` function. - metadata.verify_delegate(Root.type, metadata) + delegator.verify_delegate(Root.type, metadata) except UnsignedMetadataError as e: logging.info(e) @@ -1327,100 +1326,87 @@ def sign_metadata( ) -> Dict[str, Any]: """Add signature to metadata for pending signing event. - Add signature (from payload) to cached role metadata (from settings) - for the role that matches the passed rolename (from payload), if a - signing event exists for that role, and the signature is valid. - - If the signature threshold is reached, the signing event is finalized. + Add signature (from payload) to cached root metadata (from settings), + if a signing event exists, and the signature is valid. - ** Signing event types (and details) ** + Signing event types are 'bootstrap' or 'metadata update'. - BOOTSTRAP: Only root metadata can be updated in this event. To verify - the passed signature, the keys and threshold are read from the root - metadata to be updated itself. If the threshold is reached, the - bootstrap process is finalized. - - METADATA UPDATE: Root, targets or delegated targets metadata can be - updated in this event. Depending on the metadata type, the authorized - public keys and threshold are read from different delegating metadata. + If the signature threshold is reached, the signing event is finalized, + otherwise it remains in pending state. """ - rolename = payload["role"] - signature_dict = payload["signature"] - # Assert pending signing event - metadata_dict = self._settings.get_fresh(f"{rolename.upper()}_SIGNING") + def _result(status, error=None, bootstrap=None, update=None): + details = {} + if status: + details["message"] = "Signature Processed" + else: + details["message"] = "Signature Failed" + if error: + details["error"] = error + if bootstrap: + details["bootstrap"] = bootstrap + if update: + details["update"] = update + return self._task_result(TaskName.SIGN_METADATA, status, details) + + signature = Signature.from_dict(payload["signature"]) + + # Assert pending signing event exists + metadata_dict = self._settings.get_fresh("ROOT_SIGNING") if metadata_dict is None: - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": f"No signatures pending for {rolename}", - }, - ) + msg = "No signatures pending for root" + return _result(False, error=msg) - # Assert signing event type (currently bootstrap only) - # TODO: repository-service-tuf/repository-service-tuf-worker#336 - bootstrap_state = self._settings.get_fresh("BOOTSTRAP") - if "signing" not in bootstrap_state: - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": "No bootstrap available for signing", - }, - ) - - # Assert metadata type is allowed for signing event + # Assert metadata type is root root = Metadata.from_dict(metadata_dict) if not isinstance(root.signed, Root): - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": f"Role {rolename} has wrong type", - }, - ) + msg = f"Expected 'root', got '{root.signed.type}'" + return _result(False, error=msg) - # Assert passed signature is valid for metadata - signature = Signature.from_dict(signature_dict) - if not self._validate_signature(root, signature): - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": "Invalid signature", - }, - ) + # If it isn't a "bootstrap" signing event, it must be "update metadata" + bootstrap_state = self._settings.get_fresh("BOOTSTRAP") + if "signing" in bootstrap_state: + # Signature and threshold of initial root can only self-validate, + # there is no "trusted root" at bootstrap time yet. + if not self._validate_signature(root, signature): + return _result(False, error="Invalid signature") + + root.signatures[signature.keyid] = signature + if not self._validate_threshold(root): + self.write_repository_settings("ROOT_SIGNING", root.to_dict()) + msg = f"Root v{root.signed.version} is pending signatures" + return _result(True, bootstrap=msg) + + bootstrap_task_id = bootstrap_state.split("signing-")[1] + self._bootstrap_finalize(root, bootstrap_task_id) + return _result(True, bootstrap="Bootstrap Finished") - # Check threshold with new signature included - root.signatures[signature.keyid] = signature - if not self._validate_threshold(root): - self.write_repository_settings("ROOT_SIGNING", root.to_dict()) - root_version = root.signed.version - return self._task_result( - TaskName.SIGN_METADATA, - True, - { - "message": "Signature Processed", - "bootstrap": f"Root v{root_version} is pending signatures", - }, + else: + # Also consult with "trusted root" when updating a new root + # - Signature must validate with trusted OR new root + # - Threshold must validate with trusted AND new root + trusted_root = self._storage_backend.get("root") + trusted_signature = self._validate_signature( + root, signature, trusted_root ) - - # Finalize bootstrap - bootstrap_task_id = bootstrap_state.split("signing-")[1] - self._bootstrap_finalize(root, bootstrap_task_id) - return self._task_result( - TaskName.SIGN_METADATA, - True, - { - "message": "Signature Processed", - "bootstrap": "Bootstrap Finished", - }, - ) + new_signature = self._validate_signature(root, signature) + + if not (trusted_signature or new_signature): + return _result(False, error="Invalid signature") + + root.signatures[signature.keyid] = signature + trusted_threshold = self._validate_threshold(root, trusted_root) + new_threshold = self._validate_threshold(root) + if not (trusted_threshold and new_threshold): + self.write_repository_settings("ROOT_SIGNING", root.to_dict()) + msg = f"Root v{root.signed.version} is pending signatures" + return _result(True, update=msg) + + # TODO: Refactor `_root_metadata_update` to de-duplicate validation + self._root_metadata_update(root) + # Update successful, root persisted -> finalize event... + self.write_repository_settings("ROOT_SIGNING", None) + return _result(True, update="Metadata update finished") def delete_sign_metadata( self, From 808cdbea6334cfd422b67c2f535f1fb96a25a919 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 16 Aug 2023 18:04:29 +0200 Subject: [PATCH 02/14] Fix failing tests for sign_metadata - remove obsolete "test_sign_metadata_root_signing_no_bootstrap" Now, if there is no ongoing "bootstrap", we just assume there is an ongoing "metadata update". - adopt changes in "test_sign_metadata_invalid_role_type" - new expected error message - fail earlier, before consulting with "BOOTSTRAP" state variable Signed-off-by: Lukas Puehringer --- .../test_repository.py | 46 +------------------ 1 file changed, 1 insertion(+), 45 deletions(-) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index a6494d14..262dcbea 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3103,49 +3103,6 @@ def test_sign_metadata_no_role_signing( pretend.call("ROOT_SIGNING"), ] - def test_sign_metadata_root_signing_no_bootstrap( - self, test_repo, monkeypatch, mocked_datetime - ): - fake_datetime = mocked_datetime - - def fake_get_fresh(key): - if key == "BOOTSTRAP": - return "" - if key == "ROOT_SIGNING": - return {"metadata": "fake"} - - fake_settings = pretend.stub( - get_fresh=pretend.call_recorder(fake_get_fresh), - ) - monkeypatch.setattr( - repository, - "get_repository_settings", - lambda *a, **kw: fake_settings, - ) - - payload = { - "role": "root", - "signature": {"keyid": "keyid2", "sig": "sig2"}, - } - result = test_repo.sign_metadata(payload) - - assert result == { - "task": "sign_metadata", - "status": False, - "last_update": fake_datetime.now(), - "details": { - "message": "Signature Failed", - "error": "No bootstrap available for signing", - }, - } - assert fake_settings.get_fresh.calls == [ - pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), - ] - assert repository.Metadata.from_dict.calls == [ - pretend.call({"metadata": "fake"}) - ] - def test_sign_metadata_invalid_role_type( self, test_repo, monkeypatch, mocked_datetime ): @@ -3183,12 +3140,11 @@ def fake_get_fresh(key): "last_update": fake_datetime.now(), "details": { "message": "Signature Failed", - "error": f"Role {payload['role']} has wrong type", + "error": "Expected 'root', got 'targets'", }, } assert fake_settings.get_fresh.calls == [ pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), ] def test_sign_metadata_invalid_signature( From 3656043137407ca2ca44d9b7a1af3e53482821d1 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 4 Oct 2023 11:30:41 +0200 Subject: [PATCH 03/14] Add test_sign_metadata__update__invalid_signature Signed-off-by: Lukas Puehringer --- .../test_repository.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 262dcbea..874aecbe 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3271,6 +3271,72 @@ def fake_get_fresh(key): pretend.call("ROOT_SIGNING", "fake_metadata") ] + def test_sign_metadata__update__invalid_signature( + self, test_repo, monkeypatch, mocked_datetime + ): + fake_datetime = mocked_datetime + + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + + fake_signature = pretend.stub(keyid="fake_sig") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + test_repo._validate_signature = pretend.call_recorder(lambda *a: False) + + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + + payload = { + "role": "root", + "signature": {"keyid": "keyid2", "sig": "sig2"}, + } + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": False, + "last_update": fake_datetime.now(), + "details": { + "message": "Signature Failed", + "error": "Invalid signature", + }, + } + assert fake_settings.get_fresh.calls == [ + pretend.call("ROOT_SIGNING"), + pretend.call("BOOTSTRAP"), + ] + assert repository.Metadata.from_dict.calls == [ + pretend.call({"metadata": "fake"}) + ] + assert test_repo._validate_signature.calls == [ + pretend.call(fake_new_root, fake_signature, fake_trusted_root), + pretend.call(fake_new_root, fake_signature), + ] + def test_delete_sign_metadata_bootstrap_signing_state( self, test_repo, monkeypatch, mocked_datetime ): From e001fd33ba0239c3fa24c782cbbb1c399e62e3c9 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 4 Oct 2023 11:32:24 +0200 Subject: [PATCH 04/14] Add test_sign_metadata__update__invalid_threshold Signed-off-by: Lukas Puehringer --- .../test_repository.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 874aecbe..9134be10 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3337,6 +3337,79 @@ def fake_get_fresh(key): pretend.call(fake_new_root, fake_signature), ] + def test_sign_metadata__update__invalid_threshold( + self, test_repo, monkeypatch, mocked_datetime + ): + fake_datetime = mocked_datetime + + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + + fake_signature = pretend.stub(keyid="fake_sig") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + test_repo._validate_signature = pretend.call_recorder(lambda *a: True) + test_repo._validate_threshold = pretend.call_recorder(lambda *a: False) + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: None + ) + + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + + payload = { + "role": "root", + "signature": {"keyid": "keyid2", "sig": "sig2"}, + } + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": fake_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + assert fake_settings.get_fresh.calls == [ + pretend.call("ROOT_SIGNING"), + pretend.call("BOOTSTRAP"), + ] + assert repository.Metadata.from_dict.calls == [ + pretend.call({"metadata": "fake"}) + ] + assert test_repo._validate_signature.calls == [ + pretend.call(fake_new_root, fake_signature, fake_trusted_root), + pretend.call(fake_new_root, fake_signature), + ] + assert test_repo.write_repository_settings.calls == [ + pretend.call("ROOT_SIGNING", "fake") + ] + def test_delete_sign_metadata_bootstrap_signing_state( self, test_repo, monkeypatch, mocked_datetime ): From c4d38a4e8593fd2e6546076fe72c1017c2827edd Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 4 Oct 2023 11:35:09 +0200 Subject: [PATCH 05/14] Add test_sign_metadata__update__finalize Signed-off-by: Lukas Puehringer --- .../test_repository.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 9134be10..7d903454 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3410,6 +3410,84 @@ def fake_get_fresh(key): pretend.call("ROOT_SIGNING", "fake") ] + def test_sign_metadata__update__finalize( + self, test_repo, monkeypatch, mocked_datetime + ): + fake_datetime = mocked_datetime + + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + + fake_signature = pretend.stub(keyid="fake_sig") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + test_repo._validate_signature = pretend.call_recorder(lambda *a: True) + test_repo._validate_threshold = pretend.call_recorder(lambda *a: True) + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: None + ) + test_repo._root_metadata_update = pretend.call_recorder( + lambda *a: "fake_result" + ) + + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + + payload = { + "role": "root", + "signature": {"keyid": "keyid2", "sig": "sig2"}, + } + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": fake_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Metadata update finished", + }, + } + assert fake_settings.get_fresh.calls == [ + pretend.call("ROOT_SIGNING"), + pretend.call("BOOTSTRAP"), + ] + assert repository.Metadata.from_dict.calls == [ + pretend.call({"metadata": "fake"}) + ] + assert test_repo._validate_signature.calls == [ + pretend.call(fake_new_root, fake_signature, fake_trusted_root), + pretend.call(fake_new_root, fake_signature), + ] + assert test_repo._root_metadata_update.calls == [ + pretend.call(fake_new_root) + ] + assert test_repo.write_repository_settings.calls == [ + pretend.call("ROOT_SIGNING", None) + ] + def test_delete_sign_metadata_bootstrap_signing_state( self, test_repo, monkeypatch, mocked_datetime ): From 5651cb2f8822270f161bc3bf526b763dd9f5d64d Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Wed, 4 Oct 2023 11:45:13 +0200 Subject: [PATCH 06/14] Refactor test_sign_metadata__update* and add cases Combine existing sign_metadata/update metadata tests in a single parametrized test method and add additional test cases for different return values from internal `_validate_{signature, threshold}` calls, in order to test correct sage of OR/AND operators: - signature must be valid according to trusted OR new root - threshold must be met according to trusted AND new root Note: The test removes asserts for internal method calls, which don't seem so interesting, as long as we get the expected result. Signed-off-by: Lukas Puehringer --- .../test_repository.py | 262 ++++++------------ 1 file changed, 83 insertions(+), 179 deletions(-) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 7d903454..5fdfd5fd 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3271,11 +3271,69 @@ def fake_get_fresh(key): pretend.call("ROOT_SIGNING", "fake_metadata") ] - def test_sign_metadata__update__invalid_signature( - self, test_repo, monkeypatch, mocked_datetime + @pytest.mark.parametrize( + "validation_results, details, status", + [ + ( + {"signature": iter((False, False))}, + {"message": "Signature Failed", "error": "Invalid signature"}, + False, + ), + ( + { + "signature": iter((True, False)), + "threshold": iter((False, False)), + }, + { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + True, + ), + ( + { + "signature": iter((False, True)), + "threshold": iter((False, True)), + }, + { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + True, + ), + ( + { + "signature": iter((True, False)), + "threshold": iter((True, False)), + }, + { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + True, + ), + ( + { + "signature": iter((True, True)), + "threshold": iter((True, True)), + }, + { + "message": "Signature Processed", + "update": "Metadata update finished", + }, + True, + ), + ], + ) + def test_sign_metadata__update( + self, + test_repo, + monkeypatch, + mocked_datetime, + validation_results, + details, + status, ): - fake_datetime = mocked_datetime - def fake_get_fresh(key): if key == "BOOTSTRAP": return "" @@ -3285,19 +3343,15 @@ def fake_get_fresh(key): fake_settings = pretend.stub( get_fresh=pretend.call_recorder(fake_get_fresh), ) - monkeypatch.setattr( repository, "get_repository_settings", lambda *a, **kw: fake_settings, ) - - fake_signature = pretend.stub(keyid="fake_sig") + fake_signature = pretend.stub(keyid="fake") repository.Signature.from_dict = pretend.call_recorder( lambda *a: fake_signature ) - test_repo._validate_signature = pretend.call_recorder(lambda *a: False) - fake_trusted_root = repository.Metadata( signed=repository.Root(version=1) ) @@ -3305,188 +3359,38 @@ def fake_get_fresh(key): lambda r: fake_trusted_root ) fake_new_root = repository.Metadata(signed=repository.Root(version=2)) - fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") repository.Metadata.from_dict = pretend.call_recorder( lambda *a: fake_new_root ) - - payload = { - "role": "root", - "signature": {"keyid": "keyid2", "sig": "sig2"}, - } - result = test_repo.sign_metadata(payload) - - assert result == { - "task": "sign_metadata", - "status": False, - "last_update": fake_datetime.now(), - "details": { - "message": "Signature Failed", - "error": "Invalid signature", - }, - } - assert fake_settings.get_fresh.calls == [ - pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), - ] - assert repository.Metadata.from_dict.calls == [ - pretend.call({"metadata": "fake"}) - ] - assert test_repo._validate_signature.calls == [ - pretend.call(fake_new_root, fake_signature, fake_trusted_root), - pretend.call(fake_new_root, fake_signature), - ] - - def test_sign_metadata__update__invalid_threshold( - self, test_repo, monkeypatch, mocked_datetime - ): - fake_datetime = mocked_datetime - - def fake_get_fresh(key): - if key == "BOOTSTRAP": - return "" - if key == "ROOT_SIGNING": - return {"metadata": "fake"} - - fake_settings = pretend.stub( - get_fresh=pretend.call_recorder(fake_get_fresh), - ) - - monkeypatch.setattr( - repository, - "get_repository_settings", - lambda *a, **kw: fake_settings, - ) - - fake_signature = pretend.stub(keyid="fake_sig") - repository.Signature.from_dict = pretend.call_recorder( - lambda *a: fake_signature - ) - test_repo._validate_signature = pretend.call_recorder(lambda *a: True) - test_repo._validate_threshold = pretend.call_recorder(lambda *a: False) - test_repo.write_repository_settings = pretend.call_recorder( - lambda *a: None - ) - - fake_trusted_root = repository.Metadata( - signed=repository.Root(version=1) - ) - test_repo._storage_backend.get = pretend.call_recorder( - lambda r: fake_trusted_root - ) - fake_new_root = repository.Metadata(signed=repository.Root(version=2)) fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") - repository.Metadata.from_dict = pretend.call_recorder( - lambda *a: fake_new_root - ) - - payload = { - "role": "root", - "signature": {"keyid": "keyid2", "sig": "sig2"}, - } - result = test_repo.sign_metadata(payload) - - assert result == { - "task": "sign_metadata", - "status": True, - "last_update": fake_datetime.now(), - "details": { - "message": "Signature Processed", - "update": "Root v2 is pending signatures", - }, - } - assert fake_settings.get_fresh.calls == [ - pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), - ] - assert repository.Metadata.from_dict.calls == [ - pretend.call({"metadata": "fake"}) - ] - assert test_repo._validate_signature.calls == [ - pretend.call(fake_new_root, fake_signature, fake_trusted_root), - pretend.call(fake_new_root, fake_signature), - ] - assert test_repo.write_repository_settings.calls == [ - pretend.call("ROOT_SIGNING", "fake") - ] - - def test_sign_metadata__update__finalize( - self, test_repo, monkeypatch, mocked_datetime - ): - fake_datetime = mocked_datetime - - def fake_get_fresh(key): - if key == "BOOTSTRAP": - return "" - if key == "ROOT_SIGNING": - return {"metadata": "fake"} - - fake_settings = pretend.stub( - get_fresh=pretend.call_recorder(fake_get_fresh), - ) - - monkeypatch.setattr( - repository, - "get_repository_settings", - lambda *a, **kw: fake_settings, - ) - - fake_signature = pretend.stub(keyid="fake_sig") - repository.Signature.from_dict = pretend.call_recorder( - lambda *a: fake_signature - ) - test_repo._validate_signature = pretend.call_recorder(lambda *a: True) - test_repo._validate_threshold = pretend.call_recorder(lambda *a: True) test_repo.write_repository_settings = pretend.call_recorder( - lambda *a: None - ) - test_repo._root_metadata_update = pretend.call_recorder( - lambda *a: "fake_result" + lambda *a: "fake" ) - fake_trusted_root = repository.Metadata( - signed=repository.Root(version=1) - ) - test_repo._storage_backend.get = pretend.call_recorder( - lambda r: fake_trusted_root - ) - fake_new_root = repository.Metadata(signed=repository.Root(version=2)) - repository.Metadata.from_dict = pretend.call_recorder( - lambda *a: fake_new_root - ) + # Mock return values of `_validate_{signature, threshold}` + # By using ``next`` on the fixture values, we can get different results + # in subsequent calls, and test the correct usage of AND/OR operators. + fake_signature_validation = validation_results.get("signature") + if fake_signature_validation: + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_validation) + ) + fake_threshold_validation = validation_results.get("threshold") + if fake_threshold_validation: + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_validation) + ) - payload = { - "role": "root", - "signature": {"keyid": "keyid2", "sig": "sig2"}, - } - result = test_repo.sign_metadata(payload) + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + result = test_repo.sign_metadata({"signature": "fake"}) assert result == { "task": "sign_metadata", - "status": True, - "last_update": fake_datetime.now(), - "details": { - "message": "Signature Processed", - "update": "Metadata update finished", - }, + "status": status, + "last_update": mocked_datetime.now(), + "details": details, } - assert fake_settings.get_fresh.calls == [ - pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), - ] - assert repository.Metadata.from_dict.calls == [ - pretend.call({"metadata": "fake"}) - ] - assert test_repo._validate_signature.calls == [ - pretend.call(fake_new_root, fake_signature, fake_trusted_root), - pretend.call(fake_new_root, fake_signature), - ] - assert test_repo._root_metadata_update.calls == [ - pretend.call(fake_new_root) - ] - assert test_repo.write_repository_settings.calls == [ - pretend.call("ROOT_SIGNING", None) - ] def test_delete_sign_metadata_bootstrap_signing_state( self, test_repo, monkeypatch, mocked_datetime From fcd057454ea81a864cc714660660ec9ffeeb32df Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 24 Aug 2023 16:52:04 +0200 Subject: [PATCH 07/14] Address misc review comments in sign_metadata - Use elif instead of elsewhere appropriate - Remove blank line in docstrings - Clarify comment about signature/threshold validation Co-authored-by: Martin Vrachev Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index b351a747..b3e08005 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1270,7 +1270,6 @@ def _validate_signature( """ Validate signature over metadata using appropriate delegator. If no delegator is passed, the metadata itself is used as delegator. - """ if delegator is None: delegator = metadata @@ -1303,7 +1302,6 @@ def _validate_threshold( """ Validate signature threshold using appropriate delegator(s). If no delegator is passed, the metadata itself is used as delegator. - """ if delegator is None: delegator = metadata @@ -1343,10 +1341,11 @@ def _result(status, error=None, bootstrap=None, update=None): details["message"] = "Signature Failed" if error: details["error"] = error - if bootstrap: + elif bootstrap: details["bootstrap"] = bootstrap - if update: + elif update: details["update"] = update + return self._task_result(TaskName.SIGN_METADATA, status, details) signature = Signature.from_dict(payload["signature"]) @@ -1382,9 +1381,11 @@ def _result(status, error=None, bootstrap=None, update=None): return _result(True, bootstrap="Bootstrap Finished") else: - # Also consult with "trusted root" when updating a new root - # - Signature must validate with trusted OR new root - # - Threshold must validate with trusted AND new root + # We need the "trusted root" when updating to a new root: + # - signature could come from a key, which is only in the trusted + # root, OR from a key, which is only in the new root + # - threshold must validate with the threshold of keys as defined + # in the trusted root AND as defined in the new root trusted_root = self._storage_backend.get("root") trusted_signature = self._validate_signature( root, signature, trusted_root From b3c80dbda6957cb5575d620483a82ad168a15c11 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Fri, 25 Aug 2023 14:58:01 +0200 Subject: [PATCH 08/14] Assert _root_metadata_update result in sign_metadata This is a temporary measure until after _root_metadata_update has been refactored (see code comment) to not fail silently e.g in in tests that mock the argument passed to _root_metadata_update. This commit also updates the related tests to now mock the `_root_metadata_update` result too. As the wrong result would no longer fail silently. Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 9 ++++++++- .../tuf_repository_service_worker/test_repository.py | 7 +++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index b3e08005..efe3aad2 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1404,7 +1404,14 @@ def _result(status, error=None, bootstrap=None, update=None): return _result(True, update=msg) # TODO: Refactor `_root_metadata_update` to de-duplicate validation - self._root_metadata_update(root) + # and messaging. At this point, we know that root is valid and + # there can be only one message. (remove assert after refactor!) + result = self._root_metadata_update(root) + assert result == { # nosec + "message": "Metadata Update Processed", + "role": "root", + } + # Update successful, root persisted -> finalize event... self.write_repository_settings("ROOT_SIGNING", None) return _result(True, update="Metadata update finished") diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 5fdfd5fd..230c8e7a 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3367,6 +3367,13 @@ def fake_get_fresh(key): lambda *a: "fake" ) + test_repo._root_metadata_update = pretend.call_recorder( + lambda a: { + "message": "Metadata Update Processed", + "role": "root", + } + ) + # Mock return values of `_validate_{signature, threshold}` # By using ``next`` on the fixture values, we can get different results # in subsequent calls, and test the correct usage of AND/OR operators. From ca913b35fdd3509785f6aad40648fb42822767a4 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Fri, 25 Aug 2023 15:29:41 +0200 Subject: [PATCH 09/14] Check 'role' field in 'sign_metadata' payload 'sign_metadata' only supports root, thus the role in the payload is not relevant, and was ignored previously. For consistency, this commit adds a check that the role is indeed root and fails otherwise. This is also tested by adding another column to the test table of test_sign_metadata__update, used to patch the default payload in test runs. Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 6 ++++++ .../test_repository.py | 21 +++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index efe3aad2..6eb15c47 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1349,6 +1349,12 @@ def _result(status, error=None, bootstrap=None, update=None): return self._task_result(TaskName.SIGN_METADATA, status, details) signature = Signature.from_dict(payload["signature"]) + rolename = payload["role"] + + # Assert requested metadata type is root + if rolename != Root.type: + msg = f"Expected '{Root.type}', got '{rolename}'" + return _result(False, error=msg) # Assert pending signing event exists metadata_dict = self._settings.get_fresh("ROOT_SIGNING") diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 230c8e7a..d1b3fb28 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3272,14 +3272,25 @@ def fake_get_fresh(key): ] @pytest.mark.parametrize( - "validation_results, details, status", + "payload_patch, validation_results, details, status", [ ( + {"role": "foo"}, + {}, + { + "message": "Signature Failed", + "error": "Expected 'root', got 'foo'", + }, + False, + ), + ( + {}, {"signature": iter((False, False))}, {"message": "Signature Failed", "error": "Invalid signature"}, False, ), ( + {}, { "signature": iter((True, False)), "threshold": iter((False, False)), @@ -3291,6 +3302,7 @@ def fake_get_fresh(key): True, ), ( + {}, { "signature": iter((False, True)), "threshold": iter((False, True)), @@ -3302,6 +3314,7 @@ def fake_get_fresh(key): True, ), ( + {}, { "signature": iter((True, False)), "threshold": iter((True, False)), @@ -3313,6 +3326,7 @@ def fake_get_fresh(key): True, ), ( + {}, { "signature": iter((True, True)), "threshold": iter((True, True)), @@ -3330,6 +3344,7 @@ def test_sign_metadata__update( test_repo, monkeypatch, mocked_datetime, + payload_patch, validation_results, details, status, @@ -3390,7 +3405,9 @@ def fake_get_fresh(key): # Call sign_metadata with fake payload # All deserialization and validation is mocked - result = test_repo.sign_metadata({"signature": "fake"}) + payload = {"signature": "fake", "role": "root"} + payload.update(payload_patch) + result = test_repo.sign_metadata(payload) assert result == { "task": "sign_metadata", From b883cf5813a5f2e8396ad28754f82d8f0ff1ce86 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Mon, 28 Aug 2023 16:43:01 +0200 Subject: [PATCH 10/14] Re-use part of _root_metadata_update Factor out "finalize" part of _root_metadata_update to re-use in sign_metadata. Prior to this commit, sign_metadata would call _root_metadata_update duplicating much of the verification behavior, although it only cared for the finalization part. Now, it can call into the desired subroutine only. Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 28 +++++++++------------ 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index 6eb15c47..605dae25 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1146,6 +1146,16 @@ def _root_metadata_update( }, ) + self._root_metadata_update_finalize(current_root, new_root) + return self._task_result( + TaskName.METADATA_UPDATE, + True, + {"message": "Metadata Update Processed", "role": Root.type}, + ) + + def _root_metadata_update_finalize( + self, current_root: Metadata[Root], new_root: Metadata[Root] + ) -> None: # We always persist the new root metadata, but we cannot persist # without verifying if the online key is rotated to avoid a mismatch # with the rest of the roles using the online key. @@ -1185,12 +1195,6 @@ def _root_metadata_update( f"({self._timeout} seconds)" ) - return self._task_result( - TaskName.METADATA_UPDATE, - True, - {"message": "Metadata Update Processed", "role": Root.type}, - ) - def metadata_update( self, payload: Dict[Literal["metadata"], Dict[Literal[Root.type], Any]], @@ -1409,16 +1413,8 @@ def _result(status, error=None, bootstrap=None, update=None): msg = f"Root v{root.signed.version} is pending signatures" return _result(True, update=msg) - # TODO: Refactor `_root_metadata_update` to de-duplicate validation - # and messaging. At this point, we know that root is valid and - # there can be only one message. (remove assert after refactor!) - result = self._root_metadata_update(root) - assert result == { # nosec - "message": "Metadata Update Processed", - "role": "root", - } - - # Update successful, root persisted -> finalize event... + # Threshold reached -> finalize event + self._root_metadata_update_finalize(trusted_root, root) self.write_repository_settings("ROOT_SIGNING", None) return _result(True, update="Metadata update finished") From c7792d312a1fefd0fa743ca1f8af8b8ff0479ea3 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Thu, 5 Oct 2023 12:17:17 +0200 Subject: [PATCH 11/14] Support partially signed metadata in metadata_update Update `_root_metadata_update` subroutine of `metadata_update` task interface to accept partially signed metadata. If the required threshold is not met, the passed metadata is written to the "ROOT_SIGNING" repository setting and the task returns with a "pending signatures" message Missing signatures can then be added using the `sign_metadata` task interface, which also finalizes the metadata update, as soon as the threshold is reached. NOTE: Currently, there is no sanity check of signatures below the threshold. A useful check might be, that passed metadata has at least 1 initial and only valid signatures, akin to bootstrap. #367 will make this a lot easier. This change also includes a reordering of the validation routine to check the version increment prior to signature threshold. Otherwise, a bad version would only be detected after all signatures have been added. Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 31 ++++++++--- .../test_repository.py | 55 ++++++++++++++++--- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index 605dae25..c63a1165 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1109,12 +1109,6 @@ def _trusted_root_update( f"Expected 'root', got '{new_root.signed.type}'" ) - # Verify that new root is signed by trusted root - current_root.verify_delegate(Root.type, new_root) - - # Verify that new root is signed by itself - new_root.verify_delegate(Root.type, new_root) - # Verify the new root version if new_root.signed.version != current_root.signed.version + 1: raise BadVersionNumberError( @@ -1122,6 +1116,12 @@ def _trusted_root_update( f" instead got version {new_root.signed.version}" ) + # Verify that new root is signed by trusted root + current_root.verify_delegate(Root.type, new_root) + + # Verify that new root is signed by itself + new_root.verify_delegate(Root.type, new_root) + def _root_metadata_update( self, new_root: Metadata[Root] ) -> Dict[str, Any]: @@ -1130,9 +1130,26 @@ def _root_metadata_update( try: self._trusted_root_update(current_root, new_root) + + except UnsignedMetadataError: + # TODO: Add missing sanity check - new root must have at least 1 + # and only valid signature - use `get_verification_status` (#367) + self.write_repository_settings("ROOT_SIGNING", new_root.to_dict()) + return self._task_result( + TaskName.METADATA_UPDATE, + True, + { + "message": "Metadata Update Processed", + "role": Root.type, + "update": ( + f"Root v{new_root.signed.version} is " + "pending signatures" + ), + }, + ) + except ( ValueError, - UnsignedMetadataError, TypeError, BadVersionNumberError, RepositoryError, diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index d1b3fb28..ee9ed670 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -2513,25 +2513,17 @@ def test__trusted_root_update_bad_version(self, test_repo): version=4, type=repository.Root.type, ), - verify_delegate=pretend.call_recorder(lambda *a: None), ) fake_old_root_md = pretend.stub( signed=pretend.stub( roles={"timestamp": pretend.stub(keyids={"k1": "v1"})}, version=1, ), - verify_delegate=pretend.call_recorder(lambda *a: None), ) with pytest.raises(repository.BadVersionNumberError) as err: test_repo._trusted_root_update(fake_old_root_md, fake_new_root_md) assert "Expected root version 2 instead got version 4" in str(err) - assert fake_new_root_md.verify_delegate.calls == [ - pretend.call(repository.Root.type, fake_new_root_md) - ] - assert fake_old_root_md.verify_delegate.calls == [ - pretend.call(repository.Root.type, fake_new_root_md) - ] def test__trusted_root_update_bad_type(self, test_repo): fake_new_root_md = pretend.stub( @@ -2593,6 +2585,53 @@ def test__root_metadata_update(self, test_repo, mocked_datetime): pretend.call(fake_new_root_md, repository.Root.type) ] + def test__root_metadata_update_signatures_pending( + self, test_repo, mocked_datetime + ): + fake_datetime = mocked_datetime + fake_new_root_md = pretend.stub( + signed=pretend.stub( + roles={"timestamp": pretend.stub(keyids={"k1": "v1"})}, + version=2, + ) + ) + fake_old_root_md = pretend.stub( + signed=pretend.stub( + roles={"timestamp": pretend.stub(keyids={"k1": "v1"})}, + version=1, + ) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda *a: fake_old_root_md + ) + test_repo._trusted_root_update = pretend.raiser( + repository.UnsignedMetadataError() + ) + + fake_new_root_md.to_dict = pretend.call_recorder(lambda: "fake dict") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + result = test_repo._root_metadata_update(fake_new_root_md) + + assert result == { + "task": "metadata_update", + "status": True, + "last_update": fake_datetime.now(), + "details": { + "message": "Metadata Update Processed", + "role": "root", + "update": "Root v2 is pending signatures", + }, + } + assert test_repo._storage_backend.get.calls == [ + pretend.call(repository.Root.type) + ] + assert test_repo.write_repository_settings.calls == [ + pretend.call("ROOT_SIGNING", "fake dict") + ] + def test__root_metadata_update_not_trusted( self, test_repo, mocked_datetime ): From bc2b3b5e2b31cc0522d1ffc8ba5961122587b9bf Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Fri, 20 Oct 2023 12:10:23 +0200 Subject: [PATCH 12/14] Rename two boolean variables in sign_metadata Signed-off-by: Lukas Puehringer --- repository_service_tuf_worker/repository.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index c63a1165..fcc893c8 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1414,12 +1414,12 @@ def _result(status, error=None, bootstrap=None, update=None): # - threshold must validate with the threshold of keys as defined # in the trusted root AND as defined in the new root trusted_root = self._storage_backend.get("root") - trusted_signature = self._validate_signature( + is_valid_trusted = self._validate_signature( root, signature, trusted_root ) - new_signature = self._validate_signature(root, signature) + is_valid_new = self._validate_signature(root, signature) - if not (trusted_signature or new_signature): + if not (is_valid_trusted or is_valid_new): return _result(False, error="Invalid signature") root.signatures[signature.keyid] = signature From f8555609f0775345cde4bde679706a759e995670 Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Fri, 20 Oct 2023 12:48:08 +0200 Subject: [PATCH 13/14] Change test_sign_metadata__update test style Use test copy pasta instead of @parametrize. Signed-off-by: Lukas Puehringer --- .../test_repository.py | 443 ++++++++++++++---- 1 file changed, 354 insertions(+), 89 deletions(-) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index ee9ed670..31c70295 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3310,83 +3310,67 @@ def fake_get_fresh(key): pretend.call("ROOT_SIGNING", "fake_metadata") ] - @pytest.mark.parametrize( - "payload_patch, validation_results, details, status", - [ - ( - {"role": "foo"}, - {}, - { - "message": "Signature Failed", - "error": "Expected 'root', got 'foo'", - }, - False, - ), - ( - {}, - {"signature": iter((False, False))}, - {"message": "Signature Failed", "error": "Invalid signature"}, - False, - ), - ( - {}, - { - "signature": iter((True, False)), - "threshold": iter((False, False)), - }, - { - "message": "Signature Processed", - "update": "Root v2 is pending signatures", - }, - True, - ), - ( - {}, - { - "signature": iter((False, True)), - "threshold": iter((False, True)), - }, - { - "message": "Signature Processed", - "update": "Root v2 is pending signatures", - }, - True, - ), - ( - {}, - { - "signature": iter((True, False)), - "threshold": iter((True, False)), - }, - { - "message": "Signature Processed", - "update": "Root v2 is pending signatures", - }, - True, - ), - ( - {}, - { - "signature": iter((True, True)), - "threshold": iter((True, True)), - }, - { - "message": "Signature Processed", - "update": "Metadata update finished", - }, - True, - ), - ], - ) - def test_sign_metadata__update( + def test_sign_metadata__update__bad_role_type( + self, test_repo, monkeypatch, mocked_datetime + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + test_repo._root_metadata_update = pretend.call_recorder( + lambda a: { + "message": "Metadata Update Processed", + "role": "root", + } + ) + + payload = {"signature": "fake", "role": "foo"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": False, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Failed", + "error": "Expected 'root', got 'foo'", + }, + } + + def test_sign_metadata__update__invalid_signature( self, test_repo, monkeypatch, mocked_datetime, - payload_patch, - validation_results, - details, - status, ): def fake_get_fresh(key): if key == "BOOTSTRAP": @@ -3428,31 +3412,312 @@ def fake_get_fresh(key): } ) - # Mock return values of `_validate_{signature, threshold}` - # By using ``next`` on the fixture values, we can get different results - # in subsequent calls, and test the correct usage of AND/OR operators. - fake_signature_validation = validation_results.get("signature") - if fake_signature_validation: - test_repo._validate_signature = pretend.call_recorder( - lambda *a: next(fake_signature_validation) - ) - fake_threshold_validation = validation_results.get("threshold") - if fake_threshold_validation: - test_repo._validate_threshold = pretend.call_recorder( - lambda *a: next(fake_threshold_validation) - ) + # Use `next` below to mock subsequent calls + fake_signature_result = iter((False, False)) + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": False, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Failed", + "error": "Invalid signature", + }, + } + + def test_sign_metadata__update__invalid_threshold__trusted_and_new( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + test_repo._root_metadata_update = pretend.call_recorder( + lambda a: { + "message": "Metadata Update Processed", + "role": "root", + } + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((True, False)) + fake_threshold_result = iter((False, False)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + + def test_sign_metadata__update__invalid_threshold__trusted( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + test_repo._root_metadata_update = pretend.call_recorder( + lambda a: { + "message": "Metadata Update Processed", + "role": "root", + } + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((False, True)) + fake_threshold_result = iter((False, True)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + + def test_sign_metadata__update__invalid_threshold__new( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + """Test: New root does not meet signature threshold.""" + + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + test_repo._root_metadata_update = pretend.call_recorder( + lambda a: { + "message": "Metadata Update Processed", + "role": "root", + } + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((True, True)) + fake_threshold_result = iter((True, False)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + + def test_sign_metadata__update__valid_threshold( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + test_repo._root_metadata_update = pretend.call_recorder( + lambda a: { + "message": "Metadata Update Processed", + "role": "root", + } + ) + # Use `next` below to mock subsequent calls + fake_signature_result = iter((True, True)) + fake_threshold_result = iter((True, True)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) # Call sign_metadata with fake payload # All deserialization and validation is mocked payload = {"signature": "fake", "role": "root"} - payload.update(payload_patch) result = test_repo.sign_metadata(payload) assert result == { "task": "sign_metadata", - "status": status, + "status": True, "last_update": mocked_datetime.now(), - "details": details, + "details": { + "message": "Signature Processed", + "update": "Metadata update finished", + }, } def test_delete_sign_metadata_bootstrap_signing_state( From ac87f2f8d211527c2bf4141406bfe93f269cae2c Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Fri, 27 Oct 2023 14:10:38 +0200 Subject: [PATCH 14/14] Remove unused mocks in test_sign_metadata__update Signed-off-by: Lukas Puehringer --- .../test_repository.py | 74 ------------------- 1 file changed, 74 deletions(-) diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index 31c70295..dbf3dc6a 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -3313,46 +3313,10 @@ def fake_get_fresh(key): def test_sign_metadata__update__bad_role_type( self, test_repo, monkeypatch, mocked_datetime ): - def fake_get_fresh(key): - if key == "BOOTSTRAP": - return "" - if key == "ROOT_SIGNING": - return {"metadata": "fake"} - - fake_settings = pretend.stub( - get_fresh=pretend.call_recorder(fake_get_fresh), - ) - monkeypatch.setattr( - repository, - "get_repository_settings", - lambda *a, **kw: fake_settings, - ) fake_signature = pretend.stub(keyid="fake") repository.Signature.from_dict = pretend.call_recorder( lambda *a: fake_signature ) - fake_trusted_root = repository.Metadata( - signed=repository.Root(version=1) - ) - test_repo._storage_backend.get = pretend.call_recorder( - lambda r: fake_trusted_root - ) - fake_new_root = repository.Metadata(signed=repository.Root(version=2)) - repository.Metadata.from_dict = pretend.call_recorder( - lambda *a: fake_new_root - ) - fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") - test_repo.write_repository_settings = pretend.call_recorder( - lambda *a: "fake" - ) - - test_repo._root_metadata_update = pretend.call_recorder( - lambda a: { - "message": "Metadata Update Processed", - "role": "root", - } - ) - payload = {"signature": "fake", "role": "foo"} result = test_repo.sign_metadata(payload) @@ -3400,17 +3364,6 @@ def fake_get_fresh(key): repository.Metadata.from_dict = pretend.call_recorder( lambda *a: fake_new_root ) - fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") - test_repo.write_repository_settings = pretend.call_recorder( - lambda *a: "fake" - ) - - test_repo._root_metadata_update = pretend.call_recorder( - lambda a: { - "message": "Metadata Update Processed", - "role": "root", - } - ) # Use `next` below to mock subsequent calls fake_signature_result = iter((False, False)) @@ -3470,13 +3423,6 @@ def fake_get_fresh(key): lambda *a: "fake" ) - test_repo._root_metadata_update = pretend.call_recorder( - lambda a: { - "message": "Metadata Update Processed", - "role": "root", - } - ) - # Use `next` below to mock subsequent calls fake_signature_result = iter((True, False)) fake_threshold_result = iter((False, False)) @@ -3542,13 +3488,6 @@ def fake_get_fresh(key): lambda *a: "fake" ) - test_repo._root_metadata_update = pretend.call_recorder( - lambda a: { - "message": "Metadata Update Processed", - "role": "root", - } - ) - # Use `next` below to mock subsequent calls fake_signature_result = iter((False, True)) fake_threshold_result = iter((False, True)) @@ -3616,13 +3555,6 @@ def fake_get_fresh(key): lambda *a: "fake" ) - test_repo._root_metadata_update = pretend.call_recorder( - lambda a: { - "message": "Metadata Update Processed", - "role": "root", - } - ) - # Use `next` below to mock subsequent calls fake_signature_result = iter((True, True)) fake_threshold_result = iter((True, False)) @@ -3688,12 +3620,6 @@ def fake_get_fresh(key): lambda *a: "fake" ) - test_repo._root_metadata_update = pretend.call_recorder( - lambda a: { - "message": "Metadata Update Processed", - "role": "root", - } - ) # Use `next` below to mock subsequent calls fake_signature_result = iter((True, True)) fake_threshold_result = iter((True, True))