job_context

Job and job group types for structured concurrent worker execution.

class pipecat.pipeline.job_context.JobStatus(*values)[source]

Bases: StrEnum

Status of a completed job, as reported by the worker that ran it.

Inherits from str so values compare naturally with plain strings and serialize without extra handling.

COMPLETED

The worker finished successfully.

CANCELLED

The worker was cancelled by the requester.

FAILED

The worker failed due to a logical or business error.

ERROR

The worker encountered an unexpected runtime error.

COMPLETED = 'completed'
CANCELLED = 'cancelled'
FAILED = 'failed'
ERROR = 'error'
exception pipecat.pipeline.job_context.JobError[source]

Bases: Exception

Raised when a job is cancelled due to a worker error or timeout.

exception pipecat.pipeline.job_context.JobGroupError[source]

Bases: Exception

Raised when a job group is cancelled due to a worker error or timeout.

class pipecat.pipeline.job_context.JobGroupResponse(job_id: str, responses: dict[str, dict])[source]

Bases: object

Collected results from a completed job group.

Parameters:
  • job_id – The shared job identifier.

  • responses – Collected responses keyed by worker name.

job_id: str
responses: dict[str, dict]
class pipecat.pipeline.job_context.JobEvent(type: str, data: dict | None = None)[source]

Bases: object

An event received from a worker during a single-worker job.

Parameters:
  • type – The event type.

  • data – Optional event payload.

UPDATE: ClassVar[str] = 'update'
STREAM_START: ClassVar[str] = 'stream_start'
STREAM_DATA: ClassVar[str] = 'stream_data'
STREAM_END: ClassVar[str] = 'stream_end'
type: str
data: dict | None = None
class pipecat.pipeline.job_context.JobGroupEvent(type: str, worker_name: str, data: dict | None = None)[source]

Bases: object

An event received from a worker during job group execution.

Parameters:
  • type – The event type.

  • worker_name – The name of the worker that sent the event.

  • data – Optional event payload.

UPDATE: ClassVar[str] = 'update'
STREAM_START: ClassVar[str] = 'stream_start'
STREAM_DATA: ClassVar[str] = 'stream_data'
STREAM_END: ClassVar[str] = 'stream_end'
type: str
worker_name: str
data: dict | None = None
class pipecat.pipeline.job_context.JobGroup(job_id: str, worker_names: set[str], responses: dict[str, dict] = <factory>, timeout_task: asyncio.Task | None = None, cancel_on_error: bool = True, event_queue: asyncio.Queue | None = None, _done: asyncio.Event = <factory>, _error: str | None = None)[source]

Bases: object

Tracks a group of workers launched together.

Parameters:
  • job_id – Shared identifier for all workers in this group.

  • worker_names – Names of the workers in the group.

  • responses – Collected responses keyed by worker name.

  • timeout_task – Optional asyncio task that cancels the group on timeout.

  • cancel_on_error – Whether to cancel the group if a worker errors.

  • event_queue – Optional queue for streaming events to a JobGroupContext async iterator.

job_id: str
worker_names: set[str]
responses: dict[str, dict]
timeout_task: Task | None = None
cancel_on_error: bool = True
event_queue: Queue | None = None
property is_done: bool

Whether the group has completed or failed.

async wait() -> None[source]

Wait for all workers in the group to respond.

Raises:

JobGroupError – If the group was cancelled due to error or timeout.

complete() -> None[source]

Signal that all workers have responded.

fail(reason: str | None = None) -> None[source]

Signal that the group was cancelled.

Parameters:

reason – Human-readable reason for the failure.

class pipecat.pipeline.job_context.JobGroupContext(worker: BaseWorker, worker_names: tuple[str, ...], *, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True)[source]

Bases: object

Async context manager and iterator for structured job group execution.

Sends job requests on enter, waits for all responses on exit. Supports async for to receive intermediate events (updates and streaming data) from workers while waiting for completion.

On normal completion, results are available via responses. On worker error (with cancel_on_error=True) or timeout, raises JobGroupError. If the async with block raises, remaining jobs are cancelled.

Example:

async with self.job_group("w1", "w2", payload=data) as tg:
    async for event in tg:
        print(f"{event.worker_name} [{event.type}]: {event.data}")

for name, result in tg.responses.items():
    print(name, result)
__init__(worker: BaseWorker, worker_names: tuple[str, ...], *, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True)[source]

Initialize the JobGroupContext.

Parameters:
  • worker – The parent BaseWorker that owns this job group.

  • worker_names – Names of the workers to send the job to.

  • name – Optional job name for routing to named @job handlers.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds covering both the ready-wait and job execution.

  • cancel_on_error – Whether to cancel the group if a worker errors. Defaults to True.

property job_id: str

The shared job identifier for this group.

property responses: dict[str, dict]

Collected responses keyed by worker name.
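
To make the enter, iterate, and exit lifecycle of JobGroupContext concrete, here is a toy class that implements the same two protocols (`async with` and `async for`) on top of plain asyncio tasks and a queue. Everything in it is an assumption made for illustration: `SketchGroupContext`, its fake workers, and the tuple-shaped events are not part of pipecat, and the real class may be built quite differently.

```python
import asyncio


class SketchGroupContext:
    """Toy context: one asyncio task per worker name, events passed via a queue."""

    def __init__(self, *worker_names: str) -> None:
        self._names = worker_names
        self._queue: asyncio.Queue = asyncio.Queue()
        self._tasks: list[asyncio.Task] = []
        self._closer: asyncio.Task | None = None
        self.responses: dict[str, dict] = {}

    async def _run_worker(self, name: str) -> None:
        # Hypothetical worker: emits one update, then records its response.
        await self._queue.put((name, "update", {"progress": 1.0}))
        self.responses[name] = {"status": "completed"}

    async def _finish(self) -> None:
        # Wait for every worker, then push a sentinel that ends iteration.
        await asyncio.gather(*self._tasks, return_exceptions=True)
        await self._queue.put(None)

    async def __aenter__(self) -> "SketchGroupContext":
        # "Send" the jobs on enter.
        self._tasks = [asyncio.create_task(self._run_worker(n)) for n in self._names]
        self._closer = asyncio.create_task(self._finish())
        return self

    def __aiter__(self) -> "SketchGroupContext":
        return self

    async def __anext__(self) -> tuple[str, str, dict]:
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration
        return item

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        if exc_type is not None:
            # The body raised: cancel whatever is still running.
            for task in self._tasks:
                task.cancel()
        if self._closer is not None:
            await self._closer  # wait for all workers on exit
        return False  # never swallow the caller's exception


async def demo() -> tuple[list[str], dict[str, dict]]:
    seen: list[str] = []
    async with SketchGroupContext("w1", "w2") as group:
        async for name, kind, data in group:
            seen.append(name)
    return sorted(seen), group.responses
```

The sentinel is enqueued only after every worker task has finished, so the `async for` loop ends once all events have been delivered.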

class pipecat.pipeline.job_context.JobContext(worker: BaseWorker, worker_name: str, *, name: str | None = None, payload: dict | None = None, timeout: float | None = None)[source]

Bases: object

Async context manager and iterator for a single-worker job.

Sends a job request on enter, waits for the response on exit. Supports async for to receive intermediate events (updates and streaming data) from the worker while waiting for completion.

On normal completion, the result is available via response. On worker error or timeout, raises JobError. If the async with block raises, the job is cancelled.

Example:

async with self.job("worker", payload=data) as t:
    async for event in t:
        print(f"[{event.type}]: {event.data}")

print(t.response)
__init__(worker: BaseWorker, worker_name: str, *, name: str | None = None, payload: dict | None = None, timeout: float | None = None)[source]

Initialize the JobContext.

Parameters:
  • worker – The parent BaseWorker that owns this job.

  • worker_name – Name of the worker to send the job to.

  • name – Optional job name for routing to a named @job handler.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds covering both the ready-wait and job execution.

property job_id: str

The job identifier.

property response: dict

The worker’s response payload.