Skip to content

Commit

Permalink
chore: Add commands & clean websock code
Browse files Browse the repository at this point in the history
Signed-off-by: Colton Wolkins (Indicio work address) <[email protected]>
  • Loading branch information
TheTechmage authored and dbluhm committed Apr 26, 2024
1 parent c9df710 commit 5161af4
Showing 1 changed file with 40 additions and 10 deletions.
50 changes: 40 additions & 10 deletions didcomm_messaging/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,17 @@ async def fetch_relayed_messages(
# soon as they arrive instead of waiting to be picked up with the pickup
# protocol.

# TODO add comments to the below code segments and tidy it up

# Because the quickstart can be used without using websockets or connecting to
# an inbound relay/mediator. If we don't have the necessary libraries, we can
# just raise a warning about the lack of support.
try:
import websockets
import asyncio
except ImportError:
import warnings

warnings.warn(
"Missing websockets or asyncio import, live-delivery will be unavailable",
"Missing websockets or asyncio import, live-delivery support will be unavailable",
ImportWarning,
)

Expand All @@ -379,25 +380,44 @@ async def handle_websocket(
callback: Callable[[Dict[str, Any]], Awaitable[None]] = _message_callback,
):
"""Loop over messages received and process them."""

# Using async with on the websocket allows us to wait for then loop over
# new messages that come in over the websocket.
async with mediator_websocket as websocket:
# Send our live-delivery request to the inbound relay.
await websocket.send(live_delivery_message)
LOG.debug("Connected to WebSocket and requested Live Delivery!")
LOG.debug("Connected to WebSocket and requested Live Delivery.")

# Loop over all incoming messages
while True:
message = await websocket.recv()
LOG.debug("Received message over websocket")

try:
# Unpack/Decrypt the message, decode it, and load the JSON into
# a native python object.
unpacked_message, metadata = await dmp.packaging.unpack(message)
msg = unpacked_message.decode()
msg = json.loads(msg)
LOG.debug("Received websocket message %s", msg["type"])

# If the message is not from the relay, process it via the callback
if msg["from"] != relay_did:
await callback(msg)

# NOTE: While out of scope for this example, when implementing
# your own handler, it's advisable to handle messages that come
# from the relay. For example: the relay will respond to the
# live-delivery call with a pickup protocol status message. The
# status message may contain a message count that is greater
# than zero. If there are messages that are waiting, it is
# advisable to request the messages and process them.

except Exception as err:
LOG.error("Error encountered while decrypting websocket message")
LOG.exception(err)

# Clean up the websocket
await websocket.close()


Expand All @@ -410,6 +430,7 @@ async def activate_websocket(
) -> Union[Awaitable[None], "asyncio.Task"]:
"""Connect to a websocket and request message forwarding."""

# Construct a message to request live-delivery and pack it
message = {
"type": "https://didcomm.org/messagepickup/3.0/live-delivery-change",
"id": str(uuid.uuid4()),
Expand All @@ -424,10 +445,14 @@ async def activate_websocket(
to=relay_did,
frm=my_did,
)

# Get the websocket endpoint from the packed message. This will pull out
# the endpoint that is closest to us, the sender.
endpoint = packed.get_endpoint("ws")
LOG.info("Relay Websocket Address: %s", endpoint)
LOG.debug("Relay Websocket Address: %s", endpoint)

if endpoint:
LOG.info("Found Relay websocket, connecting")
# Connect to the websocket and prepare our handler function
mediator_websocket = websockets.connect(uri=endpoint)
websocket_handler = handle_websocket(
dmp,
Expand All @@ -436,6 +461,9 @@ async def activate_websocket(
packed.message,
callback,
)

# Create an asyncio task upon request to run in another async "thread",
# otherwise we'll just return the function that runs in the thread.
if create_task:
return asyncio.create_task(websocket_handler)
else:
Expand All @@ -450,17 +478,19 @@ async def websocket_loop(
) -> None:
"""Run the websocket handler in a task and reconnect on failure."""

# Helper method to create the async task
async def create_task():
return await activate_websocket(dmp, my_did, relay_did, callback, True)

# Create the initial task
mediator_websocket_proc = await create_task()

# Check to make sure the thread is still running every 5 seconds (to give
# the OS some breathing room). Should the thread "crash" or exit for any
# reason, log any exceptions and attempt to restart it.
while True:
await asyncio.sleep(5)
if mediator_websocket_proc.done():
LOG.exception(mediator_websocket_proc.exception())
try:
LOG.error("Websocket died, re-establishing connection!")
except Exception:
pass
LOG.error("Websocket died, re-establishing connection!")
mediator_websocket_proc = await create_task()

0 comments on commit 5161af4

Please sign in to comment.