import base64
import importlib
import multiprocessing as mp
import random
import time
import uuid
from contextlib import contextmanager
from typing import Any, Generator

import aredis
import redis

EMPTY = object()  # A unique value used to distinguish emptiness from None

[docs]def backoff(retries: int, jitter: bool = True) -> int: """ Compute duration (seconds) to wait before retrying using exponential backoff with jitter based on the number of retries a message has already experienced. The minimum returned value is 1s The maximum returned value is 604800s (7 days) With max_retries=9, you will have roughly 30 days to fix and redeploy the the task code. Parameters ---------- retries : int How many retries have already been attemped. jitter : bool Whether to add random noise to the return value (recommended). Notes ----- """ x = 6**(retries + 1) if jitter: x = random.randrange(x // 3, x * 2) return min(604_800, x)
def base64uuid() -> str: return base64.urlsafe_b64encode(uuid.uuid4().bytes).rstrip(b"=").decode("ascii") def now() -> float: return time.time() @contextmanager def duration(logger, event: str, **extra: Any) -> Generator:"started-{event}", **extra) start = time.perf_counter() yield duration = time.perf_counter() - start"ended-{event}", duration=duration, **extra) def get_object(name: str) -> Any: module_name, instance_name = name.split(":", 1) module = importlib.import_module(module_name) return getattr(module, instance_name) def get_redis(app): return redis.Redis.from_url( app.settings.redis_url, decode_responses=True, ) def get_aredis(app): return aredis.StrictRedis.from_url( app.settings.redis_url, decode_responses=True, ) def get_mp_context(): # The default changed to 'spawn' in 3.8 on macOS due to return mp.get_context("fork")