diff --git a/repository_service_tuf/cli/admin/metadata.py b/repository_service_tuf/cli/admin/metadata.py index 25c8beae..39091a0b 100644 --- a/repository_service_tuf/cli/admin/metadata.py +++ b/repository_service_tuf/cli/admin/metadata.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: MIT import copy +import json import sys from datetime import datetime, timedelta from typing import Any, Dict, List, Optional @@ -112,15 +113,16 @@ # Metadata Signing Metadata signing allows sending signature of pending Repository Service for TUF -(RSTUF) role metadata to an existing RSTUF API deployment. +(RSTUF) role metadata to an existing RSTUF API deployment. Or saving the +signature locally. The Metadata Signing does the following steps: -- retrieves the metadata pending for signatures from RSTUF API +- retrieves the metadata pending for signatures from RSTUF API or local file - selects the metadata role for signing - loads the private key for signing After loading the key it will sign the role metadata and send the request to -the RSTUF API with the signature. +the RSTUF API with the signature, or save the signature locally. """ @@ -599,26 +601,32 @@ def update( def _get_pending_roles( - settings: Any, api_server: Optional[str] + settings: Any, + api_server: Optional[str], + offline_metadata: Optional[click.File], ) -> Dict[str, Any]: - if api_server: - settings.SERVER = api_server + response_data: Dict[str, Any] + if offline_metadata: + response_data = json.load(offline_metadata) # type: ignore + else: + if api_server: + settings.SERVER = api_server - if settings.get("SERVER") is None: - api_server = prompt.Prompt.ask("\n[cyan]API[/] URL address") - settings.SERVER = api_server + if settings.get("SERVER") is None: + api_server = prompt.Prompt.ask("\n[cyan]API[/] URL address") + settings.SERVER = api_server - response = request_server( - settings.SERVER, URL.METADATA_SIGN.value, Methods.GET - ) - if response.status_code != 200: - raise click.ClickException( - f"Failed to retrieve metadata for signing. Error: {response.text}" + response = request_server( + settings.SERVER, URL.METADATA_SIGN.value, Methods.GET ) + if response.status_code != 200: + raise click.ClickException( + f"Failed to fetch metadata for signing. Error: {response.text}" + ) - response_data: Dict[str, Any] = response.json().get("data") - if response_data is None: - raise click.ClickException(response.text) + response_data = response.json().get("data") + if response_data is None: + raise click.ClickException(response.text) pending_roles: Dict[str, Any] = response_data.get("metadata", {}) if len(pending_roles) == 0: @@ -689,8 +697,14 @@ def _sign_metadata(role_info: MetadataInfo, rstuf_key: RSTUFKey) -> Signature: required=False, is_flag=True, ) +@click.argument("offline_metadata", required=False, type=click.File("r")) @click.pass_context -def sign(context, api_server: Optional[str], delete: Optional[bool]) -> None: +def sign( + context, + api_server: Optional[str], + delete: Optional[bool], + offline_metadata: Optional[click.File], +) -> None: """ Start metadata signature. """ @@ -698,7 +712,7 @@ def sign(context, api_server: Optional[str], delete: Optional[bool]) -> None: settings = context.obj["settings"] - pending_roles = _get_pending_roles(settings, api_server) + pending_roles = _get_pending_roles(settings, api_server, offline_metadata) role_info: MetadataInfo rolename: str @@ -743,15 +757,22 @@ def sign(context, api_server: Optional[str], delete: Optional[bool]) -> None: rstuf_key = _get_signing_key(role_info) signature = _sign_metadata(role_info, rstuf_key) - payload = {"role": rolename, "signature": signature.to_dict()} - console.print("\nSending signature") - task_id = send_payload( - settings, - URL.METADATA_SIGN.value, - payload, - "Metadata sign accepted.", - "Metadata sign", - ) - task_status(task_id, settings, "Metadata sign status:") + + if offline_metadata: + with open("signature.json", "w") as write_signature: + json.dump(payload, write_signature) + + else: + console.print("\nSending signature") + task_id = send_payload( + settings, + URL.METADATA_SIGN.value, + payload, + "Metadata sign accepted.", + "Metadata sign", + ) + task_status(task_id, settings, "Metadata sign status:") + + console.print_json(json.dumps(payload)) console.print("\nMetadata Signed! 🔑\n") diff --git a/tests/unit/cli/admin/test_metadata.py b/tests/unit/cli/admin/test_metadata.py index 93e36577..d0f8774a 100644 --- a/tests/unit/cli/admin/test_metadata.py +++ b/tests/unit/cli/admin/test_metadata.py @@ -1210,3 +1210,30 @@ def test_metadata_sign_delete(self, client, test_context): "Signing process status: ", ) ] + + def test_metadata_sign_offline(self, client, test_context): + input_step = [ + "root", # Choose a metadata to sign [root] + "y", # Do you still want to sign root? [y] + "Jimi Hendrix", # Choose a private key to load [Jimi Hendrix] + "", # Choose Jimi Hendrix key type [ed25519/ecdsa/rsa] + "tests/files/key_storage/JimiHendrix.key", # Enter the Jimi Hendrix`s private key path # noqa + "strongPass", # Enter the Jimi Hendrix`s private key password + ] + with open("tests/files/das-root.json", "r") as file: + das_root = file.read() + fake_offline_md = {"metadata": json.loads(das_root)} + json.load = pretend.call_recorder(lambda *a: fake_offline_md) + json.dump = pretend.call_recorder(lambda *a: "fake_write") + test_result = client.invoke( + metadata.sign, + ["tests/files/das-root.json"], + input="\n".join(input_step), + obj=test_context, + catch_exceptions=False, + ) + + assert test_result.exit_code == 0, test_result.output + assert "Metadata Signed! 🔑" in test_result.output + assert "SIGNING KEYS" in test_result.output + assert "PENDING KEYS" in test_result.output