Skip to content

Commit

Permalink
Add dispatch runner
Browse files Browse the repository at this point in the history
Signed-off-by: Mathias L. Baumann <[email protected]>
  • Loading branch information
Marenz committed Sep 12, 2024
1 parent 46d5b17 commit 7e3419b
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 1 deletion.
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* We now provide the `DispatchRunnerActor` class, a class to manage actors based on incoming dispatches.

## Bug Fixes

Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [DispatchRunnerActor][frequenz.dispatch.DispatchRunnerActor]: An actor to
manage other actors based on incoming dispatches.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
"""

from ._actor_runner import DispatchConfigurationEvent, DispatchRunnerActor
from ._dispatch import Dispatch, RunningState
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._event import Created, Deleted, DispatchEvent, Updated
Expand All @@ -26,4 +29,6 @@
"Updated",
"Dispatch",
"RunningState",
"DispatchRunnerActor",
"DispatchConfigurationEvent",
]
187 changes: 187 additions & 0 deletions src/frequenz/dispatch/_actor_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# License: All rights reserved
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Helper class to manage actors based on dispatches."""

import logging
from dataclasses import dataclass
from typing import Any

from frequenz.channels import Receiver, Sender
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.sdk.actor import Actor

from ._dispatch import Dispatch, RunningState

_logger = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class DispatchConfigurationEvent:
"""Event emitted when the dispatch configuration changes."""

components: ComponentSelector
"""Components to be used."""

dry_run: bool
"""Whether this is a dry run."""

payload: dict[str, Any]
"""Additional payload."""


class DispatchRunnerActor(Actor):
"""Helper class to manage actors based on dispatches.
Example usage:
```python
import os
import asyncio
from frequenz.dispatch import Dispatcher, DispatchRunnerActor, DispatchConfigurationEvent
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast
from unittest.mock import MagicMock
class MyActor(Actor):
def __init__(self, config_channel: Receiver[DispatchConfigurationEvent]):
super().__init__()
self._config_channel = config_channel
self._dry_run: bool
self._payload: dict[str, Any]
async def _run(self) -> None:
while True:
config = await self._config_channel.receive()
print("Received config:", config)
self.set_components(config.components)
self._dry_run = config.dry_run
self._payload = config.payload
def set_components(self, components: ComponentSelector) -> None:
match components:
case [int(), *_] as component_ids:
print("Dispatch: Setting components to %s", components)
case [ComponentCategory.BATTERY, *_]:
print("Dispatch: Using all battery components")
case _ as unsupported:
print(
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)
async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")
microgrid_id = 1
dispatcher = Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)
# Create config channel to receive (re-)configuration events pre-start and mid-run
config_channel = Broadcast[DispatchConfigurationEvent](name="config_channel")
# Start actor and supporting actor, give each a config channel receiver
my_actor = MyActor(config_channel.new_receiver())
supporting_actor = MagicMock(config_channel.new_receiver())
status_receiver = dispatcher.running_status_change.new_receiver()
dispatch_handler = DispatchRunnerActor(
actors=frozenset([my_actor, supporting_actor]),
dispatch_type="EXAMPLE",
running_status_receiver=status_receiver,
configuration_sender=config_channel.new_sender(),
)
await asyncio.gather(dispatcher.start(), dispatch_handler.start())
```
"""

def __init__(
self,
actors: frozenset[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
configuration_sender: Sender[DispatchConfigurationEvent] | None = None,
) -> None:
"""Initialize the dispatch handler.
Args:
actors: The actors to handle.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
configuration_sender: The sender for dispatch configuration events
"""
super().__init__()
self._dispatch_rx = running_status_receiver
self._actors = actors
self._dispatch_type = dispatch_type
self._configuration_sender = configuration_sender

def _start_actors(self) -> None:
"""Start all actors."""
for actor in self._actors:
if actor.is_running:
_logger.warning("Actor %s is already running", actor.name)
else:
actor.start()

async def _stop_actors(self, msg: str) -> None:
"""Stop all actors.
Args:
msg: The message to be passed to the actors being stopped.
"""
for actor in self._actors:
if actor.is_running:
await actor.stop(msg)
else:
_logger.warning("Actor %s is not running", actor.name)

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
while True:
_logger.info("Waiting for dispatch...")
dispatch = await self._dispatch_rx.receive()
await self._handle_dispatch(dispatch=dispatch)

async def _handle_dispatch(self, dispatch: Dispatch) -> None:
"""Handle a dispatch.
Args:
dispatch: The dispatch to handle.
Returns:
The running state.
"""
running = dispatch.running(self._dispatch_type)
match running:
case RunningState.STOPPED:
_logger.info("Stopping dispatch...")
await self._stop_actors("Dispatch stopped")
case RunningState.RUNNING:
if self._configuration_sender is not None:
_logger.info("Updating configuration...")
await self._configuration_sender.send(
DispatchConfigurationEvent(
components=dispatch.selector,
dry_run=dispatch.dry_run,
payload=dispatch.payload,
)
)

_logger.info("Running dispatch...")
self._start_actors()
case RunningState.DIFFERENT_TYPE:
_logger.debug(
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
)
171 changes: 171 additions & 0 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
# LICENSE: ALL RIGHTS RESERVED
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Test the dispatch runner."""

import asyncio
from dataclasses import dataclass, replace
from datetime import datetime, timedelta, timezone
from typing import AsyncIterator, Iterator

import async_solipsism
import pytest
import time_machine
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.client.dispatch.test.generator import DispatchGenerator
from frequenz.client.dispatch.types import Frequency
from frequenz.sdk.actor import Actor
from pytest import fixture

from frequenz.dispatch import Dispatch, DispatchConfigurationEvent, DispatchRunnerActor

# pylint: disable=protected-access


# This method replaces the event loop for all tests in the file.
@pytest.fixture
def event_loop_policy() -> async_solipsism.EventLoopPolicy:
"""Return an event loop policy that uses the async solipsism event loop."""
return async_solipsism.EventLoopPolicy()


@fixture
def fake_time() -> Iterator[time_machine.Coordinates]:
"""Replace real time with a time machine that doesn't automatically tick."""
# destination can be a datetime or a timestamp (int), so are moving to the
# epoch (in UTC!)
with time_machine.travel(destination=0, tick=False) as traveller:
yield traveller


def _now() -> datetime:
"""Return the current time in UTC."""
return datetime.now(tz=timezone.utc)


class MockActor(Actor):
"""Mock actor for testing."""

async def _run(self) -> None:
while True:
await asyncio.sleep(1)


@dataclass
class TestEnv:
"""Test environment."""

actor: Actor
runner_actor: DispatchRunnerActor
running_status_sender: Sender[Dispatch]
configuration_receiver: Receiver[DispatchConfigurationEvent]
generator: DispatchGenerator = DispatchGenerator()


@fixture
async def test_env() -> AsyncIterator[TestEnv]:
"""Create a test environment."""
channel = Broadcast[Dispatch](name="dispatch ready test channel")
config_channel = Broadcast[DispatchConfigurationEvent](
name="dispatch config test channel"
)

actor = MockActor()

runner_actor = DispatchRunnerActor(
actors=frozenset([actor]),
dispatch_type="UNIT_TEST",
running_status_receiver=channel.new_receiver(),
configuration_sender=config_channel.new_sender(),
)

runner_actor.start()

yield TestEnv(
actor=actor,
runner_actor=runner_actor,
running_status_sender=channel.new_sender(),
configuration_receiver=config_channel.new_receiver(),
)

await runner_actor.stop()


# Disable for _handle_dispatch access
# pylint: disable=protected-access


async def test_simple_start_stop(
test_env: TestEnv,
fake_time: time_machine.Coordinates,
) -> None:
"""Test behavior when receiving start/stop messages."""
now = _now()
duration = timedelta(minutes=10)
dispatch = test_env.generator.generate_dispatch()
dispatch = replace(
dispatch,
active=True,
dry_run=False,
duration=duration,
start_time=now,
payload={"test": True},
type="UNIT_TEST",
recurrence=replace(
dispatch.recurrence,
frequency=Frequency.UNSPECIFIED,
),
)

await test_env.running_status_sender.send(Dispatch(dispatch))
fake_time.shift(timedelta(seconds=1))

event = await test_env.configuration_receiver.receive()
assert event.payload == {"test": True}
assert event.components == dispatch.selector
assert event.dry_run is False

assert test_env.actor.is_running is True

fake_time.shift(duration)
await test_env.running_status_sender.send(Dispatch(dispatch))

# Give await actor.stop a chance to run in DispatchRunnerActor
await asyncio.sleep(0.1)

assert test_env.actor.is_running is False


async def test_dry_run(test_env: TestEnv, fake_time: time_machine.Coordinates) -> None:
"""Test the dry run mode."""
dispatch = test_env.generator.generate_dispatch()
dispatch = replace(
dispatch,
dry_run=True,
active=True,
start_time=_now(),
duration=timedelta(minutes=10),
type="UNIT_TEST",
recurrence=replace(
dispatch.recurrence,
frequency=Frequency.UNSPECIFIED,
),
)

await test_env.running_status_sender.send(Dispatch(dispatch))
fake_time.shift(timedelta(seconds=1))

event = await test_env.configuration_receiver.receive()

assert event.dry_run is dispatch.dry_run
assert event.components == dispatch.selector
assert event.payload == dispatch.payload
assert test_env.actor.is_running is True

fake_time.shift(dispatch.duration)
await test_env.running_status_sender.send(Dispatch(dispatch))

# Give await actor.stop a chance to run in DispatchRunnerActor
await asyncio.sleep(0.1)

assert test_env.actor.is_running is False

0 comments on commit 7e3419b

Please sign in to comment.