just_jobs

just-jobs


A friendly and lightweight wrapper for arq. just-jobs provides a simple interface on top of arq that implements additional functionality like synchronous job types (IO-bound vs. CPU-bound) and signed and secure task serialization.

Documentation: https://justjobs.thearchitector.dev.

Tested support on Python 3.7, 3.8, 3.9, 3.10, and 3.11.

$ pdm add just-jobs
# or
$ pip install --user just-jobs

Features

just-jobs doesn't aim to replace the invocations that arq provides, only wrap some of them to make job creation and execution better and easier. It lets you:

  • Define and run non-async jobs. A non-async function decorated with @job will still run properly when passed to arq. Non-async jobs can also be defined as either IO-bound or CPU-bound, which changes how the job is executed to prevent blocking the asyncio event loop.
  • The arq Context parameter now works a lot like FastAPI's Request. It's no longer a required parameter, but if it exists, it will get set. It doesn't have to be named ctx either; it only needs to be annotated with the type Context.
  • Specify a single RedisSettings within your WorkerSettings from which you can create a pool using Settings.create_pool().
  • Run jobs either immediately or via normal arq enqueueing.
  • Use non-picklable job arguments and kwargs (supported by the dill library).
  • Signed secure job serialization using blake2b.

Usage

Using just-jobs is pretty straightforward:

Add @job() to any function to make it a delayable job.

If the job is synchronous, specify its job type so just-jobs knows how to optimally run it. If you don't, you'll get an error. This helps encourage thoughtful and intentional job design while ensuring that the event loop is never blocked.

@job(job_type=JobType.CPU_BOUND) # or JobType.IO_BOUND
def complex_math(i: int, j: int, k: int): ...

If it's a coroutine function, you don't need to specify a job type (and will get a warning if you do).

@job()
async def poll_reddit(subr: str): ...

By default, just-jobs will utilize your Python version's default number of thread and process workers to handle IO-bound and CPU-bound tasks respectively. On 3.8+, that is min(32, CPU_COUNT + 4) for IO-bound jobs and 1 <= CPU_COUNT <= 61 for CPU-bound ones.

If you want to configure those max worker values, you can do so via the MAX_THREAD_WORKERS and MAX_PROCESS_WORKERS environment variables.
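
For example, you might cap both pools when launching your arq worker (the app.Settings path below is only a placeholder for wherever your WorkerSettings class actually lives):

$ MAX_THREAD_WORKERS=8 MAX_PROCESS_WORKERS=2 arq app.Settings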

Invoke a job normally if you want to run it immediately.

Invoking a job as a regular function runs it immediately, as if it had never been decorated. If you have logic that should only execute when the job is enqueued, include a parameter with type Context and check whether it is set at runtime (when a job with a Context parameter is run immediately, that argument is set to None).

@job()
async def context_aware(ctx: Context, msg: str):
    if ctx:
        # enqueued then run by arq
        return f"hello {msg}"
    else:
        # invoked manually
        return f"bye {msg}"

assert await context_aware("world") == "bye world"

j = await pool.enqueue_job("context_aware", "world")
assert await j.result() == "hello world"

Define WorkerSettings using the BaseSettings metaclass.

The execution logic that @job provides requires some supporting machinery (the executor pools and the signed serializers). When defining your WorkerSettings, you must declare BaseSettings as its metaclass to ensure that machinery exists.

class Settings(metaclass=BaseSettings):
    redis_settings = ...

Use Settings.create_pool().

While you may elect to use arq.connections.create_pool as you would normally, using the create_pool function provided by your Settings class ensures the pool it creates always matches your worker's Redis and serialization settings (it will be less of a headache). It also lets you take advantage of additional functionality, namely that it can be used as an auto-closing context manager.

# manually
pool = await Settings.create_pool()
await pool.close(close_connection_pool=True)

# or as an async context manager
async with Settings.create_pool() as pool:
    ...

Sign your job serializations with blake2b.

By default, using just-jobs Settings means all serialized jobs are prefixed with a signature which is then parsed and validated before job execution. This helps ensure that any jobs you serialize do not get tampered with while enqueued and waiting for execution. The default (and very insecure) secret used for signing is thisisasecret. In any production or public-facing deployment, you _should_ change this value to something private and secure. It can be changed via the JOB_SERIALIZATION_SECRET environment variable.
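
For example, you might export a randomly generated secret in the environment before starting your processes; the generation command below is only one option, and any sufficiently long random string works:

$ export JOB_SERIALIZATION_SECRET="$(openssl rand -hex 32)"

Keep in mind that the worker and any process enqueueing jobs must share the same secret, since both sides sign and verify serialized jobs.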

Enqueue your job.

just-jobs doesn't change the way in which you enqueue your jobs. Just use await pool.enqueue_job(...). Using just-jobs, you also don't have to worry as much about the type of arguments you supply; all Python objects supported by the dill serialization library will work just fine.

await pool.enqueue_job('complex_math', 2, 1, 3)
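
Because dill handles objects that pickle cannot, you can even pass a lambda or a locally defined callable as an argument. A small sketch, assuming a hypothetical process_items job that accepts a key function:

await pool.enqueue_job("process_items", [3, 1, 2], key=lambda x: -x)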

Caveats

  1. arq.func() and @job() are mutually exclusive. If you want to configure a job in the same way, pass the settings you would have passed to func() to @job() instead.

    @job(job_type=JobType.CPU_BOUND, keep_result_forever=True, max_tries=10)
    def task(a: int, b: int):
      return a + b
    
  2. There isn't support for asynchronous CPU-bound tasks. Currently, job types only configure the execution behavior of synchronous tasks (not coroutines). However, there are some valid cases for CPU-bound tasks that also need to be run in an asyncio context.

    At the moment, the best way to achieve this is to create a synchronous CPU-bound task (so it runs in a separate process) that then invokes a coroutine via asyncio.run. If you intend to run the task in the current context from time to time, just return the coroutine instead and it will get executed automatically in the current event loop.

    async def _async_task(a: int, b: int, c: int):
       ab = await add(a, b)
       return await add(ab, c)
    
    @job(job_type=JobType.CPU_BOUND)
    def wrapper_cpu_bound(ctx: Context, a: int, b: int, c: int):
       task = _async_task(a, b, c)
       return asyncio.run(task) if ctx else task
    

Example

The complete example is available at docs/example.py and should work out of the box. The snippet below is just an excerpt to show the features described above:

from arq.connections import RedisSettings

from just_jobs import BaseSettings, Context, JobType, job

@job()
async def async_task(url: str):
    return url

@job(job_type=JobType.IO_BOUND)
def sync_task(ctx: Context, url: str):
    # if the context is present, this is being run from the arq listener
    if ctx:
        print(url)
    return url

class Settings(metaclass=BaseSettings):
    functions = [async_task, sync_task]
    redis_settings = RedisSettings(host="redis")

async def main():
    # create a Redis pool using the Settings already defined
    pool = await Settings.create_pool()
    # run sync_task right now and get the url back
    url = sync_task("https://www.theglassfiles.com")

    await pool.enqueue_job("async_task", "https://www.eliasfgabriel.com")
    await pool.enqueue_job("sync_task", "https://gianturl.net")

    await pool.close(close_connection_pool=True)
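
Note that the excerpt only enqueues the jobs; an arq worker pointed at Settings must be running against the same Redis instance to actually execute them. The enqueueing side itself can be kicked off the usual way:

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())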

License

This software is licensed under the 3-Clause BSD License.

This package is Treeware. If you use it in production, consider buying the world a tree to thank me for my work. By contributing to my forest, you’ll be creating employment for local families and restoring wildlife habitats.

1""".. include:: ../README.md"""
2
3from .job_type import JobType
4from .jobs import job
5from .settings import BaseSettings
6from .typing import Context
7
8__all__ = ["job", "JobType", "BaseSettings", "Context"]
def job( job_type: Optional[just_jobs.JobType] = None, name: Optional[str] = None, keep_result: Union[int, float, datetime.timedelta, NoneType] = None, timeout: Union[int, float, datetime.timedelta, NoneType] = None, keep_result_forever: Optional[bool] = None, max_tries: Optional[int] = None) -> Callable[[Callable[..., Union[Any, Awaitable[Any]]]], just_jobs.jobs._job[Any]]:
def job(
    job_type: Optional[JobType] = None,
    name: Optional[str] = None,
    keep_result: Optional[SecondsTimedelta] = None,
    timeout: Optional[SecondsTimedelta] = None,
    keep_result_forever: Optional[bool] = None,
    max_tries: Optional[int] = None,
) -> Callable[[ArqCallable[Any]], _job[Any]]:
    """
    Creates an async enqueueable job from the provided function. The function may be
    synchronous or a coroutine. If synchronous, the job will be run in either a
    thread or process depending on its `JobType`.

    Synchronous jobs are required to specify their `job_type`. If a job type is
    specified for a coroutine, a warning will be thrown (but will still execute).
    """
    return lambda func: _job(
        func=func,
        job_type=job_type,
        # inherited
        name=name or func.__qualname__,
        timeout_s=to_seconds(timeout),
        keep_result_s=to_seconds(keep_result),
        keep_result_forever=keep_result_forever,
        max_tries=max_tries,
    )

Creates an async enqueueable job from the provided function. The function may be synchronous or a coroutine. If synchronous, the job will be run in either a thread or process depending on its JobType.

Synchronous jobs are required to specify their job_type. If a job type is specified for a coroutine, a warning will be thrown (but the job will still execute).

class JobType(enum.Enum):
class JobType(Enum):
    """Indicates the performance characteristic of the job to run."""

    IO_BOUND = "io-bound"
    """
    IO-bound tasks typically spend a majority of their time waiting for external
    services to complete, such as when read / writing to disk or sending / receiving
    network packets. Since they do not hold the GIL during that downtime, IO-bound
    jobs run in a ThreadPoolExecutor.
    """

    CPU_BOUND = "cpu-bound"
    """
    CPU-bound tasks typically perform many contiguous operations (like complex
    calculations) rather than wait for external services. Because they operate
    continuously and hold the GIL, CPU-bound jobs run in a ProcessPoolExecutor.
    """

Indicates the performance characteristic of the job to run.

IO_BOUND = <JobType.IO_BOUND: 'io-bound'>

IO-bound tasks typically spend a majority of their time waiting for external services to complete, such as when read / writing to disk or sending / receiving network packets. Since they do not hold the GIL during that downtime, IO-bound jobs run in a ThreadPoolExecutor.

CPU_BOUND = <JobType.CPU_BOUND: 'cpu-bound'>

CPU-bound tasks typically perform many contiguous operations (like complex calculations) rather than wait for external services. Because they operate continuously and hold the GIL, CPU-bound jobs run in a ProcessPoolExecutor.

class BaseSettings(builtins.type):
class BaseSettings(type):
    """
    A metaclass for defining WorkerSettings to pass to an arq process. This enables
    using the built-in JobType and executor pool logic, as well as secure remote job
    serialization and parsing.
    """

    def __new__(
        cls, clsname: str, bases: Tuple[Any, ...], attrs: Dict[str, Any]
    ) -> type:
        attrs.update(
            {
                "on_startup": BaseSettings.on_startup,
                "on_shutdown": BaseSettings.on_shutdown,
                "job_serializer": BaseSettings.job_serializer,
                "job_deserializer": BaseSettings.job_deserializer,
            }
        )
        return super().__new__(cls, clsname, bases, attrs)

    @staticmethod
    async def on_startup(ctx: Context) -> None:
        """
        Starts the thread and process pool executors for downstream synchronous job
        execution.
        """
        with styled_text(Fore.BLUE, Style.DIM):
            print("[justjobs] Starting executors...")

        # we're ok creating pools for all the types since the executors don't
        # spin up the threads / processes unless a task is scheduled to run in one
        ctx["_executors"] = {
            JobType.IO_BOUND: ThreadPoolExecutor(max_workers=MAX_THREAD_WORKERS),
            JobType.CPU_BOUND: ProcessPoolExecutor(max_workers=MAX_PROCESS_WORKERS),
        }

    @staticmethod
    async def on_shutdown(ctx: Context) -> None:
        """
        Gracefully shuts down the available thread and process pool executors.
        """
        for executor in ctx["_executors"].values():
            executor.shutdown(wait=True)

        del ctx["_executors"]

        with styled_text(Fore.BLUE, Style.DIM):
            print("[justjobs] Gracefully shutdown executors ✔")

    @staticmethod
    def job_serializer(job: Any) -> bytes:
        """
        Serializes the given job using dill and signs it using blake2b. The serialized
        job and its signature are returned for later verification.
        """
        serialized: bytes = dill.dumps(job)
        signer = blake2b(key=SERIALIZATION_SECRET)
        signer.update(serialized)
        # must be hexdigest to ensure no premature byte delimiters
        sig = signer.hexdigest()
        return (sig + "|").encode("utf-8") + serialized

    @staticmethod
    def job_deserializer(packed: bytes) -> Any:
        """
        Extracts the signature from the serialized job and compares it with the job
        function. If the signatures match, the job is deserialized and executed. If
        not, an error is raised.
        """
        sig, serialized = packed.split(b"|", 1)
        signer = blake2b(key=SERIALIZATION_SECRET)
        signer.update(serialized)

        if not compare_digest(sig.decode("utf-8"), signer.hexdigest()):
            raise ValueError(
                "Invalid job signature! Has someone tampered with your job queue?"
            )

        return dill.loads(serialized)

    def create_pool(cls, **kwargs: Any) -> Broker:
        """
        Creates an ArqRedis instance using this class' RedisSettings and job
        serializers. This function technically returns an instance of Broker,
        so the pool creation is delayed until the returned object is either
        awaited or entered.
        """
        if not hasattr(cls, "redis_settings"):
            raise AttributeError(
                "You must first define some RedisSettings on this worker class before"
                " trying to create a pool from them."
            )

        return Broker(
            redis_settings=cls.redis_settings,
            packj=cls.job_serializer,
            unpackj=cls.job_deserializer,
            kwargs=kwargs,
        )

A metaclass for defining WorkerSettings to pass to an arq process. This enables using the built-in JobType and executor pool logic, as well as secure remote job serialization and parsing.

@staticmethod
async def on_startup(ctx: Dict[Any, Any]) -> None:

Starts the thread and process pool executors for downstream synchronous job execution.

@staticmethod
async def on_shutdown(ctx: Dict[Any, Any]) -> None:

Gracefully shuts down the available thread and process pool executors.

@staticmethod
def job_serializer(job: Any) -> bytes:

Serializes the given job using dill and signs it using blake2b. The serialized job and its signature are returned for later verification.

@staticmethod
def job_deserializer(packed: bytes) -> Any:

Extracts the signature from the serialized job and compares it with the job function. If the signatures match, the job is deserialized and executed. If not, an error is raised.

def create_pool(cls, **kwargs: Any) -> just_jobs.broker.Broker:

Creates an ArqRedis instance using this class' RedisSettings and job serializers. This function technically returns an instance of Broker, so the pool creation is delayed until the returned object is either awaited or entered.

Context = typing.Dict[typing.Any, typing.Any]