just_jobs

Python package just_jobs provides a lightweight asynchronous Python job executor. Using Redis by default (but not exclusively, via custom Brokers), it is a smaller, production-ready alternative to Celery for applications where distributed microservices are overkill.

What is a "job"?

A job is any exposed function, asynchronous coroutine, or generic callable that has been queued to a worker for delayed execution by calling Manager.enqueue.

"Exposed" means the callable must be importable through a module-reference FQN string, such as "just_jobs.Manager.enqueue". This is due to a limitation with Python pickling.

 1"""
 2Python package `just_jobs` provides a lightweight asynchronous Python job executor.
 3Using Redis by default (but not exclusivly, via custom Brokers), it is a smaller and
 4production-ready alternative to Celery for applications where distributed microservices
 5are overkill.
 6
 7## What is a "job"?
 8
 9A job is any exposed function, asynchronous coroutine, or generic callable that has
10been queued to a worker for delayed execution by calling `Manager.enqueue`.
11
12"Exposed" means the callable must be importable through a module-reference
13FQN string, such as "just_jobs.Manager.enqueue". This is due to a limitation with Python
14[pickling](https://docs.python.org/3/library/pickle.html).
15"""
16
17from .brokers import Broker, RedisBroker
18from .manager import Manager
19
20__all__ = ["Manager", "Broker", "RedisBroker"]
class Manager:
class Manager:
    """The entrypoint for all job queueing and brokering within an application."""

    def __init__(
        self,
        broker: Type[Broker] = RedisBroker,
        queue_names: Optional[List[str]] = None,
        **bkwargs,
    ):
        self.queue_names = queue_names or ["default"]
        """
        The queue names to which jobs will be placed.

        It is ***highly*** recommended to set this list of queue names to
        logically / functionally separate action-flows in your application.
        This is because jobs in a single queue are processed sequentially
        (albeit in asyncio-parallel). Defining separate queues for unrelated
        actions ensures that a job in one queue does not block an unrelated
        job from executing.
        """

        self.broker = broker(**bkwargs)
        """
        The broker class to instantiate. `RedisBroker` by default, but may be
        replaced with a custom `Broker` if desired.
        """
        self.bkwargs = bkwargs
        """
        Any keyword arguments to pass to the `broker` during initialization.
        """

        self.processes: List[Process] = []
        self._initialized = False

    async def startup(self):
        """
        Start up the broker to allow it to perform any initial actions, like
        connecting to a database. Also spawn/fork the queue-processing worker
        processes and register them with the manager, to allow for graceful
        cleanup during shutdown.
        """
        # allow the broker to start up whatever it needs
        await self.broker.startup()

        # spawn listening worker processes with communication events for cleanup
        self._pevent = Event()
        for queue_name in self.queue_names:
            p = Process(
                name=f"brokingworker-{queue_name}",
                target=self.broker._spawn_worker,
                args=(self._pevent, queue_name),
                kwargs=self.bkwargs,
            )
            self.processes.append(p)
            p.start()

        self._initialized = True

    async def shutdown(self):
        """
        Signal all processing workers to shut down and wait for them to clean up.
        Also gracefully shut down the broker. This must be called after startup.
        """
        if not self._initialized:
            raise NotReadyException()

        # shut down the processes by setting their locking events rather than
        # sending a SIGTERM, to allow the workers to properly clean up their
        # resources and connections
        self._pevent.set()
        for process in self.processes:
            process.join()

        await self.broker.shutdown()
        self._initialized = False

    async def enqueue(
        self, func: Callable, *args, queue_name: str = "default", **kwargs
    ):
        """
        Enqueues the given function and its arguments for asynchronous execution
        sometime in the near future. All functions are wrapped and serialized. No
        guarantee is given for WHEN an enqueued job will be run, only that it will
        be (or at least, that an attempt will be made). The time until execution
        depends on the number of enqueued tasks, their complexities, and the rest
        of the workload present on the system.
        """
        if not self._initialized:
            raise NotReadyException()
        elif queue_name not in self.queue_names:
            raise InvalidQueueException()
        elif not callable(func):
            raise InvalidEnqueueableFunction()

        # wrap in a partial for serialization
        partial = functools.partial(func, *args, **kwargs)
        serialized = pickle.dumps(partial)

        # request the storage broker dispatch the serialized job to the correct
        # worker process
        return await self.broker.enqueue(queue_name, serialized)

    async def __aenter__(self):
        await self.startup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.shutdown()

The entrypoint for all job queueing and brokering within an application.

Manager(broker: Type[just_jobs.brokers.base.Broker] = <class 'just_jobs.brokers.redis.RedisBroker'>, queue_names: Optional[List[str]] = None, **bkwargs)
    def __init__(
        self,
        broker: Type[Broker] = RedisBroker,
        queue_names: Optional[List[str]] = None,
        **bkwargs,
    ):
        self.queue_names = queue_names or ["default"]
        """
        The queue names to which jobs will be placed.

        It is ***highly*** recommended to set this list of queue names to
        logically / functionally separate action-flows in your application.
        This is because jobs in a single queue are processed sequentially
        (albeit in asyncio-parallel). Defining separate queues for unrelated
        actions ensures that a job in one queue does not block an unrelated
        job from executing.
        """

        self.broker = broker(**bkwargs)
        """
        The broker class to instantiate. `RedisBroker` by default, but may be
        replaced with a custom `Broker` if desired.
        """
        self.bkwargs = bkwargs
        """
        Any keyword arguments to pass to the `broker` during initialization.
        """

        self.processes: List[Process] = []
        self._initialized = False
queue_names

The queue names to which jobs will be placed.

It is highly recommended to set this list of queue names to logically / functionally separate action-flows in your application. Jobs in a single queue are processed sequentially (albeit in asyncio-parallel), so defining separate queues for unrelated actions ensures that a job in one queue does not block an unrelated job from executing.

broker

The broker class to instantiate. RedisBroker by default, but may be replaced with a custom Broker if desired.

bkwargs

Any keyword arguments to pass to the broker during initialization.
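For instance, a hypothetical setup that separates email and report work onto their own queues (the queue names and Redis URL are illustrative):

```python
from just_jobs import Manager, RedisBroker

manager = Manager(
    broker=RedisBroker,
    queue_names=["emails", "reports"],
    # forwarded to RedisBroker through **bkwargs
    url="redis://localhost:6379/0",
)
```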

async def startup(self)
    async def startup(self):
        """
        Start up the broker to allow it to perform any initial actions, like
        connecting to a database. Also spawn/fork the queue-processing worker
        processes and register them with the manager, to allow for graceful
        cleanup during shutdown.
        """
        # allow the broker to start up whatever it needs
        await self.broker.startup()

        # spawn listening worker processes with communication events for cleanup
        self._pevent = Event()
        for queue_name in self.queue_names:
            p = Process(
                name=f"brokingworker-{queue_name}",
                target=self.broker._spawn_worker,
                args=(self._pevent, queue_name),
                kwargs=self.bkwargs,
            )
            self.processes.append(p)
            p.start()

        self._initialized = True

Start up the broker to allow it to perform any initial actions, like connecting to a database. Also spawn/fork the queue-processing worker processes and register them with the manager, to allow for graceful cleanup during shutdown.

async def shutdown(self)
    async def shutdown(self):
        """
        Signal all processing workers to shut down and wait for them to clean up.
        Also gracefully shut down the broker. This must be called after startup.
        """
        if not self._initialized:
            raise NotReadyException()

        # shut down the processes by setting their locking events rather than
        # sending a SIGTERM, to allow the workers to properly clean up their
        # resources and connections
        self._pevent.set()
        for process in self.processes:
            process.join()

        await self.broker.shutdown()
        self._initialized = False

Signal all processing workers to shut down and wait for them to clean up. Also gracefully shut down the broker. This must be called after startup.
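Because `Manager` also implements `__aenter__`/`__aexit__`, the async context manager form is the simplest way to pair the two calls; a manual sketch (inside a coroutine, with an illustrative URL):

```python
manager = Manager(url="redis://localhost:6379/0")

await manager.startup()
try:
    ...  # enqueue jobs here
finally:
    # ensure worker processes are joined and broker connections are closed
    await manager.shutdown()
```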

async def enqueue(self, func: Callable, *args, queue_name: str = 'default', **kwargs)
    async def enqueue(
        self, func: Callable, *args, queue_name: str = "default", **kwargs
    ):
        """
        Enqueues the given function and its arguments for asynchronous execution
        sometime in the near future. All functions are wrapped and serialized. No
        guarantee is given for WHEN an enqueued job will be run, only that it will
        be (or at least, that an attempt will be made). The time until execution
        depends on the number of enqueued tasks, their complexities, and the rest
        of the workload present on the system.
        """
        if not self._initialized:
            raise NotReadyException()
        elif queue_name not in self.queue_names:
            raise InvalidQueueException()
        elif not callable(func):
            raise InvalidEnqueueableFunction()

        # wrap in a partial for serialization
        partial = functools.partial(func, *args, **kwargs)
        serialized = pickle.dumps(partial)

        # request the storage broker dispatch the serialized job to the correct
        # worker process
        return await self.broker.enqueue(queue_name, serialized)

Enqueues the given function and its arguments for asynchronous execution sometime in the near future. All functions are wrapped and serialized. No guarantee is given for WHEN an enqueued job will be run, only that it will be (or at least, that an attempt will be made). The time until execution depends on the number of enqueued tasks, their complexities, and the rest of the workload present on the system.
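As a sketch, assuming `resize_image` and `notify` are importable callables and that the queue names were passed to the `Manager` (otherwise `InvalidQueueException` is raised):

```python
# positional and keyword arguments are captured in the serialized partial
await manager.enqueue(resize_image, "avatar.png", width=128, queue_name="images")

# plain (non-async) callables work too; workers run them in a threadpool
await manager.enqueue(notify, "job done", queue_name="default")
```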

class Broker(abc.ABC):
class Broker(ABC):
    """
    An abstract Broker interface used to define custom functionality. Every
    Broker class must inherit from this interface and override the defined
    abstract methods.
    """

    def __init__(self, coroutines_per_worker: int = 20):
        # settings
        self.coroutines_per_worker = coroutines_per_worker
        """
        The number of coroutine working tasks to spawn per queue. Each coroutine
        processes and runs jobs atomically, and their parallelism is handled by
        asyncio. Defaults to 20.
        """

        # operational
        self.is_worker: bool = False
        """
        Set by the `Manager` to mark this broker as a worker responsible for
        processing jobs. May be used during startup and shutdown to do
        worker-specific actions.
        """
        self.loop: Optional[asyncio.AbstractEventLoop] = None

    @abstractmethod
    async def startup(self):
        """
        Performs any setup required by the storage broker, like connecting to a
        database.
        """
        raise NotImplementedError("Storage brokers must define a startup process.")

    @abstractmethod
    async def shutdown(self):
        """
        Performs any shutdown required by the broker, like ensuring
        disconnection from a database.
        """
        raise NotImplementedError("Storage brokers must define a shutdown process.")

    @abstractmethod
    async def enqueue(self, queue_name: str, job: bytes):
        """
        Enqueues the given serialized job to the provided queue for later
        processing.
        """
        raise NotImplementedError(
            "Storage brokers must define a way to enqueue serialized jobs."
        )

    @abstractmethod
    async def process_jobs(self, queue_name: str):
        """
        Infinitely polls for new jobs pushed to the given queue and attempts to
        run them via `Broker.run_job`. Jobs must be dequeued atomically. This
        method is also responsible for determining what to do if a job fails,
        like re-adding it to the queue.

        ~ See `RedisBroker.process_jobs` for an example.
        """
        raise NotImplementedError(
            "Storage brokers must define a way to process queued jobs. "
            "This should run forever."
        )

    async def run_job(self, job: bytes) -> bool:
        """
        Loads and executes a job, either directly in the event loop if it's a
        coroutine or in a threadpool if it isn't. Returns whether the job ran
        successfully.
        """
        partial = pickle.loads(job)

        try:
            # run the job as a coroutine or in a threadpool
            if asyncio.iscoroutinefunction(partial.func):
                await partial()
            elif self.loop:
                await self.loop.run_in_executor(None, partial)

            return True
        except Exception:
            # TODO: what happens when a job fails? probably should keep track
            # somewhere with a retry counter instead of leaving it pending
            return False

    @classmethod
    def _spawn_worker(cls, event: Event, queue_name: str, **kwargs):
        # run an event loop in a background thread so this worker process can
        # block on the shutdown event in its main thread
        loop = asyncio.new_event_loop()
        loop_thread = threading.Thread(target=loop.run_forever)
        loop_thread.start()

        # create worker and wait for it to startup
        worker = cls(**kwargs)
        worker.is_worker = True
        worker.loop = loop
        asyncio.run_coroutine_threadsafe(worker.startup(), loop).result()

        # spawn N working coroutines
        futs = [
            asyncio.run_coroutine_threadsafe(worker.process_jobs(queue_name), loop)
            for _ in range(worker.coroutines_per_worker)
        ]

        # block until the shutdown event is set
        event.wait()

        # cancel all worker coroutines
        for fut in futs:
            loop.call_soon_threadsafe(fut.cancel)

        # wait for the worker shutdown to complete
        asyncio.run_coroutine_threadsafe(worker.shutdown(), loop).result()

        # cleanup the loop
        loop.call_soon_threadsafe(loop.stop)
        loop_thread.join()
        loop.close()

An abstract Broker interface used to define custom functionality. Every Broker class must inherit from this interface and override the defined abstract methods.
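A custom broker only has to override the four abstract methods. A minimal in-memory sketch of the interface is below; note that it is illustrative only, since the `Manager` runs each broker worker in a separate process, so a real broker needs shared storage (like Redis) that both processes can reach:

```python
import asyncio

from just_jobs import Broker


class InMemoryBroker(Broker):
    """Illustrative, non-durable broker; NOT suitable for cross-process use."""

    async def startup(self):
        # one asyncio.Queue per queue name, created lazily
        self.queues: dict = {}

    async def shutdown(self):
        self.queues.clear()

    async def enqueue(self, queue_name: str, job: bytes):
        self.queues.setdefault(queue_name, asyncio.Queue()).put_nowait(job)

    async def process_jobs(self, queue_name: str):
        queue = self.queues.setdefault(queue_name, asyncio.Queue())
        while True:
            job = await queue.get()
            # delegate deserialization and execution to Broker.run_job
            await self.run_job(job)
```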

coroutines_per_worker

The number of coroutine working tasks to spawn per queue. Each coroutine processes and runs jobs atomically, and their parallelism is handled by asyncio. Defaults to 20.

is_worker: bool

Set by the Manager to mark this broker as a worker responsible for processing jobs. May be used during startup and shutdown to do worker-specific actions.

@abstractmethod
async def startup(self)
    @abstractmethod
    async def startup(self):
        """
        Performs any setup required by the storage broker, like connecting to a
        database.
        """
        raise NotImplementedError("Storage brokers must define a startup process.")

Performs any setup required by the storage broker, like connecting to a database.

@abstractmethod
async def shutdown(self)
    @abstractmethod
    async def shutdown(self):
        """
        Performs any shutdown required by the broker, like ensuring
        disconnection from a database.
        """
        raise NotImplementedError("Storage brokers must define a shutdown process.")

Performs any shutdown required by the broker, like ensuring disconnection from a database.

@abstractmethod
async def enqueue(self, queue_name: str, job: bytes)
    @abstractmethod
    async def enqueue(self, queue_name: str, job: bytes):
        """
        Enqueues the given serialized job to the provided queue for later
        processing.
        """
        raise NotImplementedError(
            "Storage brokers must define a way to enqueue serialized jobs."
        )

Enqueues the given serialized job to the provided queue for later processing.

@abstractmethod
async def process_jobs(self, queue_name: str)
    @abstractmethod
    async def process_jobs(self, queue_name: str):
        """
        Infinitely polls for new jobs pushed to the given queue and attempts to
        run them via `Broker.run_job`. Jobs must be dequeued atomically. This
        method is also responsible for determining what to do if a job fails,
        like re-adding it to the queue.

        ~ See `RedisBroker.process_jobs` for an example.
        """
        raise NotImplementedError(
            "Storage brokers must define a way to process queued jobs. "
            "This should run forever."
        )

Infinitely polls for new jobs pushed to the given queue and attempts to run them via Broker.run_job. Jobs must be dequeued atomically. This method is also responsible for determining what to do if a job fails, like re-adding it to the queue.

~ See RedisBroker.process_jobs for an example.

async def run_job(self, job: bytes) -> bool
    async def run_job(self, job: bytes) -> bool:
        """
        Loads and executes a job, either directly in the event loop if it's a
        coroutine or in a threadpool if it isn't. Returns whether the job ran
        successfully.
        """
        partial = pickle.loads(job)

        try:
            # run the job as a coroutine or in a threadpool
            if asyncio.iscoroutinefunction(partial.func):
                await partial()
            elif self.loop:
                await self.loop.run_in_executor(None, partial)

            return True
        except Exception:
            # TODO: what happens when a job fails? probably should keep track
            # somewhere with a retry counter instead of leaving it pending
            return False

Loads and executes a job, either directly in the event loop if it's a coroutine or in a threadpool if it isn't. Returns whether the job ran successfully.
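Since `run_job` rebuilds the callable with `pickle.loads`, only callables importable by name can be jobs; this is the "exposed" requirement from the module docs. A quick demonstration:

```python
import functools
import pickle


def double(x):
    return x * 2


pickle.dumps(functools.partial(double, 21))  # fine: `double` resolves by name

# raises PicklingError: lambdas have no importable qualified name
pickle.dumps(functools.partial(lambda x: x, 21))
```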

class RedisBroker(just_jobs.Broker):
class RedisBroker(Broker):
    """
    Production-ready job broker using redis-py for queue management
    and job persistence.
    """

    def __init__(
        self,
        shutdown_with_pool: bool = True,
        url: Optional[str] = None,
        connection_pool: Optional[ConnectionPool] = None,
        **kwargs,
    ):
        super().__init__(coroutines_per_worker=kwargs.pop("coroutines_per_worker", 20))
        self.url = url
        """
        A valid fully-qualified URL to a Redis instance, of one of the following
        formats. Mutually exclusive with `connection_pool`.

        ```plain
        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[[username]:[password]]@/path/to/socket.sock?db=0
        ```
        """
        self.connection_pool = connection_pool
        """
        A valid connection pool to a Redis instance.
        """
        self.shutdown_with_pool = shutdown_with_pool
        """
        Indicates that the connection pool should release all connections and
        disconnect when the broker is shut down. Defaults to true.
        """
        self.kwargs = kwargs
        """
        Any keyword arguments to pass through to the Redis instance during
        creation.
        """

    async def startup(self):
        # create an async Redis instance from either a supplied connection pool
        # or a URL
        if self.connection_pool:
            self.redis: Redis = Redis(
                connection_pool=self.connection_pool, **self.kwargs
            )
        elif self.url:
            self.redis: Redis = Redis.from_url(self.url, **self.kwargs)
            self.connection_pool = self.redis.connection_pool
        else:
            raise ValueError(
                "You must specify either a valid Redis URL or existing connection pool!"
            )

    async def shutdown(self):
        # shut down all Redis connections
        await self.redis.close()
        if self.connection_pool and self.shutdown_with_pool:
            await self.connection_pool.disconnect()

    async def enqueue(self, queue_name: str, job: bytes):
        return await self.redis.rpush(f"jobqueue:{queue_name}", job)  # type: ignore

    async def process_jobs(self, queue_name: str):
        jqueue = f"jobqueue:{queue_name}"
        pqueue = f"{jqueue}-processing"

        while True:
            # atomically move the next queued job to the processing queue
            serialized = await self.redis.brpoplpush(jqueue, pqueue, timeout=5)

            if serialized:
                # run the job
                successful = await super().run_job(serialized)
                if successful:
                    # remove from the processing queue
                    await self.redis.lrem(pqueue, 0, serialized)  # type: ignore

Production-ready job broker using redis-py for queue management and job persistence.

RedisBroker(shutdown_with_pool: bool = True, url: Optional[str] = None, connection_pool: Optional[redis.asyncio.connection.ConnectionPool] = None, **kwargs)
    def __init__(
        self,
        shutdown_with_pool: bool = True,
        url: Optional[str] = None,
        connection_pool: Optional[ConnectionPool] = None,
        **kwargs,
    ):
        super().__init__(coroutines_per_worker=kwargs.pop("coroutines_per_worker", 20))
        self.url = url
        """
        A valid fully-qualified URL to a Redis instance, of one of the following
        formats. Mutually exclusive with `connection_pool`.

        ```plain
        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[[username]:[password]]@/path/to/socket.sock?db=0
        ```
        """
        self.connection_pool = connection_pool
        """
        A valid connection pool to a Redis instance.
        """
        self.shutdown_with_pool = shutdown_with_pool
        """
        Indicates that the connection pool should release all connections and
        disconnect when the broker is shut down. Defaults to true.
        """
        self.kwargs = kwargs
        """
        Any keyword arguments to pass through to the Redis instance during
        creation.
        """
url

A valid fully-qualified URL to a Redis instance, of one of the following formats. Mutually exclusive with connection_pool.

redis://[[username]:[password]]@localhost:6379/0
rediss://[[username]:[password]]@localhost:6379/0
unix://[[username]:[password]]@/path/to/socket.sock?db=0

connection_pool

A valid connection pool to a Redis instance.

shutdown_with_pool

Indicates that the connection pool should release all connections and disconnect when the broker is shut down. Defaults to true.

kwargs

Any keyword arguments to pass through to the Redis instance during creation.
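Putting those parameters together, both construction styles might look like this (addresses are illustrative):

```python
from redis.asyncio import ConnectionPool

from just_jobs import Manager, RedisBroker

# by URL; unrecognized kwargs pass through to the Redis client
manager = Manager(broker=RedisBroker, url="redis://localhost:6379/0")

# or by an existing pool, keeping it alive after the broker shuts down
pool = ConnectionPool.from_url("redis://localhost:6379/0")
manager = Manager(
    broker=RedisBroker,
    connection_pool=pool,
    shutdown_with_pool=False,
)
```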

async def startup(self)
    async def startup(self):
        # create an async Redis instance from either a supplied connection pool
        # or a URL
        if self.connection_pool:
            self.redis: Redis = Redis(
                connection_pool=self.connection_pool, **self.kwargs
            )
        elif self.url:
            self.redis: Redis = Redis.from_url(self.url, **self.kwargs)
            self.connection_pool = self.redis.connection_pool
        else:
            raise ValueError(
                "You must specify either a valid Redis URL or existing connection pool!"
            )

Performs any setup required by the storage broker, like connecting to a database.

async def shutdown(self)
    async def shutdown(self):
        # shut down all Redis connections
        await self.redis.close()
        if self.connection_pool and self.shutdown_with_pool:
            await self.connection_pool.disconnect()

Performs any shutdown required by the broker, like ensuring disconnection from a database.

async def enqueue(self, queue_name: str, job: bytes)
    async def enqueue(self, queue_name: str, job: bytes):
        return await self.redis.rpush(f"jobqueue:{queue_name}", job)  # type: ignore

Enqueues the given serialized job to the provided queue for later processing.

async def process_jobs(self, queue_name: str)
    async def process_jobs(self, queue_name: str):
        jqueue = f"jobqueue:{queue_name}"
        pqueue = f"{jqueue}-processing"

        while True:
            # atomically move the next queued job to the processing queue
            serialized = await self.redis.brpoplpush(jqueue, pqueue, timeout=5)

            if serialized:
                # run the job
                successful = await super().run_job(serialized)
                if successful:
                    # remove from the processing queue
                    await self.redis.lrem(pqueue, 0, serialized)  # type: ignore

Infinitely polls for new jobs pushed to the given queue and attempts to run them via Broker.run_job. Jobs must be dequeued atomically. This method is also responsible for determining what to do if a job fails, like re-adding it to the queue.

~ See RedisBroker.process_jobs for an example.
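Note that this implementation leaves a failed job on the processing list (see the TODO in `Broker.run_job`). One possible extension, sketched here as an unofficial subclass with an assumed retry budget, would count attempts and re-queue until the budget is exhausted:

```python
from just_jobs import RedisBroker


class RetryingRedisBroker(RedisBroker):
    """Illustrative subclass; retry behavior is NOT part of just_jobs."""

    MAX_RETRIES = 3  # assumed budget, not a library setting

    async def process_jobs(self, queue_name: str):
        jqueue = f"jobqueue:{queue_name}"
        pqueue = f"{jqueue}-processing"

        while True:
            serialized = await self.redis.brpoplpush(jqueue, pqueue, timeout=5)
            if not serialized:
                continue

            if await self.run_job(serialized):
                # success: drop the job from the processing list
                await self.redis.lrem(pqueue, 0, serialized)
            else:
                # failure: track attempts in a hash, then retry or give up
                attempts = await self.redis.hincrby(
                    f"{jqueue}-retries", serialized, 1
                )
                await self.redis.lrem(pqueue, 0, serialized)
                if attempts < self.MAX_RETRIES:
                    await self.redis.rpush(jqueue, serialized)
```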