Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DIDComm v1 interfaces like those available for v2 #39

Merged
merged 8 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 13 additions & 163 deletions didcomm_messaging/__init__.py
Original file line number Diff line number Diff line change
@@ -1,170 +1,20 @@
"""DIDComm Messaging."""

from dataclasses import dataclass
import json
from typing import Generic, Optional, List

from pydid.service import DIDCommV2Service

from didcomm_messaging.crypto import CryptoService, SecretsManager, P, S
from didcomm_messaging.crypto import CryptoService, P, S, SecretsManager
from didcomm_messaging.messaging import DIDCommMessaging, DIDCommMessagingService
from didcomm_messaging.packaging import PackagingService
from didcomm_messaging.resolver import DIDResolver
from didcomm_messaging.routing import RoutingService


@dataclass
class PackResult:
"""Result of packing a message."""

message: bytes
target_services: List[DIDCommV2Service]

def get_endpoint(self, protocol: str) -> str:
"""Get the first matching endpoint to send the message to."""
return self.get_service(protocol).service_endpoint.uri

def get_service(self, protocol: str) -> DIDCommV2Service:
"""Get the first matching service to send the message to."""
return self.filter_services_by_protocol(protocol)[0]

def filter_services_by_protocol(self, protocol: str) -> List[DIDCommV2Service]:
"""Get all services that start with a specific uri protocol."""
return [
service
for service in self.target_services
if service.service_endpoint.uri.startswith(protocol)
]


@dataclass
class UnpackResult:
"""Result of unpacking a message."""

message: dict
encrytped: bool
authenticated: bool
recipient_kid: str
sender_kid: Optional[str] = None


class DIDCommMessagingService(Generic[P, S]):
"""Main entrypoint for DIDComm Messaging."""

def service_to_target(self, service: DIDCommV2Service) -> str:
"""Convert a service to a target uri.

This is a very simple implementation that just returns the first one.
"""
if isinstance(service.service_endpoint, list):
service_endpoint = service.service_endpoint[0]
else:
service_endpoint = service.service_endpoint

return service_endpoint.uri

async def pack(
self,
crypto: CryptoService[P, S],
resolver: DIDResolver,
secrets: SecretsManager[S],
packaging: PackagingService[P, S],
routing: RoutingService,
message: dict,
to: str,
frm: Optional[str] = None,
**options,
):
"""Pack a message."""
# TODO crypto layer permits packing to multiple recipients; should we as well?

encoded_message = await packaging.pack(
crypto,
resolver,
secrets,
json.dumps(message).encode(),
[to],
frm,
**options,
)

forward, services = await routing.prepare_forward(
crypto, packaging, resolver, secrets, to, encoded_message
)
return PackResult(forward, services)

async def unpack(
self,
crypto: CryptoService[P, S],
resolver: DIDResolver,
secrets: SecretsManager[S],
packaging: PackagingService[P, S],
encoded_message: bytes,
**options,
) -> UnpackResult:
"""Unpack a message."""
unpacked, metadata = await packaging.unpack(
crypto, resolver, secrets, encoded_message, **options
)
message = json.loads(unpacked.decode())
return UnpackResult(
message,
encrytped=bool(metadata.method),
authenticated=bool(metadata.sender_kid),
recipient_kid=metadata.recip_key.kid,
sender_kid=metadata.sender_kid,
)


class DIDCommMessaging(Generic[P, S]):
"""Main entrypoint for DIDComm Messaging."""

def __init__(
self,
crypto: CryptoService[P, S],
secrets: SecretsManager[S],
resolver: DIDResolver,
packaging: PackagingService[P, S],
routing: RoutingService,
):
"""Initialize the DIDComm Messaging service."""
self.crypto = crypto
self.secrets = secrets
self.resolver = resolver
self.packaging = packaging
self.routing = routing
self.dmp = DIDCommMessagingService()

async def pack(
self,
message: dict,
to: str,
frm: Optional[str] = None,
**options,
) -> PackResult:
"""Pack a message."""
return await self.dmp.pack(
self.crypto,
self.resolver,
self.secrets,
self.packaging,
self.routing,
message,
to,
frm,
**options,
)

async def unpack(
self,
encoded_message: bytes,
**options,
) -> UnpackResult:
"""Unpack a message."""
return await self.dmp.unpack(
self.crypto,
self.resolver,
self.secrets,
self.packaging,
encoded_message,
**options,
)
__all__ = [
"CryptoService",
"DIDCommMessaging",
"DIDCommMessagingService",
"DIDResolver",
"P",
"PackagingService",
"RoutingService",
"S",
"SecretsManager",
]
10 changes: 10 additions & 0 deletions didcomm_messaging/legacy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Legacy DIDComm v1 Interfaces.

These components are intended to provide a similar structure to the DIDComm v2
interfaces provided by this library. While the community is transitioning from
DIDComm v1 to v2, having a consistent interface to interact with will help
implementers to support both versions until the transition is complete.

It is expected that a future version of this library will eventually remove
these interfaces.
"""
135 changes: 135 additions & 0 deletions didcomm_messaging/legacy/askar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""LegacyCryptoService implementation for askar."""

from collections import OrderedDict
from typing import Optional, Sequence, Tuple, cast

from base58 import b58decode
from pydid import VerificationMethod

from didcomm_messaging.crypto.jwe import JweBuilder, JweEnvelope, JweRecipient
from didcomm_messaging.legacy.base import (
LegacyCryptoService,
LegacyUnpackResult,
RecipData,
)

try:
from aries_askar import Key, KeyAlg, crypto_box
from aries_askar.bindings import key_get_secret_bytes
from didcomm_messaging.crypto.backend.askar import AskarKey, AskarSecretKey
except ImportError:
raise ImportError("Legacy Askar backend requires the 'askar' extra to be installed")


class AskarLegacyCryptoService(LegacyCryptoService[AskarKey, AskarSecretKey]):
"""Legacy crypto service implementation for askar."""

def kid_to_public_key(self, kid: str) -> AskarKey:
"""Get a public key from a kid.

In DIDComm v1, kids are the base58 encoded keys.
"""
return AskarKey(Key.from_public_bytes(KeyAlg.ED25519, b58decode(kid)), kid)

@classmethod
def verification_method_to_public_key(cls, vm: VerificationMethod) -> AskarKey:
"""Convert a verification method to a public key."""
return AskarKey.from_verification_method(vm)

async def pack_message(
self,
to_verkeys: Sequence[AskarKey],
from_key: Optional[AskarSecretKey],
message: bytes,
) -> JweEnvelope:
"""Encode a message using the DIDComm v1 'pack' algorithm."""
builder = JweBuilder(
with_protected_recipients=True, with_flatten_recipients=False
)
cek = Key.generate(KeyAlg.C20P)
# avoid converting to bytes object: this way the only copy is zeroed afterward
# tell type checking it's bytes to make it happy
cek_b = cast(bytes, key_get_secret_bytes(cek._handle))
sender_vk = from_key.kid if from_key else None
sender_xk = from_key.key.convert_key(KeyAlg.X25519) if from_key else None

for target_vk in to_verkeys:
target_xk = target_vk.key.convert_key(KeyAlg.X25519)
if sender_vk and sender_xk:
enc_sender = crypto_box.crypto_box_seal(target_xk, sender_vk)
nonce = crypto_box.random_nonce()
enc_cek = crypto_box.crypto_box(target_xk, sender_xk, cek_b, nonce)
builder.add_recipient(
JweRecipient(
encrypted_key=enc_cek,
header=OrderedDict(
[
("kid", target_vk.kid),
("sender", self.b64url.encode(enc_sender)),
("iv", self.b64url.encode(nonce)),
]
),
)
)
else:
enc_sender = None
nonce = None
enc_cek = crypto_box.crypto_box_seal(target_xk, cek_b)
builder.add_recipient(
JweRecipient(encrypted_key=enc_cek, header={"kid": target_vk.kid})
)
builder.set_protected(
OrderedDict(
[
("enc", "xchacha20poly1305_ietf"),
("typ", "JWM/1.0"),
("alg", "Authcrypt" if from_key else "Anoncrypt"),
]
),
)
enc = cek.aead_encrypt(message, aad=builder.protected_bytes)
ciphertext, tag, nonce = enc.parts
builder.set_payload(ciphertext, nonce, tag)
return builder.build()

async def unpack_message(
self,
wrapper: JweEnvelope,
recip_key: AskarSecretKey,
recip_data: RecipData,
) -> LegacyUnpackResult:
"""Decode a message using the DIDComm v1 'unpack' algorithm."""
payload_key, sender_vk = self._extract_payload_key(recip_key.key, recip_data)

cek = Key.from_secret_bytes(KeyAlg.C20P, payload_key)
message = cek.aead_decrypt(
wrapper.ciphertext,
nonce=wrapper.iv,
tag=wrapper.tag,
aad=wrapper.protected_b64,
)
return LegacyUnpackResult(message, recip_key.kid, sender_vk)

def _extract_payload_key(
self, recip_key: Key, recip_data: RecipData
) -> Tuple[bytes, Optional[str]]:
"""Extract the payload key from pack recipient details.

Returns: A tuple of the CEK and sender verkey
"""
recip_x = recip_key.convert_key(KeyAlg.X25519)

if recip_data.nonce and recip_data.enc_sender:
sender_vk = crypto_box.crypto_box_seal_open(
recip_x, recip_data.enc_sender
).decode("utf-8")
sender_x = Key.from_public_bytes(
KeyAlg.ED25519, b58decode(sender_vk)
).convert_key(KeyAlg.X25519)
cek = crypto_box.crypto_box_open(
recip_x, sender_x, recip_data.enc_cek, recip_data.nonce
)
else:
sender_vk = None
cek = crypto_box.crypto_box_seal_open(recip_x, recip_data.enc_cek)
return cek, sender_vk
56 changes: 56 additions & 0 deletions didcomm_messaging/legacy/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""DIDComm v1 Base Services."""

from abc import ABC, abstractmethod
from typing import Generic, NamedTuple, Optional, Sequence

from pydid import VerificationMethod
from didcomm_messaging.crypto.base import P, S
from didcomm_messaging.crypto.jwe import JweEnvelope
from didcomm_messaging.multiformats.multibase import Base64UrlEncoder


class RecipData(NamedTuple):
"""Recipient metadata."""

kid: str
enc_sender: Optional[bytes]
nonce: Optional[bytes]
enc_cek: bytes


class LegacyUnpackResult(NamedTuple):
"""Result of unpacking."""

message: bytes
recip: str
sender: Optional[str]


class LegacyCryptoService(ABC, Generic[P, S]):
"""CryptoService interface for DIDComm v1."""

b64url = Base64UrlEncoder()

@abstractmethod
def kid_to_public_key(self, kid: str) -> P:
"""Get a public key from a kid.

In DIDComm v1, kids are the base58 encoded keys.
"""

@classmethod
@abstractmethod
def verification_method_to_public_key(cls, vm: VerificationMethod) -> P:
"""Convert a verification method to a public key."""

@abstractmethod
async def pack_message(
self, to_verkeys: Sequence[P], from_key: Optional[S], message: bytes
) -> JweEnvelope:
"""Encode a message using the DIDComm v1 'pack' algorithm."""

@abstractmethod
async def unpack_message(
self, wrapper: JweEnvelope, recip_key: S, recip_data: RecipData
) -> LegacyUnpackResult:
"""Decode a message using DIDCvomm v1 'unpack' algorithm."""
6 changes: 5 additions & 1 deletion didcomm_messaging/legacy/crypto.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
"""DIDComm v1 packing and unpacking."""
"""DIDComm v1 packing and unpacking.

This implementation is kept around for backwards compatibility. It is
likely that you should use the LegacyCryptoService interfaces instead.
"""

from collections import OrderedDict
from typing import Iterable, Optional, Sequence, Dict, Union, Tuple, cast
Expand Down
Loading