API Reference¶
fennel¶
class fennel.App(name: str, **kwargs)[source]¶
The app is the main abstraction provided by Fennel. Python functions are decorated via @app.task to enable background processing. All settings are configured on this object.
- Parameters
name (str) – Used to identify this application, e.g. to set which tasks a worker will execute.
kwargs – Any settings found in fennel.settings.Settings
Examples
>>> from fennel import App
...
>>> app = App(
...     name='myapp',
...     redis_url='redis://127.0.0.1',
...     default_retries=3,
...     results_enabled=True,
...     log_level='info',
...     log_format='json',
...     autodiscover='**/tasks.py',
...     interface='sync',
... )
...
>>> @app.task(retries=1)
>>> def foo(x):
...     return x
...
>>> x = foo.delay(7)  # Execute in the background.
>>> x
AsyncResult(uuid=Tjr75jM3QDOHoLTLyrsY1g)
>>> x.get()  # Wait for the result.
7
If your code is running in an asynchronous event loop (e.g. via Starlette, FastAPI, Quart), you will want to use the async interface instead:
>>> import asyncio
...
>>> app = App(name='foo', interface='async')
...
>>> @app.task
>>> async def bar(x):
...     await asyncio.sleep(x)
...     return x
...
>>> x = await bar.delay(5)
>>> await x.status()
SENT
>>> await x.get()
5
task(func: Callable = None, *, name=None, retries=<object object>) → Any[source]¶
A decorator to register a function with the app to enable background processing via the task queue.
The worker process (see fennel.worker.worker) will need to discover all registered tasks on startup. This means all the modules containing tasks need to be imported. Fennel will import modules found via fennel.settings.Settings.autodiscover, which by default is '**/tasks.py'.
- Parameters
func (Callable) – The decorated function.
name (str) – The representation used to uniquely identify this task.
retries (int) – The number of attempts at execution after a task has failed (meaning it raised any exception).
Examples
Exposes an interface similar to Celery:
>>> @app.task(retries=1)
>>> def foo(x):
...     return x
Tasks can be enqueued for processing via:
>>> foo.delay(8)
AsyncResult(uuid=q_jb6KaUT-G4tOAoyQ0yaA)
They can also be called normally, bypassing the Fennel system entirely:
>>> foo(3)
3
By default, tasks are 'fire-and-forget', meaning we will not wait for their completion. They will be executed by a worker process and will be retried automatically on failure (using exponential backoff), so we assume tasks are idempotent.
You can also wait for the result:
>>> x = foo.delay(4)
>>> x.status()
SENT
>>> x.get(timeout=10)
4
If instead you have many tasks and wish to wait for them to complete, you can use the waiting primitives provided (you will want to ensure all tasks have retries=0, which you can set by default with an app setting):
>>> from fennel.client import gather, wait
>>> results = [foo.delay(x) for x in range(10)]
>>> gathered = gather(results)
>>> # Or:
>>> done, pending = wait(results, timeout=2)
If your application is running in an event loop you can elect to use the async interface for your fennel app (see fennel.settings.Settings.interface), which uses aioredis under the hood to enqueue items, retrieve results, etc., so you will need to await those coroutines:
>>> app = App(name='foo', interface='async')
>>>
>>> @app.task
>>> async def bar(x):
...     await asyncio.sleep(x)
>>>
>>> x = await bar.delay(1)
>>> await x.status()
SUCCESS
fennel.settings¶
class fennel.settings.Settings[source]¶
Settings can be configured via environment variables or keyword arguments for the fennel.App instance (which take priority).
Examples
For environment variables, the prefix is FENNEL_, for instance:
FENNEL_REDIS_URL=redis://127.0.0.1:6379
FENNEL_DEFAULT_RETRIES=3
FENNEL_RESULTS_ENABLED=true
Or via App kwargs:
>>> from fennel import App
...
>>> app = App(
...     name='myapp',
...     redis_url='redis://127.0.0.1',
...     default_retries=3,
...     results_enabled=True,
...     log_level='info',
...     log_format='json',
...     autodiscover='**/tasks.py',
...     interface='sync',
... )
- Parameters
redis_url (str) – Redis URL. Default 'redis://127.0.0.1:6379'
interface (str) – Which client interface should we use – sync or async? Default 'sync'
processes (int) – How many executor processes to run in each worker. Default multiprocessing.cpu_count()
concurrency (int) – How many concurrent consumers to run (we make at least this many Redis connections) in each executor process. The default, 8, can handle 160 req/s in a single executor process if each task is IO-bound and lasts on average 50ms. If you have long-running CPU-bound tasks, you will want to run multiple executor processes and set heartbeat_timeout to greater than your maximum expected task duration (see the sketch after this list). Default 8
default_retries (int) – How many times to retry a task in case it raises an exception during execution. With 10 retries and the default fennel.utils.backoff() function, this will be approximately 30 days of retries. Default 10
retry_backoff (Callable) – Which algorithm to use to determine the retry schedule. The default is exponential backoff via fennel.utils.backoff().
read_timeout (int) – How many milliseconds to wait for messages in the main task queue. Default 4000
prefetch_count (int) – How many messages to read in a single call to XREADGROUP. Default 1
heartbeat_timeout (float) – How many seconds before an executor is considered dead if heartbeats are missed. If you have long-running CPU-bound tasks, this value should be greater than your maximum expected task duration. Default 60
heartbeat_interval (float) – How many seconds to sleep between storing heartbeats for each executor process. Default 6
schedule_interval (float) – How many seconds to sleep between polling for scheduled tasks. Default 4
maintenance_interval (float) – How many seconds to sleep between runs of the maintenance script. Default 8
task_timeout (int) – How many seconds to wait for results to be computed when calling .get(). Default 10
grace_period (int) – How many seconds to wait for in-flight tasks to complete before forcefully exiting. Default 30
results_enabled (bool) – Whether to store results. Can be disabled if your only use-case is 'fire-and-forget'. Default True
results_ttl (int) – How long before expiring results, in seconds. Default 3600 (one hour).
log_format (str) – Whether to pretty-print a human-readable log ("console") or JSON ("json"). Default 'console'
log_level (str) – The minimum log level to emit. Default 'debug'
autodiscover (str) – The pattern for pathlib.Path.glob() to find modules containing task-decorated functions, which the worker must import on startup. Will be evaluated relative to the current working directory. Can be set to the empty string to disable. Default '**/tasks.py'
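For long-running CPU-bound tasks, a minimal sketch of how these settings fit together (the values below are illustrative assumptions, not recommendations):
>>> from fennel import App
...
>>> app = App(
...     name='cpuapp',
...     processes=4,             # roughly one executor process per core for CPU-bound work
...     concurrency=2,           # fewer concurrent consumers per process
...     heartbeat_timeout=600,   # longer than the maximum expected task duration (illustrative)
...     default_retries=0,       # useful if you intend to wait on results
... )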
fennel.worker¶
fennel.client¶
A collection of synchronous classes and functions to interact with the Fennel system.
fennel.client.purge_dead(app, filter=<function <lambda>>, batchsize=100)[source]¶
Iterate over the dead-letter queue and delete any jobs for which filter(job) evaluates to True. The default is to delete all jobs.
fennel.client.read_dead(app, batchsize=100)[source]¶
Iterate over the dead-letter queue and return all job data.
fennel.client.replay_dead(app, filter=<function <lambda>>, batchsize=100)[source]¶
Iterate over the dead-letter queue and replay any jobs for which filter(job) evaluates to True. The default is to replay all jobs.
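A minimal sketch of working with the dead-letter queue via these functions; the filter predicates inspect attributes documented on fennel.job.Job below, and the exact shape of the returned job data should be treated as an assumption:
>>> from fennel.client import read_dead, replay_dead, purge_dead
...
>>> jobs = read_dead(app)                                # inspect everything currently dead
>>> replay_dead(app, filter=lambda job: job.tries < 5)   # re-enqueue jobs that failed fewer than 5 times
>>> purge_dead(app, filter=lambda job: job.task == 'myapp.tasks.foo')  # drop dead jobs for one task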
class fennel.client.AsyncResult(job: fennel.job.Job, app)[source]¶
A handle for a task that is being processed by workers via the task queue.
Conceptually similar to the AsyncResult from the multiprocessing library.
status()[source]¶
Return the status of the task execution.
Examples
>>> @app.task
>>> def bar(x):
...     time.sleep(x)
...     return x
...
>>> x = bar.delay(5)
>>> x.status()
SENT
>>> x.status()  # After roughly 5 seconds...
SUCCESS
get(timeout: int = <object object>) → Any[source]¶
Wait for the result to become available and return it.
- Raises
fennel.exceptions.TaskFailed – If the original function raised an exception.
fennel.exceptions.Timeout – If > timeout seconds elapse before a result is available.
Examples
>>> @app.task(retries=0)
>>> def foo(x):
...     return x
...
>>> x = foo.delay(7)
>>> x.get()  # Wait for the result.
7
Warning
You must have results storage enabled (fennel.settings.Settings.results_enabled).
If you have retries enabled, tasks may be rescheduled many times, so you may prefer to use retries=0 for tasks whose result you intend to wait for.
class fennel.client.Task(name: str, func: Callable, retries: int, app)[source]¶
delay(*args: Any, **kwargs: Any) → fennel.client.results.AsyncResult[source]¶
Traditional Celery-like interface to enqueue a task for execution by the workers.
The args and kwargs will be passed through to the task when executed.
Examples
>>> @app.task
>>> def foo(x, bar=None):
...     time.sleep(x)
...     if bar == "mystr":
...         return False
...     return True
...
>>> foo.delay(1)
>>> foo.delay(2, bar="mystr")
fennel.client.aio¶
A collection of asynchronous classes and functions, expected to be run in an asyncio-compatible event loop, to interact with the Fennel system.
async fennel.client.aio.purge_dead(app, filter=<function <lambda>>, batchsize=100)[source]¶
Iterate over the dead-letter queue and delete any jobs for which filter(job) evaluates to True. The default is to delete all jobs.
async fennel.client.aio.read_dead(app, batchsize=100)[source]¶
Iterate over the dead-letter queue and return all job data.
async fennel.client.aio.replay_dead(app, filter=<function <lambda>>, batchsize=100)[source]¶
Iterate over the dead-letter queue and replay any jobs for which filter(job) evaluates to True. The default is to replay all jobs.
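As with the synchronous client, a brief sketch (again assuming the filter receives objects with the fennel.job.Job attributes documented below):
>>> from fennel.client.aio import replay_dead
...
>>> await replay_dead(app, filter=lambda job: job.tries < 5)  # re-enqueue jobs that failed fewer than 5 times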
class fennel.client.aio.AsyncResult(job: fennel.job.Job, app)[source]¶
A handle for a task that is being processed by workers via the task queue.
Conceptually similar to the AsyncResult from the multiprocessing library.
async status()[source]¶
Return the status of the task execution.
Examples
>>> @app.task
>>> async def bar(x):
...     await asyncio.sleep(x)
...     return x
...
>>> x = await bar.delay(5)
>>> await x.status()
SENT
>>> await x.status()  # After roughly 5 seconds...
SUCCESS
async get(timeout: int = <object object>) → Any[source]¶
Wait for the result to become available and return it.
- Raises
fennel.exceptions.TaskFailed – If the original function raised an exception.
fennel.exceptions.Timeout – If > timeout seconds elapse before a result is available.
Examples
>>> @app.task(retries=0)
>>> def foo(x):
...     return x
...
>>> x = await foo.delay(7)
>>> await x.get()  # Wait for the result.
7
Warning
You must have results storage enabled (fennel.settings.Settings.results_enabled).
If you have retries enabled, tasks may be rescheduled many times, so you may prefer to use retries=0 for tasks whose result you intend to wait for.
class fennel.client.aio.Task(name: str, func: Callable, retries: int, app)[source]¶
async delay(*args: Any, **kwargs: Any) → fennel.client.aio.results.AsyncResult[source]¶
Enqueue a task for execution by the workers.
Similar to asyncio.create_task (but also works with non-async functions and runs on our Redis-backed task queue with distributed workers, automatic retry, and result storage with configurable TTL).
The args and kwargs will be passed through to the task when executed.
Examples
>>> @app.task(retries=1)
>>> async def foo(x, bar=None):
...     await asyncio.sleep(x)
...     if bar == "mystr":
...         return False
...     return True
...
>>> await foo.delay(1)
>>> await foo.delay(2, bar="mystr")
fennel.status¶
Jobs move through a number of statuses during their lifecycle. This module contains the constants. If you have enqueued a task for execution, you can obtain its status as follows:
>>> x = mytask.delay()
>>> x.status()
EXECUTING
fennel.status.UNKNOWN = 'UNKNOWN'¶
The job's status is not stored in Redis. Presumably no action has been taken on the job.
fennel.status.SENT = 'SENT'¶
The job has been sent to Redis, but execution has not yet started.
fennel.status.EXECUTING = 'EXECUTING'¶
A worker has received the job from the queue and has begun executing it.
fennel.status.SUCCESS = 'SUCCESS'¶
Execution was successful and the job's result is ready (if results storage is enabled).
fennel.status.RETRY = 'RETRY'¶
Execution was not successful (an exception was raised) and a retry is scheduled to occur in the future.
fennel.status.DEAD = 'DEAD'¶
Execution was not successful (an exception was raised) and retries have been exhausted, so the job is now in the dead-letter queue where it will remain until manual intervention (via the CLI or client code).
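A minimal sketch of polling a result handle against these constants (the polling loop and sleep interval are illustrative, not part of the API; mytask is the hypothetical task from the example above):
>>> import time
>>> from fennel import status
...
>>> x = mytask.delay()
>>> while x.status() in (status.SENT, status.EXECUTING, status.RETRY):
...     time.sleep(1)  # illustrative polling interval
...
>>> x.status() in (status.SUCCESS, status.DEAD)
True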
fennel.exceptions¶
exception fennel.exceptions.TaskFailed(original_type: str, original_args: List)[source]¶
This exception is returned by worker processes that experienced an exception when executing a task.
- Parameters
original_type (str) – The name of the original exception, e.g. 'ValueError'.
original_args (List) – The arguments given to the original exception, e.g. ['Not found']
Examples
>>> @app.task(retries=0)
>>> async def foo(n):
...     raise Exception("baz")
...
>>> x = await foo.delay(3)
>>> try:
...     result = await x.get()
... except TaskFailed as e:
...     assert e.original_type == "Exception"
...     assert e.original_args == ["baz"]
exception fennel.exceptions.ResultsDisabled[source]¶
Raised when results_enabled=False and code attempts to access a task's result via .get().
exception fennel.exceptions.UnknownTask[source]¶
Raised by a worker process if it is unable to find a Python function corresponding to the task it has read from the queue.
exception fennel.exceptions.Timeout[source]¶
Raised by client code when a given timeout is exceeded while waiting for results to arrive.
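A minimal sketch of handling it when waiting on a result (the timeout value and the fallback are illustrative):
>>> from fennel.exceptions import Timeout
...
>>> x = foo.delay(7)
>>> try:
...     result = x.get(timeout=5)  # illustrative timeout in seconds
... except Timeout:
...     result = None  # decide how to handle the missing result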
fennel.utils¶
fennel.utils.backoff(retries: int, jitter: bool = True) → int[source]¶
Compute the duration (in seconds) to wait before retrying, using exponential backoff with jitter, based on the number of retries a message has already experienced.
The minimum returned value is 1s. The maximum returned value is 604800s (7 days).
With max_retries=9, you will have roughly 30 days to fix and redeploy the task code.
- Parameters
retries (int) – How many retries have already been attempted.
jitter (bool) – Whether to add random noise to the return value (recommended).
Notes
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
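For illustration, a sketch of inspecting the schedule; the exact values depend on the backoff implementation (and on jitter), so only the documented bounds are asserted here:
>>> from fennel.utils import backoff
...
>>> delays = [backoff(n, jitter=False) for n in range(10)]  # one delay per retry already attempted
>>> all(1 <= d <= 604800 for d in delays)  # bounded as documented above
True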
fennel.job¶
class fennel.job.Job(task: str, args: List, kwargs: Dict, tries: int = 0, max_retries: int = 9, exception: Dict = <factory>, return_value: Any = None, status: str = 'UNKNOWN', uuid: str = <factory>)[source]¶
The internal representation of a job.
- Parameters
task (str) – The name of the task. By default will use f"{func.__module__}.{func.__qualname__}", where func is the Python callable.
args (List) – The job's args.
kwargs (Dict) – The job's kwargs.
tries (int) – The number of attempted executions.
max_retries (int) – The maximum number of retries to attempt after failure.
exception (Dict) – Exception information for the latest failure; contains 'original_type' (str, e.g. 'ValueError') and 'original_args' (List, e.g. ['Not found']).
return_value (Any) – The return value of the Python callable when execution succeeds.
status (str) – One of fennel.status, the current lifecycle stage.
uuid (str) – Base64-encoded unique identifier.
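Jobs are normally created for you by Task.delay, but here is a minimal sketch of the documented fields (the values are illustrative):
>>> from fennel.job import Job
...
>>> job = Job(task='myapp.tasks.foo', args=[7], kwargs={})  # remaining fields take their documented defaults
>>> job.tries, job.max_retries, job.status
(0, 9, 'UNKNOWN')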