Source code for fennel.job

import json
from dataclasses import asdict, field, replace
from typing import Any, Dict, List

from pydantic import validator
from pydantic.dataclasses import dataclass

from fennel.status import UNKNOWN
from fennel.utils import base64uuid

@dataclass
class Job:
    """
    Internal record describing a single job.

    Parameters
    ----------
    task : str
        The name of the task. By default will use
        ``f"{func.__module__}.{func.__qualname__}"``, where `func` is the
        Python callable.
    args : List
        Positional arguments for the job.
    kwargs : Dict
        Keyword arguments for the job.
    tries : int
        How many executions have been attempted so far.
    max_retries : int
        Upper bound on the number of retries attempted after a failure.
    exception : Dict
        Details of the most recent failure, containing 'original_type'
        (str, e.g. 'ValueError') and 'original_args' (List, e.g.
        ['Not found']).
    return_value : Any
        What the Python callable returned when execution succeeded.
    status : str
        One of :mod:`fennel.status`, the current lifecycle stage.
    uuid : str
        Base64-encoded unique identifier.
    """

    task: str
    args: List
    kwargs: Dict
    tries: int = 0
    max_retries: int = 9
    exception: Dict = field(default_factory=dict)
    return_value: Any = None
    status: str = UNKNOWN
    uuid: str = field(default_factory=base64uuid)

    @validator("args", "kwargs", "exception", "return_value", pre=True, whole=True)
    def json_decode(cls, v):
        """
        Decode a JSON-encoded string before regular field validation.

        Non-string values, and strings that are not valid JSON, are returned
        unchanged.

        NOTE: any string that happens to parse as JSON is decoded, so a
        string value such as ``"123"`` becomes the integer ``123``.
        """
        if not isinstance(v, str):
            return v
        try:
            decoded = json.loads(v)
        except ValueError:
            # Not JSON after all: keep the raw string.
            return v
        return decoded

    def increment(self) -> "Job":
        """Return a copy of this job whose ``tries`` is one higher."""
        attempts = self.tries + 1
        # `replace` here is dataclasses.replace (module scope), not the
        # method of the same name defined below.
        return replace(self, tries=attempts)

    def replace(self, **kwargs):
        """Return a copy of this job with the given fields overridden."""
        # Resolves to dataclasses.replace: method bodies look names up in
        # module scope, not in the class namespace.
        return replace(self, **kwargs)

    def to_json(self) -> str:
        """Return every field of the job encoded as one JSON document."""
        payload = asdict(self)
        return json.dumps(payload)

    @classmethod
    def from_json(cls, s: str) -> "Job":
        """Build a job from a JSON object string of field names to values."""
        decoded = json.loads(s)
        return cls(**decoded)

    def serialise(self) -> Dict:
        """
        Return the job's fields as a dict in which ``args``, ``kwargs``,
        ``exception`` and ``return_value`` are JSON-encoded strings.
        """
        d = asdict(self)
        for key in ("args", "kwargs", "exception", "return_value"):
            d[key] = json.dumps(d[key])
        return d

    @classmethod
    def deserialise(cls, fields: Dict) -> "Job":
        """
        Build a job from a dict of fields.

        JSON-encoded string values for ``args``, ``kwargs``, ``exception``
        and ``return_value`` are decoded by the ``json_decode`` validator.
        """
        return cls(**fields)

    @property
    def result(self) -> str:
        """JSON string holding the job's ``return_value`` and ``exception``."""
        outcome = {
            "return_value": self.return_value,
            "exception": self.exception,
        }
        return json.dumps(outcome)