base_worker

Abstract base worker for the multi-worker framework.

Provides the BaseWorker class that all workers inherit from, handling worker lifecycle, parent-child relationships, and long-running job coordination on the bus.

class pipecat.workers.base_worker.WorkerParams(loop: AbstractEventLoop)[source]

Bases: object

Configuration parameters for worker execution.

Parameters:

loop – The asyncio event loop to use for worker execution.

loop: AbstractEventLoop

class pipecat.workers.base_worker.WorkerActivationArgs(metadata: dict | None = None)[source]

Bases: object

Base activation arguments for any worker.

Parameters:

metadata – Optional structured data passed during activation.

metadata: dict | None = None

classmethod from_dict(data: dict) Self[source]

Create from a dict, ignoring unknown keys.

to_dict() dict[source]

Convert to a dict, excluding None values.
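
The round-trip behaviour described for from_dict() and to_dict() can be illustrated with a stand-in dataclass. This is a minimal sketch, not the library's implementation: ActivationArgsSketch is a hypothetical name, and the sketch only assumes the real class behaves like a dataclass with a single metadata field.

```python
from __future__ import annotations

from dataclasses import dataclass, fields
from typing import Any


@dataclass
class ActivationArgsSketch:
    """Hypothetical stand-in for WorkerActivationArgs (not the real class)."""

    metadata: dict | None = None

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ActivationArgsSketch":
        # Keep only keys that match declared fields; unknown keys are dropped.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})

    def to_dict(self) -> dict[str, Any]:
        # Emit only the fields whose value is not None.
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }
```

With this sketch, ActivationArgsSketch.from_dict({"metadata": {"lang": "en"}, "extra": 1}) keeps only metadata, and a default instance serializes to an empty dict.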

class pipecat.workers.base_worker.BaseWorker(name: str | None = None, *, active: bool = True)[source]

Bases: BaseObject, BusSubscriber

Abstract base class for workers in the framework.

A worker connects to a WorkerBus, registers itself in the shared registry, accepts activation/deactivation, and exchanges job requests/responses with other workers. Concrete subclasses (e.g. PipelineWorker) provide the runtime that actually drives the worker’s work.

Overridable lifecycle methods (always call super()):

  • on_activated(args): Called when this worker is activated.

  • on_deactivated(): Called when this worker is deactivated.

  • on_worker_ready(data): Called when another worker is ready to receive messages. For local root workers, fires automatically. For children, fires only on the parent. For remote workers, fires only for workers watched via watch_workers().

  • on_job_request(message): Called when a job request is received.

  • on_job_response(message): Called when a worker sends a response.

  • on_job_update(message): Called when a worker sends a progress update.

  • on_job_update_requested(message): Called when the requester asks for a progress update.

  • on_job_completed(result): Called when all workers in a job group have responded.

  • on_job_error(message): Called when a worker errors and the group is cancelled (cancel_on_error).

  • on_job_stream_start(message): Called when a worker begins streaming.

  • on_job_stream_data(message): Called for each streaming chunk from a worker.

  • on_job_stream_end(message): Called when a worker finishes streaming.

  • on_job_cancelled(message): Called when this worker’s job is cancelled by the requester.

  • on_bus_message(message): Called for bus messages after default lifecycle handling.
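
The "always call super()" rule for the lifecycle methods above can be sketched as follows. BaseWorkerSketch is a hypothetical stand-in that models only two hooks, so the example runs without the framework. A real subclass would inherit from BaseWorker instead, and the state the real base class tracks may differ.

```python
import asyncio


class BaseWorkerSketch:
    """Hypothetical stand-in for BaseWorker; models only two lifecycle hooks."""

    def __init__(self, name):
        self.name = name
        self.active = False
        self.activation_args = None

    async def on_activated(self, args):
        # Base-class bookkeeping that subclasses must not skip.
        self.active = True
        self.activation_args = args

    async def on_deactivated(self):
        self.active = False
        self.activation_args = None


class GreeterWorker(BaseWorkerSketch):
    """Example subclass that reacts to activation and deactivation."""

    def __init__(self, name):
        super().__init__(name)
        self.log = []

    async def on_activated(self, args):
        # Call super() so the base-class state is updated before custom logic.
        await super().on_activated(args)
        self.log.append(f"activated with {args}")

    async def on_deactivated(self):
        await super().on_deactivated()
        self.log.append("deactivated")


async def demo():
    worker = GreeterWorker("greeter")
    await worker.on_activated({"metadata": {"lang": "en"}})
    was_active = worker.active
    await worker.on_deactivated()
    return was_active, worker.active, worker.log
```

If the super() calls were omitted in this sketch, active and activation_args would never change, which is the kind of silent breakage the rule is meant to prevent.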

Event handlers available:

  • on_activated: Worker was activated.

  • on_deactivated: Worker was deactivated.

  • on_worker_ready: Another worker is ready.

  • on_worker_failed: A child worker reported an error.

  • on_job_request: Received a job request.

  • on_job_response: A worker sent a response.

  • on_job_update: A worker sent a progress update.

  • on_job_update_requested: Requester asked for a progress update.

  • on_job_completed: All workers in a job group responded.

  • on_job_error: A worker errored and the group was cancelled.

  • on_job_stream_start: A worker started streaming.

  • on_job_stream_data: A worker sent a streaming chunk.

  • on_job_stream_end: A worker finished streaming.

  • on_job_cancelled: This worker’s job was cancelled.

  • on_bus_message: A bus message was received.

__init__(name: str | None = None, *, active: bool = True)[source]

Initialize the BaseWorker.

Parameters:
  • name – Unique name for this worker. If None, an auto-generated name is used (useful for instances that don’t participate in inter-worker communication).

  • active – Whether the worker starts active. Defaults to True.

property bus: WorkerBus

The bus this worker is attached to.

Raises:

RuntimeError – If accessed before attach() has been called.

property active: bool

Whether this worker is currently active.

property activation_args: dict | None

The arguments from the most recent activation, or None if inactive.

property parent: str | None

The name of the parent worker, or None if this is a root worker.

property registry: WorkerRegistry

The shared worker registry this worker is attached to.

Raises:

RuntimeError – If accessed before attach() has been called.

property started_at: float | None

Unix timestamp when this worker became ready, or None if not yet started.

property bridged: bool

Whether this worker is bridged onto the bus.

Subclasses (e.g. PipelineWorker) override when they auto-wrap their pipeline with bus edge processors.

property children: list[BaseWorker]

The list of child workers added via add_workers().

property active_jobs: dict[str, BusJobRequestMessage]

Active job requests this worker is currently working on, keyed by job_id.

property job_groups: dict[str, JobGroup]

Active job groups launched by this worker, keyed by job_id.

async attach(*, registry: WorkerRegistry, bus: WorkerBus) None[source]

Attach the worker to a runner-provided registry and bus.

Called by the runner (typically from add_workers()) before the worker is run. After this call, registry and bus return the provided instances and the worker is subscribed to the bus, so workers added later are listening before any worker emits its first message.

Parameters:
  • registry – The shared worker registry.

  • bus – The shared worker bus.

async cleanup() None[source]

Clean up the worker and release resources.

Cancels running jobs, unsubscribes from the bus, and signals that the worker has stopped.

async run(params: WorkerParams) None[source]

Run this worker until it finishes.

The default implementation is for bus-only workers: it subscribes to the bus, marks the worker as started, then waits until stop() (or _finished_event()) is signalled. Subclasses with their own runtime (e.g. PipelineWorker) override this method.

Parameters:

params – Configuration parameters for worker execution.
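
The default run() behaviour described above (mark as started, then block until stop() is signalled) resembles the following pattern built on asyncio.Event. This is a sketch under assumptions: BusOnlyWorkerSketch and its attributes are hypothetical, and bus subscription and registration are left out.

```python
import asyncio


class BusOnlyWorkerSketch:
    """Hypothetical model of a bus-only worker's run/stop handshake."""

    def __init__(self):
        self._finished = asyncio.Event()
        self.started = False

    async def run(self):
        # Mark the worker as started, then block until stop() is called.
        self.started = True
        await self._finished.wait()

    async def stop(self):
        # Wake up run() so it can return.
        self._finished.set()


async def demo():
    worker = BusOnlyWorkerSketch()
    task = asyncio.create_task(worker.run())
    await asyncio.sleep(0)  # yield once so run() gets a chance to start
    still_running = not task.done()
    await worker.stop()
    await task
    return worker.started, still_running, task.done()
```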

async start() None[source]

Mark the worker as started, register, and activate if requested.

async stop() None[source]

Clean up and signal that this worker has stopped.

Cancels all running job groups and reports any still-active job requests back to their requesters as CANCELLED, so parents aren’t left waiting.

async end(*, reason: str | None = None) None[source]

Request a graceful end of the session.

Parameters:

reason – Optional human-readable reason for ending.

async cancel(*, reason: str | None = None) None[source]

Request an immediate cancellation of all workers.

Parameters:

reason – Optional human-readable reason. Propagated through the runner to every root worker’s BusCancelWorkerMessage.

async wait() None[source]

Wait for this worker to finish.

async on_activated(args: dict | None) None[source]

Called when this worker is activated.

Override in subclasses to react to activation. Always call super().on_activated(args).

Parameters:

args – Optional arguments from the caller.

async on_deactivated() None[source]

Called when this worker is deactivated.

Override in subclasses to react to deactivation. Always call super().on_deactivated().

async on_worker_ready(data: WorkerReadyData) None[source]

Called when another worker is ready to receive messages.

For local root workers this fires automatically. For remote workers it fires only for workers watched via watch_workers(). For child workers it fires only on the parent that created them.

Parameters:

data – Information about the ready worker.

async on_worker_failed(data: WorkerErrorData) None[source]

Called when a child worker reports an error.

Parameters:

data – Information about the error.

async on_bus_message(message: BusMessage) None[source]

Called for every bus message after built-in lifecycle handling.

Override to handle custom message types. Built-in message types (activation, end, cancel, job) are already dispatched to their respective hooks before this method is called.

Parameters:

message – The BusMessage to process.

async on_job_request(message: BusJobRequestMessage) None[source]

Called when this worker receives a job request.

Override to perform work. Use send_job_update() to report progress and send_job_response() to return results.

async on_job_response(message: BusJobResponseMessage | BusJobResponseUrgentMessage) None[source]

Called when a worker sends a response.

Override to process individual results as they arrive.

async on_job_update(message: BusJobUpdateMessage | BusJobUpdateUrgentMessage) None[source]

Called when a worker sends a progress update.

async on_job_update_requested(message: BusJobUpdateRequestMessage) None[source]

Called when the requester asks for a progress update.

Override to send back a progress update via send_job_update().

async on_job_completed(result: JobGroupResponse) None[source]

Called when all workers in a job group have responded.

async on_job_error(message: BusJobResponseMessage | BusJobResponseUrgentMessage) None[source]

Called when a job group is cancelled due to a worker error.

Fires when a worker responds with ERROR or FAILED status and cancel_on_error is set. The group is cancelled and on_job_completed will not fire. Partial responses from workers that completed before the error are available in the job group’s responses.
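
The interaction between cancel_on_error, on_job_error, and on_job_completed can be modelled with a small state machine. The sketch below is illustrative only: JobGroupSketch and StatusSketch are hypothetical names, and two behaviours are assumptions rather than documented facts: responses arriving after cancellation are ignored, and error responses are stored normally when cancel_on_error is False.

```python
from dataclasses import dataclass, field
from enum import Enum


class StatusSketch(Enum):
    """Hypothetical stand-in for JobStatus; only the member names follow the text."""

    COMPLETED = "completed"
    ERROR = "error"
    FAILED = "failed"


@dataclass
class JobGroupSketch:
    """Hypothetical model of a job group; not the library's JobGroup."""

    worker_names: tuple
    cancel_on_error: bool = True
    responses: dict = field(default_factory=dict)
    cancelled: bool = False
    events: list = field(default_factory=list)  # names of hooks that would fire

    def on_response(self, worker_name, status, response=None):
        if self.cancelled:
            return  # assumption: late responses are ignored after cancellation
        is_error = status in (StatusSketch.ERROR, StatusSketch.FAILED)
        if is_error and self.cancel_on_error:
            # The group is cancelled; on_job_completed will not fire.
            self.cancelled = True
            self.events.append("on_job_error")
            return
        self.responses[worker_name] = response
        if set(self.responses) == set(self.worker_names):
            self.events.append("on_job_completed")
```

In this model, a group of two workers where the first completes and the second errors ends with a single on_job_error event, and the first worker's partial response is still present in responses.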

async on_job_stream_start(message: BusJobStreamStartMessage) None[source]

Called when a worker begins streaming.

async on_job_stream_data(message: BusJobStreamDataMessage) None[source]

Called for each streaming chunk from a worker.

async on_job_stream_end(message: BusJobStreamEndMessage) None[source]

Called when a worker finishes streaming.

async on_job_cancelled(message: BusJobCancelMessage) None[source]

Called when this worker’s job is cancelled by the requester.

Override to clean up resources or stop in-progress work.

async send_bus_message(message: BusMessage) None[source]

Send a message on the bus.

Parameters:

message – The BusMessage to send.

async send_bus_error_message(error: str) None[source]

Report an error on the bus.

Child workers send a local-only message to the parent. Root workers broadcast over the network.

Parameters:

error – Description of the error.

async add_workers(*workers: BaseWorker, watch: bool = True) None[source]

Register one or more child workers under this parent.

Each child’s lifecycle (end, cancel) is automatically managed by this parent worker. By default, the children are also watched so the parent receives on_worker_ready when each one starts; pass watch=False to opt out (you can still call watch_workers() later).

Parameters:
  • *workers – One or more child BaseWorker instances to add.

  • watch – When True (the default), watch each newly added child so on_worker_ready fires once it registers. Workers that were skipped (already parented elsewhere) are not watched.

async activate_worker(worker_name: str, *, args: WorkerActivationArgs | None = None, deactivate_self: bool = False) None[source]

Activate a worker by name.

The target worker’s on_activated hook will be called with the provided arguments.

Parameters:
  • worker_name – The name of the worker to activate.

  • args – Optional WorkerActivationArgs forwarded to the target worker’s on_activated.

  • deactivate_self – Whether to deactivate this worker before activating the target.

async deactivate_worker(worker_name: str) None[source]

Deactivate a worker by name.

The target worker’s on_deactivated hook will be called.

Parameters:

worker_name – The name of the worker to deactivate.

async watch_workers(*worker_names: str) None[source]

Request notification when one or more workers register.

For each name: if the worker is already registered, on_worker_ready fires immediately. Otherwise on_worker_ready fires when the worker eventually registers.

Parameters:

*worker_names – Names of workers to watch for.
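
The two cases described for watch_workers() (fire immediately if the worker is already registered, otherwise fire on registration) can be sketched with a toy registry. RegistrySketch and its methods are hypothetical and do not reflect the real WorkerRegistry API.

```python
import asyncio


class RegistrySketch:
    """Hypothetical registry that tracks ready workers and pending watchers."""

    def __init__(self):
        self._ready = set()
        self._watchers = {}  # worker name -> list of async callbacks

    async def watch(self, name, callback):
        if name in self._ready:
            # Already registered: notify right away.
            await callback(name)
        else:
            # Not registered yet: remember the callback for later.
            self._watchers.setdefault(name, []).append(callback)

    async def register(self, name):
        self._ready.add(name)
        # Notify every watcher that was waiting for this worker.
        for callback in self._watchers.pop(name, []):
            await callback(name)


async def demo():
    seen = []

    async def on_worker_ready(name):
        seen.append(name)

    registry = RegistrySketch()
    await registry.register("stt")                # registered before the watch
    await registry.watch("stt", on_worker_ready)  # fires immediately
    await registry.watch("tts", on_worker_ready)  # deferred until registration
    before = list(seen)
    await registry.register("tts")                # deferred callback fires here
    return before, seen
```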

async request_job(worker_name: str, *, name: str | None = None, payload: dict | None = None, timeout: float | None = None) str[source]

Send a job request to a single worker (fire-and-forget).

Waits for the worker to be ready before sending the request. Does not wait for the job to complete; use callbacks (on_job_response, on_job_completed) or job() for that.

Parameters:
  • worker_name – Name of the worker to send the job to.

  • name – Optional job name for routing to a named @job handler on the worker.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds. If set, the job is automatically cancelled after this duration.

Returns:

The generated job_id.

job(worker_name: str, *, name: str | None = None, payload: dict | None = None, timeout: float | None = None) JobContext[source]

Create a single-worker job context manager.

Waits for the worker to be ready, sends a job request, and waits for the response on exit. Supports async for inside the block to receive intermediate events (updates and streaming data) from the worker while waiting.

On normal completion, the result is available via response. On worker error or timeout, raises JobError.

Parameters:
  • worker_name – Name of the worker to send the job to.

  • name – Optional job name for routing to a named @job handler on the worker.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds.

Returns:

A JobContext to use with async with.

Example:

async with self.job("worker", payload=data) as t:
    # Consume intermediate events while the job is still running.
    async for event in t:
        if event.type == JobEvent.UPDATE:
            print(event.data)

# The final response is available once the block has exited.
print(t.response)

async request_job_group(*worker_names: str, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True) str[source]

Send a job request to multiple workers (fire-and-forget).

Waits for all workers to be ready before sending requests. Does not wait for the job group to complete; use callbacks (on_job_response, on_job_completed) or job_group() for that.

Parameters:
  • *worker_names – Names of the workers to send the job to.

  • name – Optional job name for routing to named @job handlers on the workers.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds. If set, the job is automatically cancelled after this duration.

  • cancel_on_error – Whether to cancel the entire group if a worker responds with an error status. Defaults to True.

Returns:

The generated job_id shared by all workers in the group.

job_group(*worker_names: str, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True) JobGroupContext[source]

Create a job group context manager.

Waits for workers to be ready, sends job requests, and waits for all responses on exit. Supports async for inside the block to receive intermediate events (updates and streaming data) from workers while waiting.

On normal completion, results are available via responses. On worker error (with cancel_on_error=True) or timeout, raises JobGroupError.

Parameters:
  • *worker_names – Names of the workers to send the job to.

  • name – Optional job name for routing to named @job handlers on the workers.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds.

  • cancel_on_error – Whether to cancel the group if a worker errors. Defaults to True.

Returns:

A JobGroupContext to use with async with.

Example:

async with self.job_group("w1", "w2", payload=data) as tg:
    # Consume intermediate events from any worker in the group.
    async for event in tg:
        if event.type == JobGroupEvent.UPDATE:
            print(f"{event.worker_name}: {event.data}")

# All responses are available once the block has exited.
for name, result in tg.responses.items():
    print(name, result)

async cancel_job_group(job_id: str, *, reason: str | None = None) None[source]

Cancel a running job group.

Parameters:
  • job_id – The job identifier to cancel.

  • reason – Optional human-readable reason for cancellation.

async request_job_update(job_id: str, worker_name: str) None[source]

Request a progress update from a worker.

Parameters:
  • job_id – The job identifier.

  • worker_name – The name of the worker to request an update from.

async create_job_group_and_request_job(worker_names: list[str], *, name: str | None = None, payload: dict | None = None, timeout: float | None = None, cancel_on_error: bool = True) JobGroup[source]

Wait for workers to be ready, create a job group, and send requests.

Waits for all workers to be registered as ready, then creates the group and sends a job request to each worker. Does not wait for the group to complete; call group.wait() or use job_group() for that.

Parameters:
  • worker_names – Names of the workers to send the job to.

  • name – Optional job name for routing to named handlers.

  • payload – Optional structured data describing the work.

  • timeout – Optional timeout in seconds. Covers both the ready-wait and job execution.

  • cancel_on_error – Whether to cancel the group if a worker errors. Defaults to True.

Returns:

The created JobGroup.

Raises:

JobGroupError – If workers are not ready within the timeout.
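
A timeout that "covers both the ready-wait and job execution" implies a single shared deadline rather than two independent timeouts. One way to express that idea with plain asyncio is sketched below. run_with_shared_timeout is a hypothetical helper, and it raises asyncio.TimeoutError rather than JobGroupError, so it illustrates the budgeting only, not the library's error handling.

```python
import asyncio
import time


async def run_with_shared_timeout(wait_ready, execute, timeout=None):
    """Run two phases under one deadline (hypothetical helper)."""
    deadline = None if timeout is None else time.monotonic() + timeout

    def remaining():
        # Time left in the shared budget, or None for "no limit".
        if deadline is None:
            return None
        return max(0.0, deadline - time.monotonic())

    # Phase 1: wait for the workers to be ready.
    await asyncio.wait_for(wait_ready(), timeout=remaining())
    # Phase 2: run the job with whatever time is left.
    return await asyncio.wait_for(execute(), timeout=remaining())


async def demo(ready_delay, job_delay, timeout):
    async def wait_ready():
        await asyncio.sleep(ready_delay)

    async def execute():
        await asyncio.sleep(job_delay)
        return "done"

    try:
        return await run_with_shared_timeout(wait_ready, execute, timeout)
    except asyncio.TimeoutError:
        return "timed out"
```

Time spent waiting for readiness is subtracted from what the job itself may use, so a slow start leaves less room for execution.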

async send_job_response(job_id: str, response: dict | None = None, *, status: JobStatus = JobStatus.COMPLETED, urgent: bool = False) None[source]

Send a job response back to the requester.

After sending, the job is removed from the set of active jobs.

Parameters:
  • job_id – The identifier of the job being responded to.

  • response – Optional result data.

  • status – Completion status. Defaults to JobStatus.COMPLETED.

  • urgent – When True, the response is delivered with system priority, preempting queued data messages.

Raises:

RuntimeError – If there is no active job with this job_id.
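
The bookkeeping implied here (a response removes the job from the active set, and sending for an unknown job_id raises RuntimeError) can be modelled in a few lines. ActiveJobsSketch is a hypothetical, synchronous stand-in. The real methods are coroutines and send messages over the bus.

```python
class ActiveJobsSketch:
    """Hypothetical model of a worker's active-job bookkeeping."""

    def __init__(self):
        self.active_jobs = {}  # job_id -> original request
        self.sent = []         # messages that would go out on the bus

    def accept(self, job_id, request):
        # Record an incoming job request as active.
        self.active_jobs[job_id] = request

    def send_job_update(self, job_id, update=None):
        self._require_active(job_id)
        self.sent.append(("update", job_id, update))

    def send_job_response(self, job_id, response=None):
        self._require_active(job_id)
        self.sent.append(("response", job_id, response))
        # Responding finishes the job, so it leaves the active set.
        del self.active_jobs[job_id]

    def _require_active(self, job_id):
        if job_id not in self.active_jobs:
            raise RuntimeError(f"No active job with job_id {job_id!r}")
```

In this model, calling send_job_response() twice for the same job_id raises RuntimeError on the second call, because the first call already removed the job.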

async send_job_update(job_id: str, update: dict | None = None, *, urgent: bool = False) None[source]

Send a progress update to the requester.

Parameters:
  • job_id – The identifier of the job being updated.

  • update – Optional progress data.

  • urgent – When True, the update is delivered with system priority, preempting queued data messages.

Raises:

RuntimeError – If there is no active job with this job_id.
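
The effect of urgent=True, where a message is delivered ahead of queued data messages, can be pictured as a two-level priority queue. The sketch below uses heapq. PriorityOutboxSketch and the SYSTEM/DATA constants are hypothetical and say nothing about how the bus is actually implemented.

```python
import heapq
import itertools

SYSTEM, DATA = 0, 1  # hypothetical priorities; the lower value is delivered first


class PriorityOutboxSketch:
    """Hypothetical outbox where urgent messages jump ahead of data messages."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()  # tie-breaker keeps FIFO order per priority

    def put(self, message, *, urgent=False):
        priority = SYSTEM if urgent else DATA
        heapq.heappush(self._heap, (priority, next(self._order), message))

    def get(self):
        # Pop the highest-priority, oldest message.
        return heapq.heappop(self._heap)[2]

    def drain(self):
        out = []
        while self._heap:
            out.append(self.get())
        return out
```

Queuing two data messages followed by one urgent update and then draining yields the urgent update first, with the data messages in their original order.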

async send_job_stream_start(job_id: str, data: dict | None = None) None[source]

Begin streaming job results back to the requester.

Parameters:
  • job_id – The identifier of the job being streamed.

  • data – Optional metadata about the stream.

Raises:

RuntimeError – If there is no active job with this job_id.

async send_job_stream_data(job_id: str, data: dict | None = None) None[source]

Send a streaming chunk to the requester.

Parameters:
  • job_id – The identifier of the job being streamed.

  • data – The chunk payload.

Raises:

RuntimeError – If there is no active job with this job_id.

async send_job_stream_end(job_id: str, data: dict | None = None) None[source]

End the current stream and mark this worker’s job as complete.

Parameters:
  • job_id – The identifier of the job being streamed.

  • data – Optional final metadata.

Raises:

RuntimeError – If there is no active job with this job_id.