Skip to content

Commit

Permalink
Add dispatch mananging actor
Browse files Browse the repository at this point in the history
A useful actor to help control and manange another actor using
dispatches.

Signed-off-by: Mathias L. Baumann <[email protected]>
  • Loading branch information
Marenz committed Sep 24, 2024
1 parent f316833 commit de4682a
Show file tree
Hide file tree
Showing 5 changed files with 349 additions and 2 deletions.
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* We now provide the `DispatchManagingActor` class, a class to manage actors based on incoming dispatches.

## Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dependencies = [
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk == 1.0.0-rc900",
# "frequenz-channels >= 1.1.0, < 2.0.0",
"frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python.git@refs/pull/323/head",
"frequenz-channels @ git+https://github.com/frequenz-floss/frequenz-channels-python.git@v1.x.x",
# "frequenz-client-dispatch >= 0.6.0, < 0.7.0",
"frequenz-client-dispatch @ git+https://github.com/frequenz-floss/frequenz-client-dispatch-python.git@refs/pull/87/head",
]
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to
manage other actors based on incoming dispatches.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
Expand All @@ -16,6 +18,7 @@
from ._dispatch import Dispatch, RunningState
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._event import Created, Deleted, DispatchEvent, Updated
from ._managing_actor import DispatchManagingActor, DispatchUpdate

__all__ = [
"Created",
Expand All @@ -26,4 +29,6 @@
"Updated",
"Dispatch",
"RunningState",
"DispatchManagingActor",
"DispatchUpdate",
]
178 changes: 178 additions & 0 deletions src/frequenz/dispatch/_managing_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# License: All rights reserved
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Helper class to manage actors based on dispatches."""

import logging
from dataclasses import dataclass
from typing import Any

from frequenz.channels import Receiver, Sender
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.sdk.actor import Actor

from ._dispatch import Dispatch, RunningState

_logger = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class DispatchUpdate:
"""Event emitted when the dispatch changes."""

components: ComponentSelector
"""Components to be used."""

dry_run: bool
"""Whether this is a dry run."""

options: dict[str, Any]
"""Additional options."""


class DispatchManagingActor(Actor):
"""Helper class to manage actors based on dispatches.
Example usage:
```python
import os
import asyncio
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast
class MyActor(Actor):
def __init__(self, updates_channel: Receiver[DispatchUpdate]):
super().__init__()
self._updates_channel = updates_channel
self._dry_run: bool
self._options : dict[str, Any]
async def _run(self) -> None:
while True:
update = await self._updates_channel.receive()
print("Received update:", update)
self.set_components(update.components)
self._dry_run = update.dry_run
self._options = update.options
def set_components(self, components: ComponentSelector) -> None:
match components:
case [int(), *_] as component_ids:
print("Dispatch: Setting components to %s", components)
case [ComponentCategory.BATTERY, *_]:
print("Dispatch: Using all battery components")
case unsupported:
print(
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
dispatcher = Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
# Create update channel to receive dispatch update events pre-start and mid-run
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
# Start actor and supporting actor, give each a config channel receiver
my_actor = MyActor(dispatch_updates_channel.new_receiver())
status_receiver = dispatcher.running_status_change.new_receiver()
dispatch_runner = DispatchManagingActor(
actor=my_actor,
dispatch_type="EXAMPLE",
running_status_receiver=status_receiver,
updates_sender=dispatch_updates_channel.new_sender(),
)
await asyncio.gather(dispatcher.start(), dispatch_runner.start())
```
"""

def __init__(
self,
actor: Actor,
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
updates_sender: Sender[DispatchUpdate] | None = None,
) -> None:
"""Initialize the dispatch handler.
Args:
actor: The actor to manage.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
updates_sender: The sender for dispatch events
"""
super().__init__()
self._dispatch_rx = running_status_receiver
self._actor = actor
self._dispatch_type = dispatch_type
self._updates_sender = updates_sender

def _start_actor(self) -> None:
"""Start the actor."""
if self._actor.is_running:
_logger.warning("Actor %s is already running", self._actor.name)
else:
self._actor.start()

async def _stop_actor(self, msg: str) -> None:
"""Stop the actor.
Args:
msg: The message to be passed to the actor being stopped.
"""
if self._actor.is_running:
await self._actor.stop(msg)
else:
_logger.warning("Actor %s is not running", self._actor.name)

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
async for dispatch in self._dispatch_rx:
await self._handle_dispatch(dispatch=dispatch)

async def _handle_dispatch(self, dispatch: Dispatch) -> None:
"""Handle a dispatch.
Args:
dispatch: The dispatch to handle.
"""
running = dispatch.running(self._dispatch_type)
match running:
case RunningState.STOPPED:
_logger.info("Stopped by dispatch %s", dispatch.id)
await self._stop_actor("Dispatch stopped")
case RunningState.RUNNING:
if self._updates_sender is not None:
_logger.info("Updated by dispatch %s", dispatch.id)
await self._updates_sender.send(
DispatchUpdate(
components=dispatch.selector,
dry_run=dispatch.dry_run,
options=dispatch.payload,
)
)

_logger.info("Started by dispatch %s", dispatch.id)
self._start_actor()
case RunningState.DIFFERENT_TYPE:
_logger.debug(
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
)
164 changes: 164 additions & 0 deletions tests/test_mananging_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
# LICENSE: ALL RIGHTS RESERVED
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Test the dispatch runner."""

import asyncio
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Iterator

import async_solipsism
import pytest
import time_machine
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.dispatch.test.generator import DispatchGenerator
from frequenz.client.dispatch.types import Frequency
from frequenz.sdk.actor import Actor
from pytest import fixture

from frequenz.dispatch import Dispatch, DispatchManagingActor, DispatchUpdate


# This method replaces the event loop for all tests in the file.
@pytest.fixture
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
"""Return an event loop policy that uses the async solipsism event loop."""
return async_solipsism.EventLoopPolicy()


@fixture
def fake_time() -> Iterator[time_machine.Coordinates]:
"""Replace real time with a time machine that doesn't automatically tick."""
# destination can be a datetime or a timestamp (int), so are moving to the
# epoch (in UTC!)
with time_machine.travel(destination=0, tick=False) as traveller:
yield traveller


def _now() -> datetime:
"""Return the current time in UTC."""
return datetime.now(tz=timezone.utc)


class MockActor(Actor):
"""Mock actor for testing."""

async def _run(self) -> None:
while True:
await asyncio.sleep(1)


@dataclass
class TestEnv:
"""Test environment."""

actor: Actor
runner_actor: DispatchManagingActor
running_status_sender: Sender[Dispatch]
configuration_receiver: Receiver[DispatchUpdate]
generator: DispatchGenerator = DispatchGenerator()


@fixture
async def test_env() -> AsyncIterator[TestEnv]:
"""Create a test environment."""
channel = Broadcast[Dispatch](name="dispatch ready test channel")
config_channel = Broadcast[DispatchUpdate](name="dispatch config test channel")

actor = MockActor()

runner_actor = DispatchManagingActor(
actor=actor,
dispatch_type="UNIT_TEST",
running_status_receiver=channel.new_receiver(),
updates_sender=config_channel.new_sender(),
)

runner_actor.start()

yield TestEnv(
actor=actor,
runner_actor=runner_actor,
running_status_sender=channel.new_sender(),
configuration_receiver=config_channel.new_receiver(),
)

await runner_actor.stop()


async def test_simple_start_stop(
test_env: TestEnv,
fake_time: time_machine.Coordinates,
) -> None:
"""Test behavior when receiving start/stop messages."""
now = _now()
duration = timedelta(minutes=10)
dispatch = test_env.generator.generate_dispatch()
dispatch = replace(
dispatch,
active=True,
dry_run=False,
duration=duration,
start_time=now,
payload={"test": True},
type="UNIT_TEST",
recurrence=replace(
dispatch.recurrence,
frequency=Frequency.UNSPECIFIED,
),
)

await test_env.running_status_sender.send(Dispatch(dispatch))
fake_time.shift(timedelta(seconds=1))

event = await test_env.configuration_receiver.receive()
assert event.options == {"test": True}
assert event.components == dispatch.selector
assert event.dry_run is False

assert test_env.actor.is_running is True

fake_time.shift(duration)
await test_env.running_status_sender.send(Dispatch(dispatch))

# Give await actor.stop a chance to run in DispatchManagingActor
await asyncio.sleep(0.1)

assert test_env.actor.is_running is False


async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
"""Test the dry run mode."""
dispatch = test_env.generator.generate_dispatch()
dispatch = replace(
dispatch,
dry_run=True,
active=True,
start_time=_now(),
duration=timedelta(minutes=10),
type="UNIT_TEST",
recurrence=replace(
dispatch.recurrence,
frequency=Frequency.UNSPECIFIED,
),
)

await test_env.running_status_sender.send(Dispatch(dispatch))
fake_time.shift(timedelta(seconds=1))

event = await test_env.configuration_receiver.receive()

assert event.dry_run is dispatch.dry_run
assert event.components == dispatch.selector
assert event.options == dispatch.payload
assert test_env.actor.is_running is True

assert dispatch.duration is not None
fake_time.shift(dispatch.duration)
await test_env.running_status_sender.send(Dispatch(dispatch))

# Give await actor.stop a chance to run in DispatchManagingActor
await asyncio.sleep(0.1)

assert test_env.actor.is_running is False

0 comments on commit de4682a

Please sign in to comment.