bus
Worker bus package – pub/sub messaging between workers and the runner.
Provides the pub/sub infrastructure that connects workers to each other and to the runner. Key components:
WorkerBus – abstract base class defining the send/receive interface.
AsyncQueueBus – in-process implementation backed by
asyncio.Queue.BusBridgeProcessor – bidirectional mid-pipeline bridge for transport/session workers that exchanges frames with other workers through the bus.
BusMessage and its subclasses – the typed message hierarchy used for worker lifecycle events (activation, cancellation, shutdown), job coordination, and frame transport.
- class pipecat.bus.WorkerBus(**kwargs)[source]
Bases:
BaseObjectAbstract base for inter-worker and runner-worker communication.
Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.
Subclasses implement
publish()for the specific transport.send()handles local-only messages automatically. For network buses, overridestart()/stop()to manage connections and callon_message_received()when messages arrive from the network.- __init__(**kwargs)[source]
Initialize the WorkerBus.
- Parameters:
**kwargs – Additional arguments passed to BaseObject.
- async subscribe(subscriber: BusSubscriber) None[source]
Register a subscriber to receive messages from the bus.
Idempotent: re-subscribing an already-registered subscriber is a no-op.
- Parameters:
subscriber – The BusSubscriber to register.
- async unsubscribe(subscriber: BusSubscriber) None[source]
Remove a subscriber and cancel its dispatch tasks.
- Parameters:
subscriber – The BusSubscriber to remove.
- async send(message: BusMessage) None[source]
Send a message through the bus.
Local-only messages are delivered directly to subscribers. All other messages are passed to
publish()for transport.- Parameters:
message – The bus message to send.
- abstractmethod async publish(message: BusMessage) None[source]
Publish a message to the transport.
Subclasses implement this for the specific transport. Called by
send()after filtering local-only messages.- Parameters:
message – The bus message to publish.
- on_message_received(message: BusMessage) None[source]
Deliver a message to all local subscribers via their priority queues.
Called by bus implementations when a message arrives (either from a local
send()or from a network transport).
- class pipecat.bus.AsyncQueueBus(**kwargs)[source]
Bases:
WorkerBusIn-process bus that delivers messages via priority queues.
- async publish(message: BusMessage) None[source]
Deliver a message to all local subscriber queues.
- Parameters:
message – The bus message to deliver.
- class pipecat.bus.BusActivateWorkerMessage(args: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageTells a targeted worker to become active and start processing.
- Parameters:
args – Optional activation arguments forwarded to on_activated.
- args: dict | None = None
- class pipecat.bus.BusAddWorkerMessage(worker: BaseWorker, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessage,BusLocalMessageRequest to add a worker to the local runner.
Local-only: carries an in-memory worker reference that cannot be serialized over the network.
- Parameters:
worker – The worker instance to add.
- worker: BaseWorker
- class pipecat.bus.BusWorkerErrorMessage(error: str, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageReports an error from a root worker.
Sent over the network so remote workers can react. For child worker errors, see BusWorkerLocalErrorMessage.
- Parameters:
error – Description of the error.
- error: str
- class pipecat.bus.BusWorkerLocalErrorMessage(error: str, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessage,BusLocalMessageReports an error from a child worker to its parent.
Local-only: never crosses the network. The parent receives it via on_worker_failed().
- Parameters:
error – Description of the error.
- error: str
- class pipecat.bus.WorkerRegistryEntry(name: str, parent: str | None = None, active: bool = False, bridged: bool = False, started_at: float | None = None)[source]
Bases:
objectInformation about a worker in a registry snapshot.
- Parameters:
name – The worker’s name.
parent – Name of the parent worker, or None for root tasks.
active – Whether the worker is currently active.
bridged – Whether the worker is bridged.
started_at – Unix timestamp when the worker became ready.
- name: str
- parent: str | None = None
- active: bool = False
- bridged: bool = False
- started_at: float | None = None
- class pipecat.bus.BusWorkerReadyMessage(runner: str, parent: str | None = None, active: bool = False, bridged: bool = False, started_at: float | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageAnnounces that a worker is ready.
Sent when any worker (root or child) becomes ready. Carries the worker’s parent name so observers can reconstruct the full hierarchy.
- Parameters:
runner – Name of the runner managing this worker.
parent – Name of the parent worker, or None for root workers.
active – Whether the worker started active.
bridged – Whether the worker is bridged (receives pipeline frames from the bus).
started_at – Unix timestamp when the worker became ready.
- runner: str
- parent: str | None = None
- active: bool = False
- bridged: bool = False
- started_at: float | None = None
- class pipecat.bus.BusWorkerRegistryMessage(runner: str, workers: list[WorkerRegistryEntry], *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageSnapshot of workers managed by a runner.
Sent by the runner on startup and when new runners connect, so that remote runners can discover each other’s workers.
- Parameters:
runner – Name of the runner that owns these workers.
workers – List of worker entries with their state.
- runner: str
- workers: list[WorkerRegistryEntry]
- class pipecat.bus.BusBridgeProcessor(*, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs)[source]
Bases:
FrameProcessor,BusSubscriberBidirectional mid-pipeline bridge between a Pipecat pipeline and the bus.
Placed in a transport or session worker’s pipeline to exchange frames with other workers via the WorkerBus. Lifecycle and excluded frames pass through locally without crossing the bus.
- __init__(*, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs)[source]
Initialize the BusBridgeProcessor.
- Parameters:
bus – The WorkerBus to exchange frames with.
worker_name – Name of the owning worker, used as message source.
target_task – When set, only exchange frames with this worker.
bridge – Optional bridge name for routing. When set, outgoing frames are tagged with this name and only incoming frames with the same bridge name are accepted.
exclude_frames – Extra frame types that should never cross the bus (on top of lifecycle frames which are always excluded).
**kwargs – Additional arguments passed to FrameProcessor.
- async setup(setup: FrameProcessorSetup)[source]
Subscribe to the bus during processor setup.
- async process_frame(frame: Frame, direction: FrameDirection)[source]
Process a frame: send to bus, or pass through locally if excluded.
- Parameters:
frame – The frame to process.
direction – The direction the frame is traveling.
- async on_bus_message(message: BusMessage) None[source]
Handle an incoming bus message by pushing its frame into the pipeline.
- Parameters:
message – The bus message to handle.
- class pipecat.bus.BusCancelWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageTells a targeted worker to cancel its pipeline.
Sent by the runner to individual workers during cancellation.
- Parameters:
reason – Optional human-readable reason for the cancellation.
- reason: str | None = None
- class pipecat.bus.BusCancelMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageRequest a hard cancel of the session.
Sent by a worker to the runner, which responds by sending BusCancelWorkerMessage to each worker.
- Parameters:
reason – Optional human-readable reason for the cancellation.
- reason: str | None = None
- class pipecat.bus.BusDeactivateWorkerMessage(*, source: str, target: str | None = None)[source]
Bases:
BusDataMessageTells a targeted worker to become inactive and stop processing.
- class pipecat.bus.BusEndWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageTells a targeted worker to end its pipeline gracefully.
Sent by the runner to individual workers during shutdown.
- Parameters:
reason – Optional human-readable reason for ending.
- reason: str | None = None
- class pipecat.bus.BusEndMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageRequest a graceful end of the session.
Sent by a worker to the runner, which responds by sending BusEndWorkerMessage to each worker.
- Parameters:
reason – Optional human-readable reason for ending.
- reason: str | None = None
- class pipecat.bus.BusFrameMessage(frame: Frame, direction: FrameDirection, bridge: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageWraps a Pipecat Frame for transport over the bus.
- Parameters:
frame – The Pipecat frame to transport.
direction – Direction the frame should travel in the recipient’s pipeline.
bridge – Optional bridge name for routing in multi-bridge setups.
- direction: FrameDirection
- bridge: str | None = None
- class pipecat.bus.BusDataMessage(*, source: str, target: str | None = None)[source]
Bases:
BusMessageNormal-priority bus message.
Delivered in FIFO order on the subscriber’s data queue.
- class pipecat.bus.BusLocalMessage[source]
Bases:
objectMixin: message stays on the local bus, never forwarded to remote buses.
- class pipecat.bus.BusMessage(*, source: str, target: str | None = None)[source]
Bases:
objectBase class for messages carried by the WorkerBus.
Bus messages are independent of pipeline Frame`s — if a worker needs to ship a frame between pipelines it wraps it in a `BusFrameMessage. Subclasses choose delivery priority by extending
BusDataMessage(normal priority, FIFO) orBusSystemMessage(high priority, delivered ahead of queued data messages).- Parameters:
source – Name of the worker or component that sent this message.
target – Name of the intended recipient worker, or None for broadcast.
- source: str
- target: str | None = None
- class pipecat.bus.BusSubscriber[source]
Bases:
objectMixin for objects that receive messages from an WorkerBus.
Implementors override on_bus_message() to handle incoming messages. Concrete subscribers must provide a
nameproperty (typically inherited fromBaseObject).- property name: str
Unique name identifying this subscriber on the bus.
- async on_bus_message(message: BusMessage) None[source]
Handle an incoming bus message.
- Parameters:
message – The bus message to handle.
- class pipecat.bus.BusSystemMessage(*, source: str, target: str | None = None)[source]
Bases:
BusMessageHigh-priority bus message.
Delivered ahead of any queued
BusDataMessageon the subscriber’s priority queue.
- class pipecat.bus.BusTTSSpeakMessage(text: str, append_to_context: bool | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageAsks a PipelineWorker to speak the given text via its TTS service.
On receipt, the worker queues a TTSSpeakFrame into its pipeline. Pipelines without a TTS service let the frame flow through harmlessly.
- Parameters:
text – The text to be spoken.
append_to_context – Whether the spoken text should also be appended to the conversation context (forwarded to TTSSpeakFrame).
- text: str
- append_to_context: bool | None = None
- class pipecat.bus.BusJobCancelMessage(job_id: str, reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageCancel a running job.
- Parameters:
job_id – The job identifier.
reason – Optional human-readable reason for cancellation.
- job_id: str
- reason: str | None = None
- class pipecat.bus.BusJobRequestMessage(job_id: str, job_name: str | None = None, payload: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageRequests a worker worker to start work.
- Parameters:
job_id – Unique identifier for this job.
job_name – Optional job name for routing to named @job handlers.
payload – Optional structured data describing the work.
- job_id: str
- job_name: str | None = None
- payload: dict | None = None
- class pipecat.bus.BusJobResponseMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageResponse from a worker worker when its job completes.
- Parameters:
job_id – The job identifier.
status – Completion status.
response – Optional result data.
- job_id: str
- response: dict | None = None
- class pipecat.bus.BusJobResponseUrgentMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageHigh-priority job response.
Same semantics as BusJobResponseMessage but delivered with system priority, preempting queued data messages.
- Parameters:
job_id – The job identifier.
status – Completion status.
response – Optional result data.
- job_id: str
- response: dict | None = None
- class pipecat.bus.BusJobStreamDataMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageA chunk of streaming job data.
- Parameters:
job_id – The job identifier.
data – The chunk payload.
- job_id: str
- data: dict | None = None
- class pipecat.bus.BusJobStreamEndMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageSignals the end of a streaming job response.
- Parameters:
job_id – The job identifier.
data – Optional final metadata.
- job_id: str
- data: dict | None = None
- class pipecat.bus.BusJobStreamStartMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageSignals the start of a streaming job response.
- Parameters:
job_id – The job identifier.
data – Optional metadata (e.g. content type).
- job_id: str
- data: dict | None = None
- class pipecat.bus.BusJobUpdateMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageProgress update from a worker worker.
- Parameters:
job_id – The job identifier.
update – Optional progress data.
- job_id: str
- update: dict | None = None
- class pipecat.bus.BusJobUpdateRequestMessage(job_id: str, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageRequest a progress update from a worker worker.
- Parameters:
job_id – The job identifier.
- job_id: str
- class pipecat.bus.BusJobUpdateUrgentMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageHigh-priority job progress update.
Same semantics as BusJobUpdateMessage but delivered with system priority, preempting queued data messages.
- Parameters:
job_id – The job identifier.
update – Optional progress data.
- job_id: str
- update: dict | None = None
Subpackages
- adapters
- local
- network
- serializers
- ui
Submodules
- bridge_processor
- bus
- messages
BusMessageBusLocalMessageBusDataMessageBusSystemMessageBusFrameMessageBusTTSSpeakMessageBusActivateWorkerMessageBusDeactivateWorkerMessageBusEndMessageBusEndWorkerMessageBusCancelMessageBusCancelWorkerMessageBusAddWorkerMessageBusWorkerRegistryMessageBusWorkerReadyMessageBusWorkerErrorMessageBusWorkerLocalErrorMessageBusJobRequestMessageBusJobResponseMessageBusJobResponseUrgentMessageBusJobUpdateMessageBusJobUpdateUrgentMessageBusJobUpdateRequestMessageBusJobCancelMessageBusJobStreamStartMessageBusJobStreamDataMessageBusJobStreamEndMessage
- queue
- subscriber