Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
util: asynchelper: aenter_stack method
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
John Andersen authored and pdxjohnny committed Jun 7, 2019
1 parent ce03aa7 commit 36d88ae
Show file tree
Hide file tree
Showing 9 changed files with 147 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Auto generation of documentation for operation implementations, models, and
sources. Generated docs include information on configuration options and
inputs and outputs for operation implementations.
- Async helpers got an `aenter_stack` method which creates and returns and
`contextlib.AsyncExitStack` after entering all the context's passed to it.
### Changed
- OperationImplementation add_label and add_orig_label methods now use op.name
instead of ENTRY_POINT_ORIG_LABEL and ENTRY_POINT_NAME.
Expand All @@ -22,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- MemorySource now decorated with `entry_point`
- MemorySource takes arguments correctly via `config_set` and `config_get`
- skel modules have `long_description_content_type` set to "text/markdown"
- Base Orchestrator `__aenter__` and `__aexit__` methods were moved to the
Memory Orchestrator because they are specific to that config.

## [0.2.0] - 2019-05-23
### Added
Expand Down
80 changes: 19 additions & 61 deletions dffml/df/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
)
from ..util.cli.arg import Arg
from ..util.cli.cmd import CMD
from ..util.asynchelper import context_stacker, aenter_stack
from ..util.entrypoint import base_entry_point


Expand Down Expand Up @@ -101,7 +102,7 @@ def add_label(cls, *above):
return list(above) + cls.op.name.split("_")


def op(**kwargs):
def op(imp_enter=None, ctx_enter=None, **kwargs):
def wrap(func):
if not "name" in kwargs:
kwargs["name"] = func.__name__
Expand All @@ -115,7 +116,9 @@ def wrap(func):
func, OperationImplementationContext
):

class Implementation(OperationImplementation):
class Implementation(
context_stacker(OperationImplementation, imp_enter)
):

op = func.op
CONTEXT = func
Expand All @@ -124,14 +127,25 @@ class Implementation(OperationImplementation):
return func
else:

class ImplementationContext(OperationImplementationContext):
class ImplementationContext(
context_stacker(OperationImplementationContext, ctx_enter)
):
async def run(
self, inputs: Dict[str, Any]
) -> Union[bool, Dict[str, Any]]:
# TODO Add auto thread pooling of non-async functions
# If imp_enter or ctx_enter exist then bind the function to
# the ImplementationContext so that it has access to the
# context and it's parent
if imp_enter is not None or ctx_enter is not None:
return await (
func.__get__(self, self.__class__)(**inputs)
)
return await func(**inputs)

class Implementation(OperationImplementation):
class Implementation(
context_stacker(OperationImplementation, imp_enter)
):

op = func.op
CONTEXT = ImplementationContext
Expand Down Expand Up @@ -609,37 +623,6 @@ class BaseOrchestratorConfig(BaseConfig, NamedTuple):


class BaseOrchestratorContext(BaseDataFlowObjectContext):
def __init__(self, parent: "BaseOrchestrator") -> None:
super().__init__(parent)
self.__stack = None

async def __aenter__(self) -> "BaseOrchestratorContext":
"""
Ahoy matey, enter if ye dare into the management of the contexts. Eh
well not sure if there's really any context being managed here...
"""
self.__stack = AsyncExitStack()
await self.__stack.__aenter__()
self.rctx = await self.__stack.enter_async_context(
self.parent.rchecker()
)
self.ictx = await self.__stack.enter_async_context(
self.parent.input_network()
)
self.octx = await self.__stack.enter_async_context(
self.parent.operation_network()
)
self.lctx = await self.__stack.enter_async_context(
self.parent.lock_network()
)
self.nctx = await self.__stack.enter_async_context(
self.parent.opimp_network()
)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self.__stack.aclose()

@abc.abstractmethod
async def run_operations(
self, strict: bool = False
Expand All @@ -651,29 +634,4 @@ async def run_operations(

@base_entry_point("dffml.orchestrator", "dff")
class BaseOrchestrator(BaseDataFlowObject):
def __init__(self, config: "BaseConfig") -> None:
super().__init__(config)
self.__stack = None

async def __aenter__(self) -> "DataFlowFacilitator":
self.__stack = AsyncExitStack()
await self.__stack.__aenter__()
self.rchecker = await self.__stack.enter_async_context(
self.config.rchecker
)
self.input_network = await self.__stack.enter_async_context(
self.config.input_network
)
self.operation_network = await self.__stack.enter_async_context(
self.config.operation_network
)
self.lock_network = await self.__stack.enter_async_context(
self.config.lock_network
)
self.opimp_network = await self.__stack.enter_async_context(
self.config.opimp_network
)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self.__stack.aclose()
pass # pragma: no cov
43 changes: 43 additions & 0 deletions dffml/df/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from ..util.cli.arg import Arg
from ..util.cli.cmd import CMD
from ..util.data import ignore_args
from ..util.asynchelper import context_stacker, aenter_stack

from .log import LOGGER

Expand Down Expand Up @@ -804,6 +805,27 @@ class MemoryOrchestratorConfig(BaseOrchestratorConfig):


class MemoryOrchestratorContext(BaseOrchestratorContext):
def __init__(self, parent: "BaseOrchestrator") -> None:
super().__init__(parent)
self._stack = None

async def __aenter__(self) -> "BaseOrchestratorContext":
self._stack = AsyncExitStack()
self._stack = await aenter_stack(
self,
{
"rctx": self.parent.rchecker,
"ictx": self.parent.input_network,
"octx": self.parent.operation_network,
"lctx": self.parent.lock_network,
"nctx": self.parent.opimp_network,
},
)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self._stack.aclose()

async def run_operations(
self, strict: bool = False
) -> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]:
Expand Down Expand Up @@ -995,6 +1017,27 @@ class MemoryOrchestrator(BaseOrchestrator):

CONTEXT = MemoryOrchestratorContext

def __init__(self, config: "BaseConfig") -> None:
super().__init__(config)
self._stack = None

async def __aenter__(self) -> "DataFlowFacilitator":
self._stack = await aenter_stack(
self,
{
"rchecker": self.config.rchecker,
"input_network": self.config.input_network,
"operation_network": self.config.operation_network,
"lock_network": self.config.lock_network,
"opimp_network": self.config.opimp_network,
},
call=False,
)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await self._stack.aclose()

@classmethod
def args(cls, args, *above) -> Dict[str, Arg]:
# Extending above is done right before loading args of subclasses
Expand Down
56 changes: 55 additions & 1 deletion dffml/util/asynchelper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import asyncio
from collections import UserList
from contextlib import AsyncExitStack
from typing import Dict, Any, AsyncIterator, Tuple
from typing import Dict, Any, AsyncIterator, Tuple, Type, AsyncContextManager

from .log import LOGGER

Expand Down Expand Up @@ -106,3 +106,57 @@ async def concurrently(
# For tasks which are done but have expections which we didn't
# raise, collect their execptions
task.exception()


async def aenter_stack(
obj: Any,
context_managers: Dict[str, AsyncContextManager],
call: bool = True,
) -> AsyncExitStack:
"""
Create a :py:class:`contextlib.AsyncExitStack` then go through each key,
value pair in the dict of async context managers. Enter the context of each
async context manager and call setattr on ``obj`` to set the attribute by
the name of ``key`` to the value yielded by the async context manager.
If ``call`` is true then the context entered will be the context returned by
calling each context manager respectively.
"""
stack = AsyncExitStack()
await stack.__aenter__()
if context_managers is not None:
for key, ctxmanager in context_managers.items():
if call:
# Bind if not bound
if hasattr(ctxmanager, "__self__"):
ctxmanager = ctxmanager.__get__(obj, obj.__class__)
setattr(
obj, key, await stack.enter_async_context(ctxmanager())
)
else:
setattr(obj, key, await stack.enter_async_context(ctxmanager))
return stack


def context_stacker(
inherit: Type, context_managers: Dict[str, AsyncContextManager]
) -> Type:
"""
Using :func:`aenter_stack`
"""

class ContextStacker(inherit):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._stack = None

async def __aenter__(self):
await super().__aenter__()
self._stack = await aenter_stack(self, context_managers)
return self

async def __aexit__(self, exc_type, exc_value, traceback):
await super().__aexit__(exc_type, exc_value, traceback)
await self._stack.aclose()

return ContextStacker
11 changes: 6 additions & 5 deletions docs/api.rst → docs/api/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ API Reference
:maxdepth: 2
:caption: Contents:

api/df/index
api/feature
api/repo
api/model/index
api/source/index
df/index
feature
repo
model/index
source/index
util/index
5 changes: 5 additions & 0 deletions docs/api/util/asynchelper.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Asyncio Helpers
===============

.. automodule:: dffml.util.asynchelper
:members:
11 changes: 11 additions & 0 deletions docs/api/util/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
API Helper Utilities Reference
==============================

:py:mod:`asyncio` testing, command line, and configuration helpers live here.

.. toctree::
:glob:
:maxdepth: 2
:caption: Contents:

asynchelper
3 changes: 3 additions & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.intersphinx',
'sphinx.ext.autodoc',
'sphinx.ext.viewcode',
'sphinxcontrib.asyncio',
]

intersphinx_mapping = {'python': ('https://docs.python.org/3', None)}

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

Expand Down
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ set of feature generators which gather data from git repositories.
usage/*
concepts/index
plugins/index
api
api/index


Indices and tables
Expand Down

0 comments on commit 36d88ae

Please sign in to comment.