just_jobs
Python package `just_jobs` provides a lightweight asynchronous Python job executor.
Using Redis by default (though not exclusively, via custom `Broker`s), it is a smaller, production-ready alternative to Celery for applications where distributed microservices are overkill.
What is a "job"?
A job is any exposed function, asynchronous coroutine, or generic callable that has been queued to a worker for delayed execution by calling `Manager.enqueue`.
"Exposed" means the callable must be importable through a module-reference FQN string, such as "just_jobs.Manager.enqueue". This is due to a limitation with Python pickling.
1""" 2Python package `just_jobs` provides a lightweight asynchronous Python job executor. 3Using Redis by default (but not exclusivly, via custom Brokers), it is a smaller and 4production-ready alternative to Celery for applications where distributed microservices 5are overkill. 6 7## What is a "job"? 8 9A job is any exposed function, asynchronous coroutine, or generic callable that has 10been queued to a worker for delayed execution by calling `Manager.enqueue`. 11 12"Exposed" means the callable must be importable through a module-reference 13FQN string, such as "just_jobs.Manager.enqueue". This is due to a limitation with Python 14[pickling](https://docs.python.org/3/library/pickle.html). 15""" 16 17from .brokers import Broker, RedisBroker 18from .manager import Manager 19 20__all__ = ["Manager", "Broker", "RedisBroker"]
```python
class Manager:
    """The entrypoint for all job queueing and brokering within an application."""

    def __init__(
        self,
        broker: Type[Broker] = RedisBroker,
        queue_names: Optional[List[str]] = None,
        **bkwargs,
    ):
        self.queue_names = queue_names or ["default"]
        """
        The queue names to which jobs may be enqueued.

        It is ***highly*** recommended to choose queue names that logically /
        functionally separate the action-flows in your application. This is
        because jobs in a single queue are processed sequentially (albeit
        concurrently, via asyncio). Defining separate queues for unrelated
        actions ensures that a job in one queue does not block an unrelated
        job from executing.
        """

        self.broker = broker(**bkwargs)
        """
        The broker class to instantiate. `RedisBroker` by default, but may be
        replaced with a custom `Broker` if desired.
        """
        self.bkwargs = bkwargs
        """
        Any keyword arguments to pass to the `broker` during initialization.
        """

        self.processes: List[Process] = []
        self._initialized = False

    async def startup(self):
        """
        Start up the broker to allow it to perform any initial actions, like
        connecting to a database. Also spawn the queue-processing worker
        processes and register them with the manager, to allow for graceful
        cleanup during shutdown.
        """
        # allow the broker to start up whatever it needs
        await self.broker.startup()

        # spawn listening worker processes with a shared event for cleanup
        self._pevent = Event()
        for queue_name in self.queue_names:
            p = Process(
                name=f"brokingworker-{queue_name}",
                target=self.broker._spawn_worker,
                args=(self._pevent, queue_name),
                kwargs=self.bkwargs,
            )
            self.processes.append(p)
            p.start()

        self._initialized = True

    async def shutdown(self):
        """
        Signal all processing workers to shut down, wait for them to clean up,
        then gracefully shut down the broker. This must be called after startup.
        """
        if not self._initialized:
            raise NotReadyException()

        # shut down the processes by setting their locking events rather than
        # sending a SIGTERM, to allow the workers to properly clean up their
        # resources and connections.
        self._pevent.set()
        for process in self.processes:
            process.join()

        await self.broker.shutdown()
        self._initialized = False

    async def enqueue(
        self, func: Callable, *args, queue_name: str = "default", **kwargs
    ):
        """
        Enqueues the given function and its arguments for asynchronous execution
        sometime in the near future. All functions are wrapped and serialized.
        No guarantee is given for WHEN an enqueued job will run, only that it
        will (or at least, that execution will be attempted). The time until
        execution depends on the number of enqueued tasks, their complexities,
        and the rest of the workload present on the system.
        """
        if not self._initialized:
            raise NotReadyException()
        elif queue_name not in self.queue_names:
            raise InvalidQueueException()
        elif not callable(func):
            raise InvalidEnqueueableFunction()

        # wrap in a partial for serialization
        partial = functools.partial(func, *args, **kwargs)
        serialized = pickle.dumps(partial)

        # request the storage broker dispatch the serialized job to the correct
        # worker process
        return await self.broker.enqueue(queue_name, serialized)

    async def __aenter__(self):
        await self.startup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.shutdown()
```
The entrypoint for all job queueing and brokering within an application.
```python
def __init__(
    self,
    broker: Type[Broker] = RedisBroker,
    queue_names: Optional[List[str]] = None,
    **bkwargs,
):
    self.queue_names = queue_names or ["default"]
    """
    The queue names to which jobs may be enqueued.

    It is ***highly*** recommended to choose queue names that logically /
    functionally separate the action-flows in your application. This is
    because jobs in a single queue are processed sequentially (albeit
    concurrently, via asyncio). Defining separate queues for unrelated
    actions ensures that a job in one queue does not block an unrelated
    job from executing.
    """

    self.broker = broker(**bkwargs)
    """
    The broker class to instantiate. `RedisBroker` by default, but may be
    replaced with a custom `Broker` if desired.
    """
    self.bkwargs = bkwargs
    """
    Any keyword arguments to pass to the `broker` during initialization.
    """

    self.processes: List[Process] = []
    self._initialized = False
```
`queue_names`: The queue names to which jobs may be enqueued.
It is highly recommended to choose queue names that logically / functionally separate the action-flows in your application. This is because jobs in a single queue are processed sequentially (albeit concurrently, via asyncio). Defining separate queues for unrelated actions ensures that a job in one queue does not block an unrelated job from executing.
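As a sketch of that recommendation, with illustrative queue names and Redis URL:

```python
from just_jobs import Manager

# hypothetical split: long-running media jobs never delay transactional emails
manager = Manager(
    queue_names=["emails", "media"],
    url="redis://localhost:6379/0",
)
```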
`broker`: The broker class to instantiate. `RedisBroker` by default, but may be replaced with a custom `Broker` if desired.
```python
async def startup(self):
    """
    Start up the broker to allow it to perform any initial actions, like
    connecting to a database. Also spawn the queue-processing worker
    processes and register them with the manager, to allow for graceful
    cleanup during shutdown.
    """
    # allow the broker to start up whatever it needs
    await self.broker.startup()

    # spawn listening worker processes with a shared event for cleanup
    self._pevent = Event()
    for queue_name in self.queue_names:
        p = Process(
            name=f"brokingworker-{queue_name}",
            target=self.broker._spawn_worker,
            args=(self._pevent, queue_name),
            kwargs=self.bkwargs,
        )
        self.processes.append(p)
        p.start()

    self._initialized = True
```
Start up the broker, allowing it to perform any initial actions, like connecting to a database. Also spawn the queue-processing worker processes and register them with the manager, to allow for graceful cleanup during shutdown.
```python
async def shutdown(self):
    """
    Signal all processing workers to shut down, wait for them to clean up,
    then gracefully shut down the broker. This must be called after startup.
    """
    if not self._initialized:
        raise NotReadyException()

    # shut down the processes by setting their locking events rather than
    # sending a SIGTERM, to allow the workers to properly clean up their
    # resources and connections.
    self._pevent.set()
    for process in self.processes:
        process.join()

    await self.broker.shutdown()
    self._initialized = False
```
Signal all processing workers to shut down, wait for them to clean up, then gracefully shut down the broker. This must be called after startup.
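Since `Manager` also implements `__aenter__` / `__aexit__`, the startup/shutdown pair can be driven explicitly or via `async with`; a minimal sketch (Redis URL assumed):

```python
import asyncio

from just_jobs import Manager


async def main():
    # explicit lifecycle management
    manager = Manager(url="redis://localhost:6379/0")
    await manager.startup()
    try:
        ...  # enqueue jobs here
    finally:
        await manager.shutdown()

    # equivalent, using the async context manager
    async with Manager(url="redis://localhost:6379/0") as manager:
        ...  # enqueue jobs here


asyncio.run(main())
```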
```python
async def enqueue(
    self, func: Callable, *args, queue_name: str = "default", **kwargs
):
    """
    Enqueues the given function and its arguments for asynchronous execution
    sometime in the near future. All functions are wrapped and serialized.
    No guarantee is given for WHEN an enqueued job will run, only that it
    will (or at least, that execution will be attempted). The time until
    execution depends on the number of enqueued tasks, their complexities,
    and the rest of the workload present on the system.
    """
    if not self._initialized:
        raise NotReadyException()
    elif queue_name not in self.queue_names:
        raise InvalidQueueException()
    elif not callable(func):
        raise InvalidEnqueueableFunction()

    # wrap in a partial for serialization
    partial = functools.partial(func, *args, **kwargs)
    serialized = pickle.dumps(partial)

    # request the storage broker dispatch the serialized job to the correct
    # worker process
    return await self.broker.enqueue(queue_name, serialized)
```
Enqueues the given function and its arguments for asynchronous execution sometime in the near future. All functions are wrapped and serialized. No guarantee is given for WHEN an enqueued job will run, only that it will (or at least, that execution will be attempted). The time until execution depends on the number of enqueued tasks, their complexities, and the rest of the workload present on the system.
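For example, with a hypothetical `resize_image` job (here `"media"` must be one of the names passed to the `Manager` as `queue_names`):

```python
from just_jobs import Manager


def resize_image(path: str, *, width: int):
    # hypothetical job
    ...


async def enqueue_examples(manager: Manager):
    # default queue
    await manager.enqueue(resize_image, "avatars/42.png", width=128)

    # explicit queue; an unknown name raises InvalidQueueException
    await manager.enqueue(
        resize_image, "banners/7.png", width=1024, queue_name="media"
    )
```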
```python
class Broker(ABC):
    """
    An abstract Broker interface used to define custom functionality. Every
    Broker class must inherit from this interface and override the defined
    abstract methods.
    """

    def __init__(self, coroutines_per_worker: int = 20):
        # settings
        self.coroutines_per_worker = coroutines_per_worker
        """
        The number of coroutine worker tasks to spawn per queue. Each coroutine
        processes and runs jobs atomically, and their parallelism is handled by
        asyncio. Defaults to 20.
        """

        # operational
        self.is_worker: bool = False
        """
        Set by the `Manager` to mark this broker as a worker responsible for
        processing jobs. May be used during startup and shutdown to do
        worker-specific actions.
        """
        self.loop: Optional[asyncio.AbstractEventLoop] = None

    @abstractmethod
    async def startup(self):
        """
        Performs any setup required by the storage broker, like connecting to a
        database.
        """
        raise NotImplementedError("Storage brokers must define a startup process.")

    @abstractmethod
    async def shutdown(self):
        """
        Performs any shutdown required by the broker, like ensuring
        disconnection from a database.
        """
        raise NotImplementedError("Storage brokers must define a shutdown process.")

    @abstractmethod
    async def enqueue(self, queue_name: str, job: bytes):
        """
        Enqueues the given serialized job to the provided queue for later
        processing.
        """
        raise NotImplementedError(
            "Storage brokers must define a way to enqueue serialized jobs."
        )

    @abstractmethod
    async def process_jobs(self, queue_name: str):
        """
        Infinitely polls for new jobs pushed to the given queue and attempts to
        run them via `Broker.run_job`. Jobs must be dequeued atomically. This
        method is also responsible for determining what to do when a job fails,
        like re-adding it to the queue.

        See `RedisBroker.process_jobs` for an example.
        """
        raise NotImplementedError(
            "Storage brokers must define a way to process queued jobs. "
            "This should run forever."
        )

    async def run_job(self, job: bytes) -> bool:
        """
        Loads and executes a job, either directly in the event loop if it's a
        coroutine or in a threadpool if it isn't. Returns whether the job ran
        successfully.
        """
        partial = pickle.loads(job)

        try:
            # run the job as a coroutine or in a threadpool
            if asyncio.iscoroutinefunction(partial.func):
                await partial()
            elif self.loop:
                await self.loop.run_in_executor(None, partial)

            return True
        except Exception:
            # TODO: what happens when a job fails? probably should keep track
            # somewhere with a retry counter instead of leaving it pending
            return False

    @classmethod
    def _spawn_worker(cls, event: Event, queue_name: str, **kwargs):
        loop = asyncio.new_event_loop()
        loop_thread = threading.Thread(target=loop.run_forever)
        loop_thread.start()

        # create the worker and wait for it to start up
        worker = cls(**kwargs)
        worker.is_worker = True
        worker.loop = loop
        asyncio.run_coroutine_threadsafe(worker.startup(), loop).result()

        # spawn N working coroutines
        futs = [
            asyncio.run_coroutine_threadsafe(worker.process_jobs(queue_name), loop)
            for _ in range(worker.coroutines_per_worker)
        ]

        # block until the shutdown event is set
        event.wait()

        # cancel all worker coroutines
        for fut in futs:
            loop.call_soon_threadsafe(fut.cancel)

        # wait for the worker shutdown to complete
        asyncio.run_coroutine_threadsafe(worker.shutdown(), loop).result()

        # clean up the loop
        loop.call_soon_threadsafe(loop.stop)
        loop_thread.join()
        loop.close()
```
An abstract `Broker` interface used to define custom functionality. Every Broker class must inherit from this interface and override the defined abstract methods.
`coroutines_per_worker`: The number of coroutine worker tasks to spawn per queue. Each coroutine processes and runs jobs atomically, and their parallelism is handled by asyncio. Defaults to 20.
`is_worker`: Set by the `Manager` to mark this broker as a worker responsible for processing jobs. May be used during startup and shutdown to perform worker-specific actions.
```python
@abstractmethod
async def startup(self):
    """
    Performs any setup required by the storage broker, like connecting to a
    database.
    """
    raise NotImplementedError("Storage brokers must define a startup process.")
```
Performs any setup required by the storage broker, like connecting to a database.
```python
@abstractmethod
async def shutdown(self):
    """
    Performs any shutdown required by the broker, like ensuring
    disconnection from a database.
    """
    raise NotImplementedError("Storage brokers must define a shutdown process.")
```
Performs any shutdown required by the broker, like ensuring disconnection from a database.
```python
@abstractmethod
async def enqueue(self, queue_name: str, job: bytes):
    """
    Enqueues the given serialized job to the provided queue for later
    processing.
    """
    raise NotImplementedError(
        "Storage brokers must define a way to enqueue serialized jobs."
    )
```
Enqueues the given serialized job to the provided queue for later processing.
```python
@abstractmethod
async def process_jobs(self, queue_name: str):
    """
    Infinitely polls for new jobs pushed to the given queue and attempts to
    run them via `Broker.run_job`. Jobs must be dequeued atomically. This
    method is also responsible for determining what to do when a job fails,
    like re-adding it to the queue.

    See `RedisBroker.process_jobs` for an example.
    """
    raise NotImplementedError(
        "Storage brokers must define a way to process queued jobs. "
        "This should run forever."
    )
```
Infinitely polls for new jobs pushed to the given queue and attempts to run them via `Broker.run_job`. Jobs must be dequeued atomically. This method is also responsible for determining what to do when a job fails, like re-adding it to the queue.

See `RedisBroker.process_jobs` for an example.
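To make the interface concrete, here is a minimal sketch of a custom in-memory broker. It is illustrative only: its queues live inside a single process, so it provides none of the persistence or cross-process delivery that `RedisBroker` does.

```python
import asyncio
from typing import Dict

from just_jobs import Broker


class MemoryBroker(Broker):
    """Illustrative in-memory broker backed by per-queue asyncio queues."""

    async def startup(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    async def shutdown(self):
        self.queues.clear()

    def _queue(self, queue_name: str) -> asyncio.Queue:
        return self.queues.setdefault(queue_name, asyncio.Queue())

    async def enqueue(self, queue_name: str, job: bytes):
        await self._queue(queue_name).put(job)

    async def process_jobs(self, queue_name: str):
        while True:
            job = await self._queue(queue_name).get()
            # a real broker should decide what to do on failure, e.g.
            # re-enqueue the job with a retry counter
            await self.run_job(job)
```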
```python
async def run_job(self, job: bytes) -> bool:
    """
    Loads and executes a job, either directly in the event loop if it's a
    coroutine or in a threadpool if it isn't. Returns whether the job ran
    successfully.
    """
    partial = pickle.loads(job)

    try:
        # run the job as a coroutine or in a threadpool
        if asyncio.iscoroutinefunction(partial.func):
            await partial()
        elif self.loop:
            await self.loop.run_in_executor(None, partial)

        return True
    except Exception:
        # TODO: what happens when a job fails? probably should keep track
        # somewhere with a retry counter instead of leaving it pending
        return False
```
Loads and executes a job, either directly in the event loop if it's a coroutine or in a threadpool if it isn't. Returns whether the job ran successfully.
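For intuition, the `job` bytes are just a `functools.partial` pickled by `Manager.enqueue`; a standalone sketch of the round trip:

```python
import functools
import pickle


def add(a: int, b: int) -> int:
    return a + b


# what Manager.enqueue produces...
job: bytes = pickle.dumps(functools.partial(add, 1, b=2))

# ...and what run_job loads and invokes
partial = pickle.loads(job)
assert partial.func is add
assert partial() == 3
```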
````python
class RedisBroker(Broker):
    """
    Production-ready job broker using redis-py for queue management
    and job persistence.
    """

    def __init__(
        self,
        shutdown_with_pool: bool = True,
        url: Optional[str] = None,
        connection_pool: Optional[ConnectionPool] = None,
        **kwargs,
    ):
        super().__init__(coroutines_per_worker=kwargs.pop("coroutines_per_worker", 20))
        self.url = url
        """
        A valid fully-qualified URL to a Redis instance, in one of the following
        formats. Mutually exclusive with `connection_pool`.

        ```plain
        redis://[[username]:[password]]@localhost:6379/0
        rediss://[[username]:[password]]@localhost:6379/0
        unix://[[username]:[password]]@/path/to/socket.sock?db=0
        ```
        """
        self.connection_pool = connection_pool
        """
        A valid connection pool to a Redis instance.
        """
        self.shutdown_with_pool = shutdown_with_pool
        """
        Indicates that the connection pool should release all connections and
        disconnect when the broker is shut down. Defaults to `True`.
        """
        self.kwargs = kwargs
        """
        Any keyword arguments to pass through to the Redis instance during
        creation.
        """

    async def startup(self):
        # create an async Redis instance from either a supplied connection
        # pool or a URL
        if self.connection_pool:
            self.redis: Redis = Redis(
                connection_pool=self.connection_pool, **self.kwargs
            )
        elif self.url:
            self.redis: Redis = Redis.from_url(self.url, **self.kwargs)
            self.connection_pool = self.redis.connection_pool
        else:
            raise ValueError(
                "You must specify either a valid Redis URL or existing connection pool!"
            )

    async def shutdown(self):
        # shut down all redis connections
        await self.redis.close()
        if self.connection_pool and self.shutdown_with_pool:
            await self.connection_pool.disconnect()

    async def enqueue(self, queue_name: str, job: bytes):
        return await self.redis.rpush(f"jobqueue:{queue_name}", job)  # type: ignore

    async def process_jobs(self, queue_name: str):
        jqueue = f"jobqueue:{queue_name}"
        pqueue = f"{jqueue}-processing"

        while True:
            # atomically move the next queued job to the processing queue
            serialized = await self.redis.brpoplpush(jqueue, pqueue, timeout=5)

            if serialized:
                # run the job
                successful = await super().run_job(serialized)
                if successful:
                    # remove it from the processing queue
                    await self.redis.lrem(pqueue, 0, serialized)  # type: ignore
````
Production-ready job broker using redis-py for queue management and job persistence.
````python
def __init__(
    self,
    shutdown_with_pool: bool = True,
    url: Optional[str] = None,
    connection_pool: Optional[ConnectionPool] = None,
    **kwargs,
):
    super().__init__(coroutines_per_worker=kwargs.pop("coroutines_per_worker", 20))
    self.url = url
    """
    A valid fully-qualified URL to a Redis instance, in one of the following
    formats. Mutually exclusive with `connection_pool`.

    ```plain
    redis://[[username]:[password]]@localhost:6379/0
    rediss://[[username]:[password]]@localhost:6379/0
    unix://[[username]:[password]]@/path/to/socket.sock?db=0
    ```
    """
    self.connection_pool = connection_pool
    """
    A valid connection pool to a Redis instance.
    """
    self.shutdown_with_pool = shutdown_with_pool
    """
    Indicates that the connection pool should release all connections and
    disconnect when the broker is shut down. Defaults to `True`.
    """
    self.kwargs = kwargs
    """
    Any keyword arguments to pass through to the Redis instance during
    creation.
    """
````
`url`: A valid fully-qualified URL to a Redis instance, in one of the following formats. Mutually exclusive with `connection_pool`.

```plain
redis://[[username]:[password]]@localhost:6379/0
rediss://[[username]:[password]]@localhost:6379/0
unix://[[username]:[password]]@/path/to/socket.sock?db=0
```
`shutdown_with_pool`: Indicates that the connection pool should release all connections and disconnect when the broker is shut down. Defaults to `True`.
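A sketch of both construction styles, assuming redis-py's asyncio client and a local instance:

```python
from redis.asyncio import ConnectionPool

from just_jobs import Manager, RedisBroker

# via URL: the broker creates and later owns the connection pool
manager = Manager(broker=RedisBroker, url="redis://localhost:6379/0")

# via an existing pool; shutdown_with_pool=False keeps a shared pool alive
# after the broker shuts down
pool = ConnectionPool.from_url("redis://localhost:6379/0")
manager = Manager(
    broker=RedisBroker, connection_pool=pool, shutdown_with_pool=False
)
```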
```python
async def startup(self):
    # create an async Redis instance from either a supplied connection pool
    # or a URL
    if self.connection_pool:
        self.redis: Redis = Redis(
            connection_pool=self.connection_pool, **self.kwargs
        )
    elif self.url:
        self.redis: Redis = Redis.from_url(self.url, **self.kwargs)
        self.connection_pool = self.redis.connection_pool
    else:
        raise ValueError(
            "You must specify either a valid Redis URL or existing connection pool!"
        )
```
Performs any setup required by the storage broker, like connecting to a database.
```python
async def shutdown(self):
    # shut down all redis connections
    await self.redis.close()
    if self.connection_pool and self.shutdown_with_pool:
        await self.connection_pool.disconnect()
```
Performs any shutdown required by the broker, like ensuring disconnection from a database.
```python
async def enqueue(self, queue_name: str, job: bytes):
    return await self.redis.rpush(f"jobqueue:{queue_name}", job)  # type: ignore
```
Enqueues the given serialized job to the provided queue for later processing.
```python
async def process_jobs(self, queue_name: str):
    jqueue = f"jobqueue:{queue_name}"
    pqueue = f"{jqueue}-processing"

    while True:
        # atomically move the next queued job to the processing queue
        serialized = await self.redis.brpoplpush(jqueue, pqueue, timeout=5)

        if serialized:
            # run the job
            successful = await super().run_job(serialized)
            if successful:
                # remove it from the processing queue
                await self.redis.lrem(pqueue, 0, serialized)  # type: ignore
```
Infinitely polls for new jobs pushed to the given queue and attempts to run them via `Broker.run_job`. Jobs must be dequeued atomically. This method is also responsible for determining what to do when a job fails, like re-adding it to the queue.

See `RedisBroker.process_jobs` for an example.
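Note that the `BRPOPLPUSH` loop above backs every queue with two Redis lists: pending jobs in `jobqueue:<name>` and in-flight jobs in `jobqueue:<name>-processing`. A sketch for inspecting both depths, assuming redis-py's asyncio client and a local instance:

```python
import asyncio

from redis.asyncio import Redis


async def queue_depths(queue_name: str = "default"):
    redis = Redis.from_url("redis://localhost:6379/0")
    pending = await redis.llen(f"jobqueue:{queue_name}")
    in_flight = await redis.llen(f"jobqueue:{queue_name}-processing")
    await redis.close()
    return pending, in_flight


print(asyncio.run(queue_depths()))
```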