From a922d17aa61f88f158aee50894ae0f9ce8801dd3 Mon Sep 17 00:00:00 2001 From: "Colton Wolkins (Indicio work address)" Date: Mon, 1 Apr 2024 10:53:44 -0600 Subject: [PATCH] feat: Add did:web resolver support Signed-off-by: Colton Wolkins (Indicio work address) --- didcomm_messaging/resolver/web.py | 117 ++++++++++++++++++++++++++++++ tests/conftest.py | 24 ++++++ tests/test_didweb.py | 106 +++++++++++++++++++++++++++ 3 files changed, 247 insertions(+) create mode 100644 didcomm_messaging/resolver/web.py create mode 100644 tests/conftest.py create mode 100644 tests/test_didweb.py diff --git a/didcomm_messaging/resolver/web.py b/didcomm_messaging/resolver/web.py new file mode 100644 index 0000000..76f5400 --- /dev/null +++ b/didcomm_messaging/resolver/web.py @@ -0,0 +1,117 @@ +"""did:web resolver. + +Resolve did:web style dids to a did document. did:web spec: +https://w3c-ccg.github.io/did-method-web/ +""" + +from . import DIDResolver, DIDNotFound, DIDResolutionError +from pydid import DID +from urllib.parse import urlparse +from datetime import datetime, timedelta +import urllib.request as url_request +import re +import json +import urllib + +domain_regex = ( + r"((?!-))(xn--)?[a-z0-9][a-z0-9-_]{0,61}[a-z0-9]{0,1}" + r"\.(xn--)?([a-z0-9\._-]{1,61}|[a-z0-9-]{1,30})" + r"(%3[aA]\d+)?" # Port + r"(:[a-zA-Z]+)*" # Path +) +did_web_pattern = re.compile(rf"^did:web:{domain_regex}$") +cache = {} +TIME_TO_CACHE = 1800 # 30 minutes + + +class DIDWeb(DIDResolver): + """Utility functions for building and interacting with did:web.""" + + async def resolve(self, did: str) -> dict: + """Resolve a did:web to a did document via http request.""" + + # Check to see if we've seen the did recently + if did in cache: + if cache[did]["timestamp"] > datetime.now() + timedelta( + seconds=-TIME_TO_CACHE + ): + return cache[did]["doc"] + else: + del cache[did] + + uri = DIDWeb._did_to_uri(did) + headers = { + "User-Agent": "DIDCommRelay/1.0", + } + request = url_request.Request(url=uri, method="GET", headers=headers) + try: + with url_request.urlopen(request) as response: + doc = json.loads(response.read().decode()) + cache[did] = { + "timestamp": datetime.now(), + "doc": doc, + } + return doc + except urllib.error.HTTPError as e: + if e.code == 404: + raise DIDNotFound( + f"The did:web {did} returned a 404 not found while resolving" + ) + else: + raise DIDResolutionError( + f"Unknown server error ({e.code}) while resolving did:web: {did}" + ) + except json.decoder.JSONDecodeError as e: + msg = str(e) + raise DIDNotFound(f"The did:web {did} returned invalid JSON {msg}") + except Exception as e: + raise DIDResolutionError("Failed to fetch did document") from e + + @staticmethod + def _did_to_uri(did: str) -> str: + # Split the did by it's segments + did_segments = did.split(":") + + # Get the hostname & port + hostname = did_segments[2].lower() + hostname = hostname.replace("%3a", ":") + + # Resolve the path portion of the DID, if there is no path, default to + # a .well-known address + path = ".well-known" + if len(did_segments) > 3: + path = "/".join(did_segments[3:]) + + # Assemble the URI + did_uri = f"https://{hostname}/{path}/did.json" + + return did_uri + + async def is_resolvable(self, did: str) -> bool: + """Determine if the did is a valid did:web did that can be resolved.""" + if DID.is_valid(did) and did_web_pattern.match(did): + return True + return False + + @staticmethod + def did_from_url(url: str) -> DID: + """Convert a URL into a did:web did.""" + + # Make sure that the URL starts with a scheme + if not url.startswith("http"): + url = f"https://{url}" + + # Parse it out to we can grab pieces + parsed_url = urlparse(url) + + # Assemble the domain portion of the DID + did = "did:web:%s" % parsed_url.netloc.replace(":", "%3A") + + # Cleanup the path + path = parsed_url.path.replace(".well-known/did.json", "") + path = path.replace("/did.json", "") + + # Add the path portion of the did + if len(path) > 1: + did += path.replace("/", ":") + return did diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..015a009 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,24 @@ +import pytest + + +def pytest_addoption(parser): + parser.addoption( + "--runexternal", + action="store_true", + default=False, + help="run tests that make external requests", + ) + + +def pytest_configure(config): + config.addinivalue_line("markers", "external_fetch: mark test as slow to run") + + +def pytest_collection_modifyitems(config, items): + if config.getoption("--runexternal"): + # --runslow given in cli: do not skip slow tests + return + skip_external = pytest.mark.skip(reason="need --runexternal option to run") + for item in items: + if "external_fetch" in item.keywords: + item.add_marker(skip_external) diff --git a/tests/test_didweb.py b/tests/test_didweb.py new file mode 100644 index 0000000..906d2dd --- /dev/null +++ b/tests/test_didweb.py @@ -0,0 +1,106 @@ +import pytest + + +from didcomm_messaging.resolver.web import DIDWeb + +DIDWEB = "did:web:example.com" +DIDWEB_URI = "https://example.com/.well-known/did.json" +DIDWEB_COMPLEX = "did:web:example.com%3A4443:DIDs:alice:relay" +DIDWEB_COMPLEX_URI = "https://example.com:4443/DIDs/alice/relay/did.json" + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_domain(): + did = DIDWeb.did_from_url("example.com") + assert did + assert did == DIDWEB + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_schema_and_domain(): + did = DIDWeb.did_from_url("https://example.com") + assert did + assert did == DIDWEB + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_schema_and_domain_slash(): + did = DIDWeb.did_from_url("https://example.com/") + assert did + assert did == DIDWEB + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_schema_and_domain_path(): + did = DIDWeb.did_from_url("https://example.com/did.json") + assert did + assert did == DIDWEB + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_schema_and_domain_wellknown(): + did = DIDWeb.did_from_url("https://example.com/.well-known/did.json") + assert did + assert did == DIDWEB + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_schema_and_domain_port_wellknown(): + did = DIDWeb.did_from_url("https://example.com:443/.well-known/did.json") + assert did + assert did == DIDWEB + "%3A443" + + +@pytest.mark.asyncio +async def test_didweb_from_didurl_schema_and_complex_domain_path(): + did = DIDWeb.did_from_url("https://example.com:4443/DIDs/alice/relay/did.json") + assert did + assert did == DIDWEB_COMPLEX + + +@pytest.mark.asyncio +async def test_didweb_to_url(): + uri = DIDWeb._did_to_uri(DIDWEB) + assert uri + assert uri == DIDWEB_URI + + +@pytest.mark.asyncio +async def test_didweb_to_url_complex(): + uri = DIDWeb._did_to_uri(DIDWEB_COMPLEX) + assert uri + assert uri == DIDWEB_COMPLEX_URI + + +@pytest.mark.asyncio +async def test_didweb_is_resolvable(): + resolver = DIDWeb() + resolvable = await resolver.is_resolvable(DIDWEB) + assert resolvable + resolvable_complex = await resolver.is_resolvable(DIDWEB_COMPLEX) + assert resolvable_complex + + +@pytest.mark.external_fetch +@pytest.mark.asyncio +async def test_didweb_fetch(): + did_web = "did:web:colton.wolkins.net" + resolver = DIDWeb() + uri = await resolver.resolve(did_web) + print(uri) + assert uri + assert isinstance(uri, dict) + + +@pytest.mark.external_fetch +@pytest.mark.asyncio +async def test_didweb_double_fetch(): + did_web = "did:web:colton.wolkins.net" + resolver = DIDWeb() + uri = await resolver.resolve(did_web) + print(uri) + assert uri + assert isinstance(uri, dict) + uri = await resolver.resolve(did_web) + assert uri + assert isinstance(uri, dict)