job_context
Worker group types for structured concurrent worker execution.
- class pipecat.pipeline.job_context.JobStatus(*values)[source]
Bases: StrEnum
Status of a completed worker.
Inherits from str, so values compare naturally with plain strings and serialize without extra handling.
- COMPLETED
The worker finished successfully.
- CANCELLED
The worker was cancelled by the requester.
- FAILED
The worker failed due to a logical or business error.
- ERROR
The worker encountered an unexpected runtime error.
- COMPLETED = 'completed'
- CANCELLED = 'cancelled'
- FAILED = 'failed'
- ERROR = 'error'
- exception pipecat.pipeline.job_context.JobError[source]
Bases: Exception
Raised when a job is cancelled due to a worker error or timeout.
- exception pipecat.pipeline.job_context.JobGroupError[source]
Bases: Exception
Raised when a worker group is cancelled due to a worker error or timeout.
- class pipecat.pipeline.job_context.JobGroupResponse(job_id: str, responses: dict[str, dict])[source]
Bases: object
Collected results from a completed job group.
- Parameters:
job_id – The shared job identifier.
responses – Collected responses keyed by worker name.
- job_id: str
- responses: dict[str, dict]
- class pipecat.pipeline.job_context.JobEvent(type: str, data: dict | None = None)[source]
Bases: object
An event received from a worker during a single-worker job.
- Parameters:
type – The event type.
data – Optional event payload.
- UPDATE: ClassVar[str] = 'update'
- STREAM_START: ClassVar[str] = 'stream_start'
- STREAM_DATA: ClassVar[str] = 'stream_data'
- STREAM_END: ClassVar[str] = 'stream_end'
- type: str
- data: dict | None = None
- class pipecat.pipeline.job_context.JobGroupEvent(type: str, worker_name: str, data: dict | None = None)[source]
Bases: object
An event received from a worker during job group execution.
- Parameters:
type – The event type.
worker_name – The name of the worker that sent the event.
data – Optional event payload.
- UPDATE: ClassVar[str] = 'update'
- STREAM_START: ClassVar[str] = 'stream_start'
- STREAM_DATA: ClassVar[str] = 'stream_data'
- STREAM_END: ClassVar[str] = 'stream_end'
- type: str
- worker_name: str
- data: dict | None = None
- class pipecat.pipeline.job_context.JobGroup(job_id: str, worker_names: set[str], responses: dict[str, dict] = <factory>, timeout_task: asyncio.Task | None = None, cancel_on_error: bool = True, event_queue: asyncio.Queue | None = None, _done: asyncio.Event = <factory>, _error: str | None = None)[source]
Bases: object
Tracks a group of workers launched together.
- Parameters:
job_id – Shared identifier for all workers in this group.
worker_names – Names of the workers in the group.
responses – Collected responses keyed by worker name.
timeout_task – Optional asyncio task that cancels the group on timeout.
cancel_on_error – Whether to cancel the group if a worker errors.
event_queue – Optional queue for streaming events to a JobGroupContext async iterator.
- job_id: str
- worker_names: set[str]
- responses: dict[str, dict]
- timeout_task: Task | None = None
- cancel_on_error: bool = True
- event_queue: Queue | None = None
- property is_done: bool
Whether the group has completed or failed.
- async wait() → None[source]
Wait for all workers in the group to respond.
- Raises:
JobGroupError – If the group was cancelled due to error or timeout.
- class pipecat.pipeline.job_context.JobGroupContext(worker: BaseWorker, worker_names: tuple[str, ...], *, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True)[source]
Bases: object
Async context manager and iterator for structured job group execution.
Sends job requests on enter, waits for all responses on exit. Supports async for to receive intermediate events (updates and streaming data) from workers while waiting for completion.
On normal completion, results are available via responses. On worker error (with cancel_on_error=True) or timeout, raises JobGroupError. If the async with block raises, remaining jobs are cancelled.
Example:
    async with self.job_group("w1", "w2", payload=data) as tg:
        async for event in tg:
            print(f"{event.worker_name} [{event.type}]: {event.data}")

    for name, result in tg.responses.items():
        print(name, result)
- __init__(worker: BaseWorker, worker_names: tuple[str, ...], *, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True)[source]
Initialize the JobGroupContext.
- Parameters:
worker – The parent BaseWorker that owns this job group.
worker_names – Names of the workers to send the job to.
name – Optional job name for routing to named @job handlers.
payload – Optional structured data describing the work.
timeout – Optional timeout in seconds covering both the ready-wait and job execution.
cancel_on_error – Whether to cancel the group if a worker errors. Defaults to True.
- property job_id: str
The shared job identifier for this group.
- property responses: dict[str, dict]
Collected responses keyed by worker name.
- class pipecat.pipeline.job_context.JobContext(worker: BaseWorker, worker_name: str, *, name: str | None = None, payload: dict | None = None, timeout: float | None = None)[source]
Bases: object
Async context manager and iterator for a single-worker job.
Sends a job request on enter, waits for the response on exit. Supports async for to receive intermediate events (updates and streaming data) from the worker while waiting for completion.
On normal completion, the result is available via response. On worker error or timeout, raises JobError. If the async with block raises, the job is cancelled.
Example:
    async with self.job("worker", payload=data) as t:
        async for event in t:
            print(f"[{event.type}]: {event.data}")

    print(t.response)
- __init__(worker: BaseWorker, worker_name: str, *, name: str | None = None, payload: dict | None = None, timeout: float | None = None)[source]
Initialize the JobContext.
- Parameters:
worker – The parent BaseWorker that owns this job.
worker_name – Name of the worker to send the job to.
name – Optional job name for routing to a named @job handler.
payload – Optional structured data describing the work.
timeout – Optional timeout in seconds covering both the ready-wait and job execution.
- property job_id: str
The job identifier.
- property response: dict
The worker’s response payload.