Source code for pipecat.pipeline.job_decorator

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Decorator for marking worker methods as job handlers."""

from collections.abc import Callable


[docs] def job(*, name: str, sequential: bool = False): """Mark a worker method as a job handler. Decorated methods are automatically collected by ``BaseWorker`` at initialization and dispatched when matching job requests arrive. Each request runs in its own asyncio task so the bus message loop is never blocked. Example:: @job(name="research") async def on_research(self, message): ... @job(name="write", sequential=True) async def on_write(self, message): ... Args: name: Job name to match. The handler only receives requests with a matching name. sequential: When ``True``, requests with this name run one at a time in FIFO order. Concurrent requests wait for the previous one to finish before running. When ``False`` (the default), multiple requests run concurrently. The wait time counts against the requester's timeout, so a slow predecessor can cause queued requests to time out before they start. """ def decorator(fn: Callable) -> Callable: fn.is_job_handler = True # type: ignore[attr-defined] fn.job_name = name # type: ignore[attr-defined] fn.job_sequential = sequential # type: ignore[attr-defined] return fn return decorator
def _collect_job_handlers(obj) -> dict[str, Callable]: seen: set[str] = set() handlers: dict[str, Callable] = {} for cls in type(obj).__mro__: for attr_name, val in cls.__dict__.items(): if attr_name in seen: continue seen.add(attr_name) if callable(val) and getattr(val, "is_job_handler", False): job_name: str = getattr(val, "job_name") if job_name in handlers: existing = handlers[job_name].__name__ raise ValueError( f"Duplicate @job handler for '{job_name}': " f"'{attr_name}' conflicts with '{existing}'" ) handlers[job_name] = getattr(obj, attr_name) return handlers