Skip to content

Commit

Permalink
Support 'metadata update' in 'sign_metadata' task
Browse files Browse the repository at this point in the history
Implement support for distributed asynchronous root metadata signging
in the course of a "metadata update" event.

Other than the already supported "bootstrap" signing event, signatures
added to root during "metadata update" must validate with keys from
trusted OR new root, and meet the signature threshold of trusted AND new
root.

*Related changes:*
- Refactor `_validate_{signature, threshold}` helpers to accept an
  optional delegator (e.g. trusted root).
- Add `_sign_result` helper to return a "sign metadata"-specific
  task result.

Signed-off-by: Lukas Puehringer <[email protected]>
  • Loading branch information
lukpueh committed Aug 14, 2023
1 parent 391c8e5 commit 3a9c034
Showing 1 changed file with 90 additions and 98 deletions.
188 changes: 90 additions & 98 deletions repository_service_tuf_worker/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,20 +1292,25 @@ def metadata_rotation(
return self.metadata_update(payload, update_state)

@staticmethod
def _validate_signature(metadata: Metadata, signature: Signature) -> bool:
def _validate_signature(
metadata: Metadata,
signature: Signature,
delegator: Optional[Metadata] = None,
) -> bool:
"""
Validate signature over metadata using appropriate delegator.
NOTE: In "metadata update" signing event, the public key and
authorization info is retrieved from "trusted root"
Validate signature over metadata using appropriate delegator.
If no delegator is passed, the metadata itself is used as delegator.
"""
if delegator is None:
delegator = metadata

keyid = signature.keyid
if keyid not in metadata.signed.roles[Root.type].keyids:
if keyid not in delegator.signed.roles[Root.type].keyids:
logging.info(f"signature '{keyid}' not authorized")
return False

key = metadata.signed.keys.get(signature.keyid)
key = delegator.signed.keys.get(signature.keyid)
if not key:
logging.info(f"no key for signature '{keyid}'")
return False
Expand All @@ -1322,32 +1327,43 @@ def _validate_signature(metadata: Metadata, signature: Signature) -> bool:
return True

@staticmethod
def _validate_threshold(metadata: Metadata) -> bool:
def _validate_threshold(
metadata: Metadata, delegator: Optional[Metadata] = None
) -> bool:
"""
Validate signature threshold using appropriate delegator(s).
If no delegator is passed, the metadata itself is used as delegator.
NOTE: In "metadata update" signing event, the threshold for:
- root is validated with the passed metadata AND the trusted root;
- top-level targets is validated with the trusted root;
- delegated targets is validated with the delegating targets;
as delegator.
"""
if delegator is None:
delegator = metadata

try:
# TODO: `verify_delegate` does not tell us if there are any
# superfluous valid or invalid signatures. Is this something we
# want to know, e.g. to detect mistakes? To detect superfluous
# signatures if verify_delegate succeeds, would be easy: `assert
# len(signatures) == threshold`. Anything, else would require a
# custom `verify_delegate` function.
metadata.verify_delegate(Root.type, metadata)
delegator.verify_delegate(Root.type, metadata)

except UnsignedMetadataError as e:
logging.info(e)
return False

return True

def _sign_result(
self, message: str, sign_metadata: bool
) -> Dict[str, Any]:
"""Helper to return task result for "sign metadata" event."""
if sign_metadata:
status = "Signature processed"
else:
status = "Signature Failed"

return self._task_result(
status,
{
"sign_metadata": sign_metadata,
"message": message,
},
)

def sign_metadata(
self,
payload: Dict[str, Any],
Expand All @@ -1357,92 +1373,68 @@ def sign_metadata(
) -> Dict[str, Any]:
"""Add signature to metadata for pending signing event.
Add signature (from payload) to cached role metadata (from settings)
for the role that matches the passed rolename (from payload), if a
signing event exists for that role, and the signature is valid.
Add signature (from payload) to cached root metadata (from settings), if
a signing event exists, and the signature is valid.
If the signature threshold is reached, the signing event is finalized.
Signing event types are 'bootstrap' or 'metadata update'.
** Signing event types (and details) **
BOOTSTRAP: Only root metadata can be updated in this event. To verify
the passed signature, the keys and threshold are read from the root
metadata to be updated itself. If the threshold is reached, the
bootstrap process is finalized.
METADATA UPDATE: Root, targets or delegated targets metadata can be
updated in this event. Depending on the metadata type, the authorized
public keys and threshold are read from different delegating metadata.
If the signature threshold is reached, the signing event is finalized,
otherwise it remains in pending state.
"""
rolename = payload["role"]
signature_dict = payload["signature"]
signature = Signature.from_dict(payload["signature"])

# Assert pending signing event
metadata_dict = self._settings.get_fresh(f"{rolename.upper()}_SIGNING")
# Assert pending signing event exists
metadata_dict = self._settings.get_fresh("ROOT_SIGNING")
if metadata_dict is None:
return self._task_result(
"Signature Failed",
{
"sign_metadata": False,
"message": f"No signatures pending for {rolename}",
},
)
return self._sign_result("No pending metadata", False)

# Assert signing event type (currently bootstrap only)
# TODO: repository-service-tuf/repository-service-tuf-worker#336
bootstrap_state = self._settings.get_fresh("BOOTSTRAP")
if "signing" not in bootstrap_state:
return self._task_result(
"Signature Failed",
{
"sign_metadata": False,
"message": "No bootstrap available for signing",
},
)

# Assert metadata type is allowed for signing event
# Assert metadata type is root
root = Metadata.from_dict(metadata_dict)
if not isinstance(root.signed, Root):
return self._task_result(
"Signature Failed",
{
"sign_metadata": False,
"message": f"Role {rolename} has wrong type",
},
)
msg = f"Expected 'root', got '{root.signed.type}'"
return self._sign_result(msg, False)

# Assert passed signature is valid for metadata
signature = Signature.from_dict(signature_dict)
if not self._validate_signature(root, signature):
return self._task_result(
"Signature Failed",
{
"sign_metadata": False,
"message": "Invalid signature",
},
)
# If it isn't a "bootstrap" signing event, it must be "update metadata"
bootstrap_state = self._settings.get_fresh("BOOTSTRAP")
if "signing" in bootstrap_state:
# Signature and threshold of initial root can only self-validate,
# there is no "trusted root" at bootstrap time yet.
if not self._validate_signature(root, signature):
return self._sign_result("Invalid signature", False)

root.signatures[signature.keyid] = signature
if not self._validate_threshold(root):
self.write_repository_settings("ROOT_SIGNING", root.to_dict())
msg = f"Root v{root.signed.version} is pending signatures"
return self._sign_result(msg, True)

bootstrap_task_id = bootstrap_state.split("signing-")[1]
self._bootstrap_finalize(root, bootstrap_task_id)
return self._sign_result("Bootstrap finished", True)

# Check threshold with new signature included
root.signatures[signature.keyid] = signature
if not self._validate_threshold(root):
self.write_repository_settings("ROOT_SIGNING", root.to_dict())
return self._task_result(
"Signature processed",
{
"sign_metadata": True,
"message": (
f"Root v{root.signed.version} is pending signatures"
),
},
else:
# Also consult with "trusted root" when updating a new root
# - Signature must validate with trusted OR new root
# - Threshold must validate with trusted AND new root
trusted_root = self._storage_backend.get("root")
trusted_signature = self._validate_signature(
root, signature, trusted_root
)

# Finalize bootstrap
bootstrap_task_id = bootstrap_state.split("signing-")[1]
self._bootstrap_finalize(root, bootstrap_task_id)
return self._task_result(
"Signature processed",
{
"sign_metadata": True,
"message": "Bootstrap finished",
},
)
new_signature = self._validate_signature(root, signature)

if not (trusted_signature or new_signature):
return self._sign_result("Invalid signature", False)

root.signatures[signature.keyid] = signature
trusted_threshold = self._validate_threshold(root, trusted_root)
new_threshold = self._validate_threshold(root)
if not (trusted_threshold and new_threshold):
self.write_repository_settings("ROOT_SIGNING", root.to_dict())
msg = f"Root v{root.signed.version} is pending signatures"
return self._sign_result(msg, True)

# TODO: Refactor `_root_metadata_update` to de-duplicate validation
self._root_metadata_update(root)
# Update successful, root persisted -> finalize event...
self.write_repository_settings("ROOT_SIGNING", None)
return self._sign_result("Metadata update finished", True)

0 comments on commit 3a9c034

Please sign in to comment.