Source code for fennel.settings

import multiprocessing

from pydantic import BaseSettings, PyObject, validator


class Settings(BaseSettings):
    """
    Settings can be configured via environment variables or keyword arguments
    for the fennel.App instance (which take priority).

    Examples
    --------
    For environment variables, the prefix is ``FENNEL_``, for instance:

    | ``FENNEL_REDIS_URL=redis://127.0.0.1:6379``
    | ``FENNEL_DEFAULT_RETRIES=3``
    | ``FENNEL_RESULTS_ENABLED=true``

    Or via App kwargs:

    >>> from fennel import App
    ...
    >>> app = App(
    ...     name='myapp',
    ...     redis_url='redis://127.0.0.1',
    ...     default_retries=3,
    ...     results_enabled=True,
    ...     log_level='info',
    ...     log_format='json',
    ...     autodiscover='**/tasks.py',
    ...     interface='sync',
    ... )

    Parameters
    ----------
    redis_url : str
        Redis URL. Default ``'redis://127.0.0.1:6379'``
    interface : str
        Which client interface should we use -- sync or async?
        Default ``'sync'``
    processes : int
        How many executor processes to run in each worker.
        Default ``multiprocessing.cpu_count()``
    concurrency : int
        How many concurrent consumers to run (we make at least this many
        Redis connections) in each executor process. The default, 8, can
        handle 160 req/s in a single executor process if each task is
        IO-bound and lasts on average 50ms. If you have long-running
        CPU-bound tasks, you will want to run multiple executor processes
        (and set heartbeat_timeout to greater than your maximum expected
        task duration). Default ``8``
    default_retries : int
        How many times to retry a task in case it raises an exception during
        execution. With 10 retries and the default
        :func:`fennel.utils.backoff` function, this will be approximately
        30 days of retries. Default ``10``
    retry_backoff : Callable
        Which algorithm to use to determine the retry schedule. The default
        is exponential backoff via :func:`fennel.utils.backoff`.
    read_timeout : int
        How many milliseconds to wait for messages in the main task queue.
        Default ``4000``
    prefetch_count : int
        How many messages to read in a single call to ``XREADGROUP``.
        Default ``1``
    heartbeat_timeout : float
        How many seconds before an executor is considered dead if heartbeats
        are missed. If you have long-running CPU-bound tasks, this value
        should be greater than your maximum expected task duration.
        Default ``60``
    heartbeat_interval : float
        How many seconds to sleep between storing heartbeats for each
        executor process. Default ``6``
    schedule_interval : float
        How many seconds to sleep between polling for scheduled tasks.
        Default ``4``
    maintenance_interval : float
        How many seconds to sleep between runs of the maintenance script.
        Default ``8``
    task_timeout : int
        How many seconds to wait for results to be computed when calling
        .get(). Default ``10``
    grace_period : int
        How many seconds to wait for in-flight tasks to complete before
        forcefully exiting. Default ``30``
    results_enabled : bool
        Whether to store results. Can be disabled if your only use-case is
        'fire-and-forget'. Default ``True``
    results_ttl : int
        How many seconds before results expire. Default ``3600`` (one hour).
    log_format : str
        Whether to pretty print a human-readable log ("console") or JSON
        ("json"). Default ``'console'``
    log_level : str
        The minimum log level to emit. Default ``'debug'``
    autodiscover : str
        The pattern for :func:`pathlib.Path.glob` to find modules containing
        task-decorated functions, which the worker must import on startup.
        Will be called relative to the current working directory. Can be set
        to the empty string to disable. Default ``'**/tasks.py'``
    """

    class Config:
        env_prefix = "FENNEL_"
        case_insensitive = True

    redis_url: str = "redis://127.0.0.1:6379"
    interface: str = "sync"
    processes: int = multiprocessing.cpu_count()
    concurrency: int = 8
    default_retries: int = 10
    retry_backoff: PyObject = "fennel.utils.backoff"
    read_timeout: int = 4000
    prefetch_count: int = 1
    heartbeat_timeout: float = 60
    heartbeat_interval: float = 6
    schedule_interval: float = 4
    maintenance_interval: float = 8
    task_timeout: int = 10
    grace_period: int = 30
    results_enabled: bool = True
    results_ttl: int = 60 * 60
    log_format: str = "console"
    log_level: str = "debug"
    autodiscover: str = "**/tasks.py"

    @validator("interface")
    def is_valid_interface(cls, value):
        # Accept any casing, store the lowercase form.
        assert value.lower() in ["sync", "async"]
        return value.lower()

    @validator("log_format")
    def is_valid_format(cls, value):
        # Accept any casing, store the lowercase form.
        assert value.lower() in ["console", "json"]
        return value.lower()

    @validator("autodiscover")
    def matches_python(cls, value):
        # The empty string disables autodiscovery; otherwise the glob
        # pattern must target Python files.
        assert value == "" or value.endswith(".py")
        return value
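
The ``concurrency`` and ``heartbeat_timeout`` notes in the docstring rest on simple arithmetic, which can be sketched as below. This is only an illustrative sketch and not part of fennel: the helper names ``estimated_throughput`` and ``heartbeat_timeout_is_safe`` are hypothetical, and the throughput figure is an idealised upper bound that assumes purely IO-bound tasks with no queueing or Redis overhead.

```python
def estimated_throughput(concurrency: int, avg_task_ms: float) -> float:
    """Idealised tasks per second for one executor process.

    Hypothetical helper (not a fennel API): assumes every consumer is
    always busy with an IO-bound task of the given average duration.
    """
    if concurrency < 1 or avg_task_ms <= 0:
        raise ValueError("concurrency must be >= 1 and avg_task_ms > 0")
    # Each consumer completes 1000 / avg_task_ms tasks per second.
    return concurrency * 1000 / avg_task_ms


def heartbeat_timeout_is_safe(heartbeat_timeout: float, max_task_seconds: float) -> bool:
    """Check the docstring's rule of thumb for long-running CPU-bound tasks.

    Hypothetical helper (not a fennel API): the timeout should be greater
    than the maximum expected task duration.
    """
    return heartbeat_timeout > max_task_seconds


# The docstring's example: 8 consumers, 50 ms per task.
print(estimated_throughput(8, 50))

# Default heartbeat_timeout of 60 s against a 90 s CPU-bound task.
print(heartbeat_timeout_is_safe(60, 90))
```

With the defaults, the first call reproduces the 160 req/s figure quoted for ``concurrency``, and the second shows a case where ``heartbeat_timeout`` would need to be raised above its default.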