From 36d88ae2b1ffb2ef9e315887150b2e1bd9e14523 Mon Sep 17 00:00:00 2001 From: John Andersen Date: Fri, 7 Jun 2019 13:59:16 -0700 Subject: [PATCH] util: asynchelper: aenter_stack method Signed-off-by: John Andersen --- CHANGELOG.md | 4 ++ dffml/df/base.py | 80 ++++++++------------------------- dffml/df/memory.py | 43 ++++++++++++++++++ dffml/util/asynchelper.py | 56 ++++++++++++++++++++++- docs/{api.rst => api/index.rst} | 11 ++--- docs/api/util/asynchelper.rst | 5 +++ docs/api/util/index.rst | 11 +++++ docs/conf.py | 3 ++ docs/index.rst | 2 +- 9 files changed, 147 insertions(+), 68 deletions(-) rename docs/{api.rst => api/index.rst} (52%) create mode 100644 docs/api/util/asynchelper.rst create mode 100644 docs/api/util/index.rst diff --git a/CHANGELOG.md b/CHANGELOG.md index 61268ab0df..c10bd1b2f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Auto generation of documentation for operation implementations, models, and sources. Generated docs include information on configuration options and inputs and outputs for operation implementations. +- Async helpers got an `aenter_stack` method which creates and returns and + `contextlib.AsyncExitStack` after entering all the context's passed to it. ### Changed - OperationImplementation add_label and add_orig_label methods now use op.name instead of ENTRY_POINT_ORIG_LABEL and ENTRY_POINT_NAME. @@ -22,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - MemorySource now decorated with `entry_point` - MemorySource takes arguments correctly via `config_set` and `config_get` - skel modules have `long_description_content_type` set to "text/markdown" +- Base Orchestrator `__aenter__` and `__aexit__` methods were moved to the + Memory Orchestrator because they are specific to that config. ## [0.2.0] - 2019-05-23 ### Added diff --git a/dffml/df/base.py b/dffml/df/base.py index a3345072e6..4b91f0633e 100644 --- a/dffml/df/base.py +++ b/dffml/df/base.py @@ -26,6 +26,7 @@ ) from ..util.cli.arg import Arg from ..util.cli.cmd import CMD +from ..util.asynchelper import context_stacker, aenter_stack from ..util.entrypoint import base_entry_point @@ -101,7 +102,7 @@ def add_label(cls, *above): return list(above) + cls.op.name.split("_") -def op(**kwargs): +def op(imp_enter=None, ctx_enter=None, **kwargs): def wrap(func): if not "name" in kwargs: kwargs["name"] = func.__name__ @@ -115,7 +116,9 @@ def wrap(func): func, OperationImplementationContext ): - class Implementation(OperationImplementation): + class Implementation( + context_stacker(OperationImplementation, imp_enter) + ): op = func.op CONTEXT = func @@ -124,14 +127,25 @@ class Implementation(OperationImplementation): return func else: - class ImplementationContext(OperationImplementationContext): + class ImplementationContext( + context_stacker(OperationImplementationContext, ctx_enter) + ): async def run( self, inputs: Dict[str, Any] ) -> Union[bool, Dict[str, Any]]: # TODO Add auto thread pooling of non-async functions + # If imp_enter or ctx_enter exist then bind the function to + # the ImplementationContext so that it has access to the + # context and it's parent + if imp_enter is not None or ctx_enter is not None: + return await ( + func.__get__(self, self.__class__)(**inputs) + ) return await func(**inputs) - class Implementation(OperationImplementation): + class Implementation( + context_stacker(OperationImplementation, imp_enter) + ): op = func.op CONTEXT = ImplementationContext @@ -609,37 +623,6 @@ class BaseOrchestratorConfig(BaseConfig, NamedTuple): class BaseOrchestratorContext(BaseDataFlowObjectContext): - def __init__(self, parent: "BaseOrchestrator") -> None: - super().__init__(parent) - self.__stack = None - - async def __aenter__(self) -> "BaseOrchestratorContext": - """ - Ahoy matey, enter if ye dare into the management of the contexts. Eh - well not sure if there's really any context being managed here... - """ - self.__stack = AsyncExitStack() - await self.__stack.__aenter__() - self.rctx = await self.__stack.enter_async_context( - self.parent.rchecker() - ) - self.ictx = await self.__stack.enter_async_context( - self.parent.input_network() - ) - self.octx = await self.__stack.enter_async_context( - self.parent.operation_network() - ) - self.lctx = await self.__stack.enter_async_context( - self.parent.lock_network() - ) - self.nctx = await self.__stack.enter_async_context( - self.parent.opimp_network() - ) - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.__stack.aclose() - @abc.abstractmethod async def run_operations( self, strict: bool = False @@ -651,29 +634,4 @@ async def run_operations( @base_entry_point("dffml.orchestrator", "dff") class BaseOrchestrator(BaseDataFlowObject): - def __init__(self, config: "BaseConfig") -> None: - super().__init__(config) - self.__stack = None - - async def __aenter__(self) -> "DataFlowFacilitator": - self.__stack = AsyncExitStack() - await self.__stack.__aenter__() - self.rchecker = await self.__stack.enter_async_context( - self.config.rchecker - ) - self.input_network = await self.__stack.enter_async_context( - self.config.input_network - ) - self.operation_network = await self.__stack.enter_async_context( - self.config.operation_network - ) - self.lock_network = await self.__stack.enter_async_context( - self.config.lock_network - ) - self.opimp_network = await self.__stack.enter_async_context( - self.config.opimp_network - ) - return self - - async def __aexit__(self, exc_type, exc_value, traceback): - await self.__stack.aclose() + pass # pragma: no cov diff --git a/dffml/df/memory.py b/dffml/df/memory.py index 4c7f8d69ba..5d73583aa6 100644 --- a/dffml/df/memory.py +++ b/dffml/df/memory.py @@ -49,6 +49,7 @@ from ..util.cli.arg import Arg from ..util.cli.cmd import CMD from ..util.data import ignore_args +from ..util.asynchelper import context_stacker, aenter_stack from .log import LOGGER @@ -804,6 +805,27 @@ class MemoryOrchestratorConfig(BaseOrchestratorConfig): class MemoryOrchestratorContext(BaseOrchestratorContext): + def __init__(self, parent: "BaseOrchestrator") -> None: + super().__init__(parent) + self._stack = None + + async def __aenter__(self) -> "BaseOrchestratorContext": + self._stack = AsyncExitStack() + self._stack = await aenter_stack( + self, + { + "rctx": self.parent.rchecker, + "ictx": self.parent.input_network, + "octx": self.parent.operation_network, + "lctx": self.parent.lock_network, + "nctx": self.parent.opimp_network, + }, + ) + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._stack.aclose() + async def run_operations( self, strict: bool = False ) -> AsyncIterator[Tuple[BaseContextHandle, Dict[str, Any]]]: @@ -995,6 +1017,27 @@ class MemoryOrchestrator(BaseOrchestrator): CONTEXT = MemoryOrchestratorContext + def __init__(self, config: "BaseConfig") -> None: + super().__init__(config) + self._stack = None + + async def __aenter__(self) -> "DataFlowFacilitator": + self._stack = await aenter_stack( + self, + { + "rchecker": self.config.rchecker, + "input_network": self.config.input_network, + "operation_network": self.config.operation_network, + "lock_network": self.config.lock_network, + "opimp_network": self.config.opimp_network, + }, + call=False, + ) + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._stack.aclose() + @classmethod def args(cls, args, *above) -> Dict[str, Arg]: # Extending above is done right before loading args of subclasses diff --git a/dffml/util/asynchelper.py b/dffml/util/asynchelper.py index d9e01b7f5b..89848f09e0 100644 --- a/dffml/util/asynchelper.py +++ b/dffml/util/asynchelper.py @@ -8,7 +8,7 @@ import asyncio from collections import UserList from contextlib import AsyncExitStack -from typing import Dict, Any, AsyncIterator, Tuple +from typing import Dict, Any, AsyncIterator, Tuple, Type, AsyncContextManager from .log import LOGGER @@ -106,3 +106,57 @@ async def concurrently( # For tasks which are done but have expections which we didn't # raise, collect their execptions task.exception() + + +async def aenter_stack( + obj: Any, + context_managers: Dict[str, AsyncContextManager], + call: bool = True, +) -> AsyncExitStack: + """ + Create a :py:class:`contextlib.AsyncExitStack` then go through each key, + value pair in the dict of async context managers. Enter the context of each + async context manager and call setattr on ``obj`` to set the attribute by + the name of ``key`` to the value yielded by the async context manager. + + If ``call`` is true then the context entered will be the context returned by + calling each context manager respectively. + """ + stack = AsyncExitStack() + await stack.__aenter__() + if context_managers is not None: + for key, ctxmanager in context_managers.items(): + if call: + # Bind if not bound + if hasattr(ctxmanager, "__self__"): + ctxmanager = ctxmanager.__get__(obj, obj.__class__) + setattr( + obj, key, await stack.enter_async_context(ctxmanager()) + ) + else: + setattr(obj, key, await stack.enter_async_context(ctxmanager)) + return stack + + +def context_stacker( + inherit: Type, context_managers: Dict[str, AsyncContextManager] +) -> Type: + """ + Using :func:`aenter_stack` + """ + + class ContextStacker(inherit): + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **kwargs) + self._stack = None + + async def __aenter__(self): + await super().__aenter__() + self._stack = await aenter_stack(self, context_managers) + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await super().__aexit__(exc_type, exc_value, traceback) + await self._stack.aclose() + + return ContextStacker diff --git a/docs/api.rst b/docs/api/index.rst similarity index 52% rename from docs/api.rst rename to docs/api/index.rst index 471d7a371b..2c617b4542 100644 --- a/docs/api.rst +++ b/docs/api/index.rst @@ -6,8 +6,9 @@ API Reference :maxdepth: 2 :caption: Contents: - api/df/index - api/feature - api/repo - api/model/index - api/source/index + df/index + feature + repo + model/index + source/index + util/index diff --git a/docs/api/util/asynchelper.rst b/docs/api/util/asynchelper.rst new file mode 100644 index 0000000000..82639cb124 --- /dev/null +++ b/docs/api/util/asynchelper.rst @@ -0,0 +1,5 @@ +Asyncio Helpers +=============== + +.. automodule:: dffml.util.asynchelper + :members: diff --git a/docs/api/util/index.rst b/docs/api/util/index.rst new file mode 100644 index 0000000000..f2bbd8c65f --- /dev/null +++ b/docs/api/util/index.rst @@ -0,0 +1,11 @@ +API Helper Utilities Reference +============================== + +:py:mod:`asyncio` testing, command line, and configuration helpers live here. + +.. toctree:: + :glob: + :maxdepth: 2 + :caption: Contents: + + asynchelper diff --git a/docs/conf.py b/docs/conf.py index bda5c0ade6..ae52c27ca8 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -34,11 +34,14 @@ # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ + 'sphinx.ext.intersphinx', 'sphinx.ext.autodoc', 'sphinx.ext.viewcode', 'sphinxcontrib.asyncio', ] +intersphinx_mapping = {'python': ('https://docs.python.org/3', None)} + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/docs/index.rst b/docs/index.rst index 5a45fde248..1ad9478914 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -32,7 +32,7 @@ set of feature generators which gather data from git repositories. usage/* concepts/index plugins/index - api + api/index Indices and tables