Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support 'metadata update' in 'sign_metadata' task #355

Merged
merged 15 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 129 additions & 116 deletions repository_service_tuf_worker/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,19 +1109,19 @@ def _trusted_root_update(
f"Expected 'root', got '{new_root.signed.type}'"
)

# Verify that new root is signed by trusted root
current_root.verify_delegate(Root.type, new_root)

# Verify that new root is signed by itself
new_root.verify_delegate(Root.type, new_root)

# Verify the new root version
if new_root.signed.version != current_root.signed.version + 1:
raise BadVersionNumberError(
f"Expected root version {current_root.signed.version + 1}"
f" instead got version {new_root.signed.version}"
)

# Verify that new root is signed by trusted root
current_root.verify_delegate(Root.type, new_root)

# Verify that new root is signed by itself
new_root.verify_delegate(Root.type, new_root)

def _root_metadata_update(
self, new_root: Metadata[Root]
) -> Dict[str, Any]:
Expand All @@ -1130,9 +1130,26 @@ def _root_metadata_update(

try:
self._trusted_root_update(current_root, new_root)

except UnsignedMetadataError:
# TODO: Add missing sanity check - new root must have at least 1
# and only valid signature - use `get_verification_status` (#367)
self.write_repository_settings("ROOT_SIGNING", new_root.to_dict())
return self._task_result(
TaskName.METADATA_UPDATE,
True,
{
"message": "Metadata Update Processed",
"role": Root.type,
"update": (
f"Root v{new_root.signed.version} is "
"pending signatures"
),
},
)

except (
ValueError,
UnsignedMetadataError,
TypeError,
BadVersionNumberError,
RepositoryError,
Expand All @@ -1146,6 +1163,16 @@ def _root_metadata_update(
},
)

self._root_metadata_update_finalize(current_root, new_root)
return self._task_result(
TaskName.METADATA_UPDATE,
True,
{"message": "Metadata Update Processed", "role": Root.type},
)

def _root_metadata_update_finalize(
self, current_root: Metadata[Root], new_root: Metadata[Root]
) -> None:
# We always persist the new root metadata, but we cannot persist
# without verifying if the online key is rotated to avoid a mismatch
# with the rest of the roles using the online key.
Expand Down Expand Up @@ -1185,12 +1212,6 @@ def _root_metadata_update(
f"({self._timeout} seconds)"
)

return self._task_result(
TaskName.METADATA_UPDATE,
True,
{"message": "Metadata Update Processed", "role": Root.type},
)

def metadata_update(
self,
payload: Dict[Literal["metadata"], Dict[Literal[Root.type], Any]],
Expand Down Expand Up @@ -1262,20 +1283,24 @@ def metadata_rotation(
return self.metadata_update(payload, update_state)

@staticmethod
def _validate_signature(metadata: Metadata, signature: Signature) -> bool:
def _validate_signature(
metadata: Metadata,
signature: Signature,
delegator: Optional[Metadata] = None,
) -> bool:
"""
Validate signature over metadata using appropriate delegator.

NOTE: In "metadata update" signing event, the public key and
authorization info is retrieved from "trusted root"

Validate signature over metadata using appropriate delegator.
If no delegator is passed, the metadata itself is used as delegator.
"""
if delegator is None:
delegator = metadata

keyid = signature.keyid
if keyid not in metadata.signed.roles[Root.type].keyids:
if keyid not in delegator.signed.roles[Root.type].keyids:
logging.info(f"signature '{keyid}' not authorized")
return False

key = metadata.signed.keys.get(signature.keyid)
key = delegator.signed.keys.get(signature.keyid)
if not key:
logging.info(f"no key for signature '{keyid}'")
return False
Expand All @@ -1292,25 +1317,18 @@ def _validate_signature(metadata: Metadata, signature: Signature) -> bool:
return True

@staticmethod
def _validate_threshold(metadata: Metadata) -> bool:
def _validate_threshold(
metadata: Metadata, delegator: Optional[Metadata] = None
) -> bool:
"""
Validate signature threshold using appropriate delegator(s).

NOTE: In "metadata update" signing event, the threshold for:
- root is validated with the passed metadata AND the trusted root;
- top-level targets is validated with the trusted root;
- delegated targets is validated with the delegating targets;
as delegator.
If no delegator is passed, the metadata itself is used as delegator.
"""
if delegator is None:
delegator = metadata

try:
# TODO: `verify_delegate` does not tell us if there are any
# superfluous valid or invalid signatures. Is this something we
# want to know, e.g. to detect mistakes? To detect superfluous
# signatures if verify_delegate succeeds, would be easy: `assert
# len(signatures) == threshold`. Anything, else would require a
# custom `verify_delegate` function.
metadata.verify_delegate(Root.type, metadata)
delegator.verify_delegate(Root.type, metadata)

except UnsignedMetadataError as e:
logging.info(e)
Expand All @@ -1327,100 +1345,95 @@ def sign_metadata(
) -> Dict[str, Any]:
"""Add signature to metadata for pending signing event.

Add signature (from payload) to cached role metadata (from settings)
for the role that matches the passed rolename (from payload), if a
signing event exists for that role, and the signature is valid.
Add signature (from payload) to cached root metadata (from settings),
if a signing event exists, and the signature is valid.

Signing event types are 'bootstrap' or 'metadata update'.

If the signature threshold is reached, the signing event is finalized.
If the signature threshold is reached, the signing event is finalized,
otherwise it remains in pending state.
"""

** Signing event types (and details) **
def _result(status, error=None, bootstrap=None, update=None):
details = {}
if status:
details["message"] = "Signature Processed"
else:
details["message"] = "Signature Failed"
if error:
details["error"] = error
elif bootstrap:
details["bootstrap"] = bootstrap
elif update:
details["update"] = update
kairoaraujo marked this conversation as resolved.
Show resolved Hide resolved

BOOTSTRAP: Only root metadata can be updated in this event. To verify
the passed signature, the keys and threshold are read from the root
metadata to be updated itself. If the threshold is reached, the
bootstrap process is finalized.
return self._task_result(TaskName.SIGN_METADATA, status, details)

METADATA UPDATE: Root, targets or delegated targets metadata can be
updated in this event. Depending on the metadata type, the authorized
public keys and threshold are read from different delegating metadata.
"""
signature = Signature.from_dict(payload["signature"])
rolename = payload["role"]
signature_dict = payload["signature"]

# Assert pending signing event
metadata_dict = self._settings.get_fresh(f"{rolename.upper()}_SIGNING")
if metadata_dict is None:
return self._task_result(
TaskName.SIGN_METADATA,
False,
{
"message": "Signature Failed",
"error": f"No signatures pending for {rolename}",
},
)
# Assert requested metadata type is root
if rolename != Root.type:
msg = f"Expected '{Root.type}', got '{rolename}'"
return _result(False, error=msg)

# Assert signing event type (currently bootstrap only)
# TODO: repository-service-tuf/repository-service-tuf-worker#336
bootstrap_state = self._settings.get_fresh("BOOTSTRAP")
if "signing" not in bootstrap_state:
return self._task_result(
TaskName.SIGN_METADATA,
False,
{
"message": "Signature Failed",
"error": "No bootstrap available for signing",
},
)
# Assert pending signing event exists
metadata_dict = self._settings.get_fresh("ROOT_SIGNING")
kairoaraujo marked this conversation as resolved.
Show resolved Hide resolved
if metadata_dict is None:
msg = "No signatures pending for root"
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
return _result(False, error=msg)

# Assert metadata type is allowed for signing event
# Assert metadata type is root
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
root = Metadata.from_dict(metadata_dict)
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
if not isinstance(root.signed, Root):
return self._task_result(
TaskName.SIGN_METADATA,
False,
{
"message": "Signature Failed",
"error": f"Role {rolename} has wrong type",
},
)
msg = f"Expected 'root', got '{root.signed.type}'"
lukpueh marked this conversation as resolved.
Show resolved Hide resolved
return _result(False, error=msg)

# Assert passed signature is valid for metadata
signature = Signature.from_dict(signature_dict)
if not self._validate_signature(root, signature):
return self._task_result(
TaskName.SIGN_METADATA,
False,
{
"message": "Signature Failed",
"error": "Invalid signature",
},
)
# If it isn't a "bootstrap" signing event, it must be "update metadata"
bootstrap_state = self._settings.get_fresh("BOOTSTRAP")
if "signing" in bootstrap_state:
# Signature and threshold of initial root can only self-validate,
# there is no "trusted root" at bootstrap time yet.
if not self._validate_signature(root, signature):
return _result(False, error="Invalid signature")

root.signatures[signature.keyid] = signature
if not self._validate_threshold(root):
self.write_repository_settings("ROOT_SIGNING", root.to_dict())
MVrachev marked this conversation as resolved.
Show resolved Hide resolved
msg = f"Root v{root.signed.version} is pending signatures"
MVrachev marked this conversation as resolved.
Show resolved Hide resolved
return _result(True, bootstrap=msg)

bootstrap_task_id = bootstrap_state.split("signing-")[1]
self._bootstrap_finalize(root, bootstrap_task_id)
return _result(True, bootstrap="Bootstrap Finished")

# Check threshold with new signature included
root.signatures[signature.keyid] = signature
if not self._validate_threshold(root):
self.write_repository_settings("ROOT_SIGNING", root.to_dict())
root_version = root.signed.version
return self._task_result(
TaskName.SIGN_METADATA,
True,
{
"message": "Signature Processed",
"bootstrap": f"Root v{root_version} is pending signatures",
},
else:
# We need the "trusted root" when updating to a new root:
# - signature could come from a key, which is only in the trusted
# root, OR from a key, which is only in the new root
# - threshold must validate with the threshold of keys as defined
# in the trusted root AND as defined in the new root
trusted_root = self._storage_backend.get("root")
is_valid_trusted = self._validate_signature(
root, signature, trusted_root
)

# Finalize bootstrap
bootstrap_task_id = bootstrap_state.split("signing-")[1]
self._bootstrap_finalize(root, bootstrap_task_id)
return self._task_result(
TaskName.SIGN_METADATA,
True,
{
"message": "Signature Processed",
"bootstrap": "Bootstrap Finished",
},
)
is_valid_new = self._validate_signature(root, signature)

if not (is_valid_trusted or is_valid_new):
return _result(False, error="Invalid signature")

root.signatures[signature.keyid] = signature
trusted_threshold = self._validate_threshold(root, trusted_root)
new_threshold = self._validate_threshold(root)
if not (trusted_threshold and new_threshold):
self.write_repository_settings("ROOT_SIGNING", root.to_dict())
msg = f"Root v{root.signed.version} is pending signatures"
return _result(True, update=msg)

# Threshold reached -> finalize event
self._root_metadata_update_finalize(trusted_root, root)
self.write_repository_settings("ROOT_SIGNING", None)
return _result(True, update="Metadata update finished")

def delete_sign_metadata(
self,
Expand Down
Loading