bus

Worker bus package – pub/sub messaging between workers and the runner.

Provides the pub/sub infrastructure that connects workers to each other and to the runner. Key components:

  • WorkerBus – abstract base class defining the send/receive interface.

  • AsyncQueueBus – in-process implementation backed by asyncio.Queue.

  • BusBridgeProcessor – bidirectional mid-pipeline bridge for transport/session workers that exchanges frames with other workers through the bus.

  • BusMessage and its subclasses – the typed message hierarchy used for worker lifecycle events (activation, cancellation, shutdown), job coordination, and frame transport.

class pipecat.bus.WorkerBus(**kwargs)[source]

Bases: BaseObject

Abstract base for inter-worker and runner-worker communication.

Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.

Subclasses implement publish() for the specific transport. send() handles local-only messages automatically. For network buses, override start()/stop() to manage connections and call on_message_received() when messages arrive from the network.

__init__(**kwargs)[source]

Initialize the WorkerBus.

Parameters:

**kwargs – Additional arguments passed to BaseObject.

async start()[source]

Start dispatch tasks for all registered subscribers.

async stop()[source]

Stop all dispatch tasks.

async subscribe(subscriber: BusSubscriber) None[source]

Register a subscriber to receive messages from the bus.

Idempotent: re-subscribing an already-registered subscriber is a no-op.

Parameters:

subscriber – The BusSubscriber to register.

async unsubscribe(subscriber: BusSubscriber) None[source]

Remove a subscriber and cancel its dispatch tasks.

Parameters:

subscriber – The BusSubscriber to remove.

async send(message: BusMessage) None[source]

Send a message through the bus.

Local-only messages are delivered directly to subscribers. All other messages are passed to publish() for transport.

Parameters:

message – The bus message to send.

abstractmethod async publish(message: BusMessage) None[source]

Publish a message to the transport.

Subclasses implement this for the specific transport. Called by send() after filtering local-only messages.

Parameters:

message – The bus message to publish.

on_message_received(message: BusMessage) None[source]

Deliver a message to all local subscribers via their priority queues.

Called by bus implementations when a message arrives (either from a local send() or from a network transport).

class pipecat.bus.AsyncQueueBus(**kwargs)[source]

Bases: WorkerBus

In-process bus that delivers messages via priority queues.

async publish(message: BusMessage) None[source]

Deliver a message to all local subscriber queues.

Parameters:

message – The bus message to deliver.

class pipecat.bus.BusActivateWorkerMessage(args: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Tells a targeted worker to become active and start processing.

Parameters:

args – Optional activation arguments forwarded to on_activated.

args: dict | None = None
class pipecat.bus.BusAddWorkerMessage(worker: BaseWorker, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage, BusLocalMessage

Request to add a worker to the local runner.

Local-only: carries an in-memory worker reference that cannot be serialized over the network.

Parameters:

worker – The worker instance to add.

worker: BaseWorker
class pipecat.bus.BusWorkerErrorMessage(error: str, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Reports an error from a root worker.

Sent over the network so remote workers can react. For child worker errors, see BusWorkerLocalErrorMessage.

Parameters:

error – Description of the error.

error: str
class pipecat.bus.BusWorkerLocalErrorMessage(error: str, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage, BusLocalMessage

Reports an error from a child worker to its parent.

Local-only: never crosses the network. The parent receives it via on_worker_failed().

Parameters:

error – Description of the error.

error: str
class pipecat.bus.WorkerRegistryEntry(name: str, parent: str | None = None, active: bool = False, bridged: bool = False, started_at: float | None = None)[source]

Bases: object

Information about a worker in a registry snapshot.

Parameters:
  • name – The worker’s name.

  • parent – Name of the parent worker, or None for root tasks.

  • active – Whether the worker is currently active.

  • bridged – Whether the worker is bridged.

  • started_at – Unix timestamp when the worker became ready.

name: str
parent: str | None = None
active: bool = False
bridged: bool = False
started_at: float | None = None
class pipecat.bus.BusWorkerReadyMessage(runner: str, parent: str | None = None, active: bool = False, bridged: bool = False, started_at: float | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Announces that a worker is ready.

Sent when any worker (root or child) becomes ready. Carries the worker’s parent name so observers can reconstruct the full hierarchy.

Parameters:
  • runner – Name of the runner managing this worker.

  • parent – Name of the parent worker, or None for root workers.

  • active – Whether the worker started active.

  • bridged – Whether the worker is bridged (receives pipeline frames from the bus).

  • started_at – Unix timestamp when the worker became ready.

runner: str
parent: str | None = None
active: bool = False
bridged: bool = False
started_at: float | None = None
class pipecat.bus.BusWorkerRegistryMessage(runner: str, workers: list[WorkerRegistryEntry], *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Snapshot of workers managed by a runner.

Sent by the runner on startup and when new runners connect, so that remote runners can discover each other’s workers.

Parameters:
  • runner – Name of the runner that owns these workers.

  • workers – List of worker entries with their state.

runner: str
workers: list[WorkerRegistryEntry]
class pipecat.bus.BusBridgeProcessor(*, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs)[source]

Bases: FrameProcessor, BusSubscriber

Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus.

Placed in a transport or session worker’s pipeline to exchange frames with other workers via the WorkerBus. Lifecycle and excluded frames pass through locally without crossing the bus.

__init__(*, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs)[source]

Initialize the BusBridgeProcessor.

Parameters:
  • bus – The WorkerBus to exchange frames with.

  • worker_name – Name of the owning worker, used as message source.

  • target_task – When set, only exchange frames with this worker.

  • bridge – Optional bridge name for routing. When set, outgoing frames are tagged with this name and only incoming frames with the same bridge name are accepted.

  • exclude_frames – Extra frame types that should never cross the bus (on top of lifecycle frames which are always excluded).

  • **kwargs – Additional arguments passed to FrameProcessor.

async setup(setup: FrameProcessorSetup)[source]

Subscribe to the bus during processor setup.

async cleanup()[source]

Unsubscribe from the bus on cleanup.

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process a frame: send to bus, or pass through locally if excluded.

Parameters:
  • frame – The frame to process.

  • direction – The direction the frame is traveling.

async on_bus_message(message: BusMessage) None[source]

Handle an incoming bus message by pushing its frame into the pipeline.

Parameters:

message – The bus message to handle.

class pipecat.bus.BusCancelWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Tells a targeted worker to cancel its pipeline.

Sent by the runner to individual workers during cancellation.

Parameters:

reason – Optional human-readable reason for the cancellation.

reason: str | None = None
class pipecat.bus.BusCancelMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Request a hard cancel of the session.

Sent by a worker to the runner, which responds by sending BusCancelWorkerMessage to each worker.

Parameters:

reason – Optional human-readable reason for the cancellation.

reason: str | None = None
class pipecat.bus.BusDeactivateWorkerMessage(*, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Tells a targeted worker to become inactive and stop processing.

class pipecat.bus.BusEndWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Tells a targeted worker to end its pipeline gracefully.

Sent by the runner to individual workers during shutdown.

Parameters:

reason – Optional human-readable reason for ending.

reason: str | None = None
class pipecat.bus.BusEndMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Request a graceful end of the session.

Sent by a worker to the runner, which responds by sending BusEndWorkerMessage to each worker.

Parameters:

reason – Optional human-readable reason for ending.

reason: str | None = None
class pipecat.bus.BusFrameMessage(frame: Frame, direction: FrameDirection, bridge: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Wraps a Pipecat Frame for transport over the bus.

Parameters:
  • frame – The Pipecat frame to transport.

  • direction – Direction the frame should travel in the recipient’s pipeline.

  • bridge – Optional bridge name for routing in multi-bridge setups.

frame: Frame
direction: FrameDirection
bridge: str | None = None
class pipecat.bus.BusDataMessage(*, source: str, target: str | None = None)[source]

Bases: BusMessage

Normal-priority bus message.

Delivered in FIFO order on the subscriber’s data queue.

class pipecat.bus.BusLocalMessage[source]

Bases: object

Mixin: message stays on the local bus, never forwarded to remote buses.

class pipecat.bus.BusMessage(*, source: str, target: str | None = None)[source]

Bases: object

Base class for messages carried by the WorkerBus.

Bus messages are independent of pipeline Frame`s — if a worker needs to ship a frame between pipelines it wraps it in a `BusFrameMessage. Subclasses choose delivery priority by extending BusDataMessage (normal priority, FIFO) or BusSystemMessage (high priority, delivered ahead of queued data messages).

Parameters:
  • source – Name of the worker or component that sent this message.

  • target – Name of the intended recipient worker, or None for broadcast.

source: str
target: str | None = None
class pipecat.bus.BusSubscriber[source]

Bases: object

Mixin for objects that receive messages from an WorkerBus.

Implementors override on_bus_message() to handle incoming messages. Concrete subscribers must provide a name property (typically inherited from BaseObject).

property name: str

Unique name identifying this subscriber on the bus.

async on_bus_message(message: BusMessage) None[source]

Handle an incoming bus message.

Parameters:

message – The bus message to handle.

class pipecat.bus.BusSystemMessage(*, source: str, target: str | None = None)[source]

Bases: BusMessage

High-priority bus message.

Delivered ahead of any queued BusDataMessage on the subscriber’s priority queue.

class pipecat.bus.BusTTSSpeakMessage(text: str, append_to_context: bool | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Asks a PipelineWorker to speak the given text via its TTS service.

On receipt, the worker queues a TTSSpeakFrame into its pipeline. Pipelines without a TTS service let the frame flow through harmlessly.

Parameters:
  • text – The text to be spoken.

  • append_to_context – Whether the spoken text should also be appended to the conversation context (forwarded to TTSSpeakFrame).

text: str
append_to_context: bool | None = None
class pipecat.bus.BusJobCancelMessage(job_id: str, reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Cancel a running job.

Parameters:
  • job_id – The job identifier.

  • reason – Optional human-readable reason for cancellation.

job_id: str
reason: str | None = None
class pipecat.bus.BusJobRequestMessage(job_id: str, job_name: str | None = None, payload: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Requests a worker worker to start work.

Parameters:
  • job_id – Unique identifier for this job.

  • job_name – Optional job name for routing to named @job handlers.

  • payload – Optional structured data describing the work.

job_id: str
job_name: str | None = None
payload: dict | None = None
class pipecat.bus.BusJobResponseMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Response from a worker worker when its job completes.

Parameters:
  • job_id – The job identifier.

  • status – Completion status.

  • response – Optional result data.

job_id: str
status: JobStatus
response: dict | None = None
class pipecat.bus.BusJobResponseUrgentMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

High-priority job response.

Same semantics as BusJobResponseMessage but delivered with system priority, preempting queued data messages.

Parameters:
  • job_id – The job identifier.

  • status – Completion status.

  • response – Optional result data.

job_id: str
status: JobStatus
response: dict | None = None
class pipecat.bus.BusJobStreamDataMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

A chunk of streaming job data.

Parameters:
  • job_id – The job identifier.

  • data – The chunk payload.

job_id: str
data: dict | None = None
class pipecat.bus.BusJobStreamEndMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Signals the end of a streaming job response.

Parameters:
  • job_id – The job identifier.

  • data – Optional final metadata.

job_id: str
data: dict | None = None
class pipecat.bus.BusJobStreamStartMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Signals the start of a streaming job response.

Parameters:
  • job_id – The job identifier.

  • data – Optional metadata (e.g. content type).

job_id: str
data: dict | None = None
class pipecat.bus.BusJobUpdateMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Progress update from a worker worker.

Parameters:
  • job_id – The job identifier.

  • update – Optional progress data.

job_id: str
update: dict | None = None
class pipecat.bus.BusJobUpdateRequestMessage(job_id: str, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Request a progress update from a worker worker.

Parameters:

job_id – The job identifier.

job_id: str
class pipecat.bus.BusJobUpdateUrgentMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

High-priority job progress update.

Same semantics as BusJobUpdateMessage but delivered with system priority, preempting queued data messages.

Parameters:
  • job_id – The job identifier.

  • update – Optional progress data.

job_id: str
update: dict | None = None

Subpackages

Submodules