Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rq retry #120

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/flask_rq2/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ def my_custom_handler(job, *exc_info):
return callback

def job(self, func_or_queue=None, timeout=None, result_ttl=None, ttl=None,
depends_on=None, at_front=None, meta=None, description=None):
depends_on=None, at_front=None, meta=None, description=None,
on_success=None, on_failure=None, retry=None):
"""
Decorator to mark functions for queuing via RQ, e.g.::

Expand Down Expand Up @@ -274,6 +275,16 @@ def add(x, y):
:param description: Description of the job.
:type description: str

:param on_success: Callback when job success.
:type on_success: Callable

:param on_failure: Callback when job fails.
:type on_failure: Callable

:param retry: A Retry() object that specifies how the job should be retried on failure.
:type retry: rq.job.Retry


"""
if callable(func_or_queue):
func = func_or_queue
Expand All @@ -295,6 +306,9 @@ def wrapper(wrapped):
at_front=at_front,
meta=meta,
description=description,
on_success=on_success,
on_failure=on_failure,
retry=retry,
)
wrapped.helper = helper
for function in helper.functions:
Expand Down
22 changes: 21 additions & 1 deletion src/flask_rq2/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class JobFunctions(object):
functions = ['queue', 'schedule', 'cron']

def __init__(self, rq, wrapped, queue_name, timeout, result_ttl, ttl,
depends_on, at_front, meta, description):
depends_on, at_front, meta, description, on_success, on_failure, retry):
self.rq = rq
self.wrapped = wrapped
self._queue_name = queue_name
Expand All @@ -28,6 +28,9 @@ def __init__(self, rq, wrapped, queue_name, timeout, result_ttl, ttl,
self._at_front = at_front
self._meta = meta
self._description = description
self._on_success = on_success
self._on_failure = on_failure
self._retry = retry

def __repr__(self):
full_name = '.'.join([self.wrapped.__module__, self.wrapped.__name__])
Expand Down Expand Up @@ -104,6 +107,17 @@ def add(x, y):
:mod:`UUID <uuid>`.
:type job_id: str

:param on_success: A callback when the job suceeds. Defaults
to None
:type on_success: Callable

:param on_failure: A callback when the job fails. Defaults
to None
:type on_failure: Callable

:param retry: A Retry() object that specifies how the job should be retried on failure.
:type retry: rq.job.Retry

:param at_front: Whether or not the job is queued in front of all other
enqueued jobs.
:type at_front: bool
Expand All @@ -124,6 +138,9 @@ def add(x, y):
at_front = kwargs.pop('at_front', self._at_front)
meta = kwargs.pop('meta', self._meta)
description = kwargs.pop('description', self._description)
on_success = kwargs.pop('on_success', self._on_success)
on_failure = kwargs.pop('on_failure', self._on_failure)
retry = kwargs.pop('retry', self._retry)
return self.rq.get_queue(queue_name).enqueue_call(
self.wrapped,
args=args,
Expand All @@ -136,6 +153,9 @@ def add(x, y):
at_front=at_front,
meta=meta,
description=description,
on_success=on_success,
on_failure=on_failure,
retry=retry
)

def schedule(self, time_or_delta, *args, **kwargs):
Expand Down