Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch Managing Actor #54

Merged
merged 11 commits into from
Sep 30, 2024
2 changes: 1 addition & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* We now provide the `DispatchManagingActor` class, a class to manage actors based on incoming dispatches.

## Bug Fixes

Expand Down
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ dependencies = [
# Make sure to update the version for cross-referencing also in the
# mkdocs.yml file when changing the version here (look for the config key
# plugins.mkdocstrings.handlers.python.import)
"frequenz-sdk >= 1.0.0-rc900, < 1.0.0-rc1000",
"frequenz-channels >= 1.1.0, < 2.0.0",
"frequenz-client-dispatch >= 0.6.0, < 0.7.0",
"frequenz-sdk == 1.0.0-rc900, < 1.0.0-rc1000",
"frequenz-channels >= 1.2.0, < 2.0.0",
"frequenz-client-dispatch >= 0.7.0, < 0.8.0",
]
dynamic = ["version"]

Expand Down Expand Up @@ -165,6 +165,7 @@ disable = [
[tool.pytest.ini_options]
testpaths = ["tests", "src"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should remove this and keep the default as it is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we get a depreciation warning saying the behavior will change and one should set a default

Copy link
Contributor

@llucax llucax Sep 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, OK, I'm confused now. So this has nothing to do with trying to make tests pass and is just a new deprecation message in a new version of pytest-asyncio? If so it would be probably nice to put it in a separate commit with a corresponding commit message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I def. tried this to make the test pass, bit it also fixes a deprecation warning, it just didn't help though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

required_plugins = ["pytest-asyncio", "pytest-mock"]

[tool.mypy]
Expand Down
5 changes: 5 additions & 0 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to
manage other actors based on incoming dispatches.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
Expand All @@ -16,6 +18,7 @@
from ._dispatch import Dispatch, RunningState
from ._dispatcher import Dispatcher, ReceiverFetcher
from ._event import Created, Deleted, DispatchEvent, Updated
from ._managing_actor import DispatchManagingActor, DispatchUpdate

__all__ = [
"Created",
Expand All @@ -26,4 +29,6 @@
"Updated",
"Dispatch",
"RunningState",
"DispatchManagingActor",
"DispatchUpdate",
]
14 changes: 14 additions & 0 deletions src/frequenz/dispatch/_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ def running(self, type_: str) -> RunningState:
return RunningState.STOPPED

now = datetime.now(tz=timezone.utc)

if now < self.start_time:
return RunningState.STOPPED
# A dispatch without duration is always running once it started
if self.duration is None:
return RunningState.RUNNING

if until := self._until(now):
return RunningState.RUNNING if now < until else RunningState.STOPPED

Expand Down Expand Up @@ -185,6 +192,7 @@ def next_run_after(self, after: datetime) -> datetime | None:
if (
not self.recurrence.frequency
or self.recurrence.frequency == Frequency.UNSPECIFIED
or self.duration is None # Infinite duration
llucax marked this conversation as resolved.
Show resolved Hide resolved
):
if after > self.start_time:
return None
Expand Down Expand Up @@ -236,7 +244,13 @@ def _until(self, now: datetime) -> datetime | None:

Returns:
The time when the dispatch should end or None if the dispatch is not running.

Raises:
ValueError: If the dispatch has no duration.
"""
if self.duration is None:
raise ValueError("_until: Dispatch has no duration")

if (
not self.recurrence.frequency
or self.recurrence.frequency == Frequency.UNSPECIFIED
Expand Down
180 changes: 180 additions & 0 deletions src/frequenz/dispatch/_managing_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# License: All rights reserved
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""Helper class to manage actors based on dispatches."""

import logging
from dataclasses import dataclass
from typing import Any, Set

from frequenz.channels import Receiver, Sender
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.sdk.actor import Actor

from ._dispatch import Dispatch, RunningState

_logger = logging.getLogger(__name__)


@dataclass(frozen=True, kw_only=True)
class DispatchUpdate:
"""Event emitted when the dispatch changes."""

components: ComponentSelector
"""Components to be used."""

dry_run: bool
"""Whether this is a dry run."""

options: dict[str, Any]
"""Additional options."""


class DispatchManagingActor(Actor):
"""Helper class to manage actors based on dispatches.

Example usage:

```python
import os
import asyncio
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
from frequenz.client.dispatch.types import ComponentSelector
from frequenz.client.common.microgrid.components import ComponentCategory

from frequenz.channels import Receiver, Broadcast

class MyActor(Actor):
def __init__(self, updates_channel: Receiver[DispatchUpdate]):
super().__init__()
self._updates_channel = updates_channel
self._dry_run: bool
self._options : dict[str, Any]

async def _run(self) -> None:
while True:
update = await self._updates_channel.receive()
print("Received update:", update)

self.set_components(update.components)
self._dry_run = update.dry_run
self._options = update.options

def set_components(self, components: ComponentSelector) -> None:
match components:
case [int(), *_] as component_ids:
print("Dispatch: Setting components to %s", components)
case [ComponentCategory.BATTERY, *_]:
print("Dispatch: Using all battery components")
case unsupported:
print(
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)

async def run():
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
key = os.getenv("DISPATCH_API_KEY", "some-key")

microgrid_id = 1

dispatcher = Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
key=key
)

# Create update channel to receive dispatch update events pre-start and mid-run
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")

# Start actor and give it an dispatch updates channel receiver
my_actor = MyActor(dispatch_updates_channel.new_receiver())

status_receiver = dispatcher.running_status_change.new_receiver()

managing_actor = DispatchManagingActor(
actor=my_actor,
dispatch_type="EXAMPLE",
running_status_receiver=status_receiver,
updates_sender=dispatch_updates_channel.new_sender(),
)

await asyncio.gather(dispatcher.start(), managing_actor.start())
```
"""

def __init__(
self,
actor: Actor | Set[Actor],
dispatch_type: str,
running_status_receiver: Receiver[Dispatch],
updates_sender: Sender[DispatchUpdate] | None = None,
) -> None:
"""Initialize the dispatch handler.

Args:
actor: A set of actors or a single actor to manage.
dispatch_type: The type of dispatches to handle.
running_status_receiver: The receiver for dispatch running status changes.
updates_sender: The sender for dispatch events
"""
super().__init__()
self._dispatch_rx = running_status_receiver
self._actors = frozenset([actor] if isinstance(actor, Actor) else actor)
self._dispatch_type = dispatch_type
self._updates_sender = updates_sender

def _start_actors(self) -> None:
"""Start all actors."""
for actor in self._actors:
if actor.is_running:
_logger.warning("Actor %s is already running", actor.name)
else:
actor.start()

async def _stop_actors(self, msg: str) -> None:
"""Stop all actors.

Args:
msg: The message to be passed to the actors being stopped.
"""
for actor in self._actors:
if actor.is_running:
await actor.stop(msg)
else:
_logger.warning("Actor %s is not running", actor.name)

async def _run(self) -> None:
"""Wait for dispatches and handle them."""
async for dispatch in self._dispatch_rx:
await self._handle_dispatch(dispatch=dispatch)

async def _handle_dispatch(self, dispatch: Dispatch) -> None:
"""Handle a dispatch.

Args:
dispatch: The dispatch to handle.
"""
running = dispatch.running(self._dispatch_type)
match running:
case RunningState.STOPPED:
_logger.info("Stopped by dispatch %s", dispatch.id)
await self._stop_actors("Dispatch stopped")
case RunningState.RUNNING:
if self._updates_sender is not None:
_logger.info("Updated by dispatch %s", dispatch.id)
await self._updates_sender.send(
DispatchUpdate(
components=dispatch.selector,
dry_run=dispatch.dry_run,
options=dispatch.payload,
)
)

_logger.info("Started by dispatch %s", dispatch.id)
self._start_actors()
case RunningState.DIFFERENT_TYPE:
_logger.debug(
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
)
Loading
Loading