A collection of wrapper classes for event broadcast and task management in Python (Python threads or asyncio).
from zthreading.events import EventHandler
handler = EventHandler()
def handle_test_event(msg: str):
    print("The event message: " + msg)
handler.on("test", handle_test_event)
handler.emit("test", "the message")
from zthreading.events import EventHandler
from zthreading.tasks import Task
handler = EventHandler()
def handle_test_event(msg: str):
    print("The event message: " + msg)
def run_in_a_different_thread(msg):
    handler.emit("test", msg)
handler.on("test", handle_test_event)
Task(run_in_a_different_thread).start("A message from a thread").join()
from zthreading.decorators import collect_consecutive_calls_async, as_task, catch_signal
from signal import Signals, raise_signal
from zthreading.tasks import Task
@as_task
def my_method_as_task(a: int, b: str):
    print(b + str(a))
# Calling the decorated function returns a Task; arguments follow the (a: int, b: str) order.
my_task: Task = my_method_as_task(1, "a")
my_task.join()
@catch_signal(Signals.SIGTERM)
def my_signal_ignore_method():
    # will be ignored.
    raise_signal(Signals.SIGTERM)
@collect_consecutive_calls_async()
def consecutive_calls_action():  # Like save this to file.. for example.
    # should be printed twice, once for the first call, and another for the last call.
    print("consecutive called action")
# Call the action only after the decorated function has been defined.
for i in range(1, 20):
    consecutive_calls_action()
See the decorator help for more.
- TASKS_DEFAULT_TO_ASYNC_LOOP - If set to "true", will default all tasks to use asyncio.
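As a minimal sketch, the variable can be set from Python before the library is imported. Whether zthreading reads the variable at import time or later is an assumption here, so setting it first is the cautious order. The helper `tasks_default_to_asyncio` is hypothetical and is not part of zthreading; it only reports the flag's current value.

```python
import os

# Set the flag before importing zthreading. That the library reads it at
# import time is an assumption; setting it first works in either case.
os.environ["TASKS_DEFAULT_TO_ASYNC_LOOP"] = "true"


def tasks_default_to_asyncio() -> bool:
    """Hypothetical helper (not part of zthreading): report the flag's value."""
    value = os.environ.get("TASKS_DEFAULT_TO_ASYNC_LOOP", "false")
    return value.lower() == "true"


print(tasks_default_to_asyncio())
```

The same value can also be exported in the shell that launches the program.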
Note: The task object is an EventHandler and has all the capabilities of one.
(Examples in code)
- Task.wait_for_all(tasks...)
- Task.wait_for_some(tasks...)
- Task.wait_for_one(tasks...)
- Task.wait_for_events(tasks, event names...)
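The exact signatures of these helpers are not given here, so the following is only an analogy built on the standard library's `concurrent.futures`, not zthreading's API. It sketches the difference between waiting for the first of several jobs and waiting for all of them; `work` is a hypothetical job function.

```python
import time
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait,
)


def work(delay: float, value: int) -> int:
    """Hypothetical job: sleep for a while, then return a value."""
    time.sleep(delay)
    return value


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(work, 0.05 * i, i) for i in range(3)]
    # Comparable in spirit to waiting for one task: returns once any job is done.
    first_done, _ = wait(futures, return_when=FIRST_COMPLETED)
    # Comparable in spirit to waiting for all tasks: returns once every job is done.
    all_done, pending = wait(futures, return_when=ALL_COMPLETED)

results = sorted(f.result() for f in all_done)
print(len(first_done) >= 1, results)
```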
Events can be transferred (piped) from one handler to another. If a weak reference is used, the second handler can still be deleted by garbage collection.
from zthreading.events import EventHandler
handler_a = EventHandler()
handler_b = EventHandler()
# Transfer all events to handler_b for as
# long as the handler_b object is in memory. The pipe
# will not keep handler_b in memory.
handler_a.pipe(handler_b, use_weak_reference=True)
def handle_test_event(msg: str):
    print("The event message: " + msg)
handler_b.on("test", handle_test_event)
handler_a.emit("test", "The piped message")
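The effect of a weak reference can be illustrated without the library. The following is a minimal sketch using only the standard library `weakref` module; the `Source` and `Listener` classes are hypothetical stand-ins and do not reproduce how `EventHandler.pipe` is implemented.

```python
import gc
import weakref


class Listener:
    """Hypothetical stand-in for the second handler."""

    def __init__(self):
        self.received = []

    def emit(self, name, *args):
        self.received.append((name, args))


class Source:
    """Hypothetical stand-in for the first handler; holds its targets weakly."""

    def __init__(self):
        self._targets = []

    def pipe(self, target):
        # A weak reference does not keep the target alive.
        self._targets.append(weakref.ref(target))

    def emit(self, name, *args):
        alive = []
        for ref in self._targets:
            target = ref()  # None once the target has been collected
            if target is not None:
                target.emit(name, *args)
                alive.append(ref)
        self._targets = alive
        return len(alive)  # number of targets that received the event


source = Source()
listener = Listener()
source.pipe(listener)

delivered_before = source.emit("test", "The piped message")
received = list(listener.received)

del listener  # drop the only strong reference
gc.collect()
delivered_after = source.emit("test", "nobody is listening")
```

With a strong reference instead, the source would keep the listener alive for as long as the source itself exists.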
Events can also be streamed, using a generator that yields each event as it arrives:
from random import random
from time import sleep
from zthreading.tasks import Task
# A task is an EventHandler, and has the
# on method as well.
task: Task = None
def invoke_timed_events():
    sleep(1)
    for i in range(1, 10):
        sleep(random() / 10)
        task.emit("test", f"loop index {i}")
    task.stop_all_streams()
task = Task(invoke_timed_events).start()
for ev in task.stream("test"):
    print(f"{ev.name}, {ev.args[0]}")
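The streaming pattern above can be pictured as a generator draining a queue that another thread fills. This is a minimal standard-library sketch of that idea, not zthreading's implementation; `stream`, `produce`, and the `_STOP` sentinel are hypothetical names.

```python
import queue
import threading

_STOP = object()  # sentinel that marks the end of the stream


def stream(events: queue.Queue):
    """Yield queued events until the stop sentinel arrives."""
    while True:
        item = events.get()  # blocks until the producer puts something
        if item is _STOP:
            return
        yield item


def produce(events: queue.Queue):
    """Emit a few events from a worker thread, then close the stream."""
    for i in range(1, 4):
        events.put(("test", f"loop index {i}"))
    events.put(_STOP)


events = queue.Queue()
worker = threading.Thread(target=produce, args=(events,))
worker.start()

collected = [f"{name}, {payload}" for name, payload in stream(events)]
worker.join()
print(collected)
```

The sentinel plays the role that `stop_all_streams()` plays in the example above: it tells the consuming loop that no more events will arrive.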
pip install zthreading
To install from the master branch:
pip install git+https://github.com/LamaAni/zthreading.py.git@master
To install from a release (tag):
pip install git+https://github.com/LamaAni/zthreading.py.git@[tag]
Feel free to ping me in issues or directly on LinkedIn to contribute.
Copyright © Zav Shotan and other contributors.
It is free software, released under the MIT licence, and may be redistributed under the terms specified in LICENSE.