Source code for fennel.utils

import base64
import importlib
import multiprocessing as mp
import random
import time
import uuid
from contextlib import contextmanager
from typing import Any, Generator

import aredis
import redis

EMPTY = object()  # A unique value used to distinguish emptiness from None

def backoff(retries: int, jitter: bool = True) -> int:
    """
    Compute the duration (seconds) to wait before retrying, using exponential
    backoff with jitter based on the number of retries a message has already
    experienced.

    The minimum returned value is 1s.
    The maximum returned value is 604800s (7 days).

    With max_retries=9 the cumulative wait is on the order of weeks (the
    original author estimated roughly 30 days), leaving time to fix and
    redeploy the task code.

    Parameters
    ----------
    retries : int
        How many retries have already been attempted. Values below -1 are
        treated like -1 (base delay of 1s) instead of producing a fractional
        power.
    jitter : bool
        Whether to add random noise to the return value (recommended).

    Returns
    -------
    int
        Seconds to wait, always within [1, 604800].

    Notes
    -----
    The un-jittered delay is ``6 ** (retries + 1)``. With jitter enabled the
    delay is drawn from ``random.randrange(delay // 3, delay * 2)`` and then
    capped, so once the un-jittered delay is far above the cap every call
    returns exactly 604800.
    """
    max_delay = 604_800  # 7 days
    min_delay = 1

    # Never raise 6 to a negative power: that yields a float, which
    # random.randrange does not accept.
    exponent = max(retries + 1, 0)

    # 6**9 // 3 == 3_359_232 already exceeds the cap, so from this point on the
    # result is the cap with or without jitter. Returning early avoids building
    # an enormous integer for very large retry counts.
    if exponent >= 9:
        return max_delay

    delay = 6 ** exponent
    if jitter:
        delay = random.randrange(delay // 3, delay * 2)

    # The lower clamp enforces the documented 1s minimum (jitter on a base
    # delay of 1 can draw 0).
    return max(min_delay, min(max_delay, delay))
def base64uuid() -> str:
    """Return a random UUID4 as URL-safe base64 text with the padding stripped."""
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b"=").decode("ascii")


def now() -> float:
    """Return the current time as given by ``time.time()``."""
    return time.time()


@contextmanager
def duration(logger, event: str, **extra: Any) -> Generator:
    """
    Context manager that logs ``started-<event>`` on entry and
    ``ended-<event>`` (with the elapsed ``duration`` in seconds) on normal exit.

    Parameters
    ----------
    logger
        Object whose logging method accepts a message plus keyword arguments.
    event : str
        Name interpolated into the two log messages.
    **extra
        Additional keyword arguments forwarded to both log calls.

    Notes
    -----
    The "ended" message is not emitted if the wrapped body raises.
    """
    # NOTE(review): the call prefix of both log statements was lost when this
    # source was extracted; restored as `logger.info(f"...")` -- confirm the
    # method name/level against the upstream file.
    logger.info(f"started-{event}", **extra)
    start = time.perf_counter()
    yield
    # Named `elapsed` so the local does not shadow this function's own name;
    # the keyword passed to the logger is still `duration`.
    elapsed = time.perf_counter() - start
    logger.info(f"ended-{event}", duration=elapsed, **extra)


def get_object(name: str) -> Any:
    """
    Import and return the attribute described by ``"module.path:attribute"``.

    The string is split on the first ``":"`` only; the left part is imported
    as a module and the right part is looked up on it with ``getattr``.

    Raises
    ------
    ValueError
        If `name` contains no ``":"``.
    """
    module_name, instance_name = name.split(":", 1)
    module = importlib.import_module(module_name)
    return getattr(module, instance_name)


def get_redis(app):
    """Build a ``redis.Redis`` client from ``app.settings.redis_url``."""
    return redis.Redis.from_url(
        app.settings.redis_url,
        decode_responses=True,
    )


def get_aredis(app):
    """Build an ``aredis.StrictRedis`` client from ``app.settings.redis_url``."""
    return aredis.StrictRedis.from_url(
        app.settings.redis_url,
        decode_responses=True,
    )


def get_mp_context():
    """Return the multiprocessing context that uses the ``fork`` start method."""
    # The default start method changed to 'spawn' in Python 3.8 on macOS
    # (the multiprocessing docs describe 'fork' as unsafe there), so the
    # start method is requested explicitly rather than relying on the
    # platform default.
    return mp.get_context("fork")