messages
Bus message types for inter-worker communication.
Defines the message hierarchy used by the WorkerBus for pub/sub messaging between workers and the runner.
- class pipecat.bus.messages.BusMessage(*, source: str, target: str | None = None)[source]
Bases:
objectBase class for messages carried by the WorkerBus.
Bus messages are independent of pipeline Frame`s — if a worker needs to ship a frame between pipelines it wraps it in a `BusFrameMessage. Subclasses choose delivery priority by extending
BusDataMessage(normal priority, FIFO) orBusSystemMessage(high priority, delivered ahead of queued data messages).- Parameters:
source – Name of the worker or component that sent this message.
target – Name of the intended recipient worker, or None for broadcast.
- source: str
- target: str | None = None
- class pipecat.bus.messages.BusLocalMessage[source]
Bases:
objectMixin: message stays on the local bus, never forwarded to remote buses.
- class pipecat.bus.messages.BusDataMessage(*, source: str, target: str | None = None)[source]
Bases:
BusMessageNormal-priority bus message.
Delivered in FIFO order on the subscriber’s data queue.
- class pipecat.bus.messages.BusSystemMessage(*, source: str, target: str | None = None)[source]
Bases:
BusMessageHigh-priority bus message.
Delivered ahead of any queued
BusDataMessageon the subscriber’s priority queue.
- class pipecat.bus.messages.BusFrameMessage(frame: Frame, direction: FrameDirection, bridge: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageWraps a Pipecat Frame for transport over the bus.
- Parameters:
frame – The Pipecat frame to transport.
direction – Direction the frame should travel in the recipient’s pipeline.
bridge – Optional bridge name for routing in multi-bridge setups.
- direction: FrameDirection
- bridge: str | None = None
- class pipecat.bus.messages.BusTTSSpeakMessage(text: str, append_to_context: bool | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageAsks a PipelineWorker to speak the given text via its TTS service.
On receipt, the worker queues a TTSSpeakFrame into its pipeline. Pipelines without a TTS service let the frame flow through harmlessly.
- Parameters:
text – The text to be spoken.
append_to_context – Whether the spoken text should also be appended to the conversation context (forwarded to TTSSpeakFrame).
- text: str
- append_to_context: bool | None = None
- class pipecat.bus.messages.BusActivateWorkerMessage(args: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageTells a targeted worker to become active and start processing.
- Parameters:
args – Optional activation arguments forwarded to on_activated.
- args: dict | None = None
- class pipecat.bus.messages.BusDeactivateWorkerMessage(*, source: str, target: str | None = None)[source]
Bases:
BusDataMessageTells a targeted worker to become inactive and stop processing.
- class pipecat.bus.messages.BusEndMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageRequest a graceful end of the session.
Sent by a worker to the runner, which responds by sending BusEndWorkerMessage to each worker.
- Parameters:
reason – Optional human-readable reason for ending.
- reason: str | None = None
- class pipecat.bus.messages.BusEndWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageTells a targeted worker to end its pipeline gracefully.
Sent by the runner to individual workers during shutdown.
- Parameters:
reason – Optional human-readable reason for ending.
- reason: str | None = None
- class pipecat.bus.messages.BusCancelMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageRequest a hard cancel of the session.
Sent by a worker to the runner, which responds by sending BusCancelWorkerMessage to each worker.
- Parameters:
reason – Optional human-readable reason for the cancellation.
- reason: str | None = None
- class pipecat.bus.messages.BusCancelWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageTells a targeted worker to cancel its pipeline.
Sent by the runner to individual workers during cancellation.
- Parameters:
reason – Optional human-readable reason for the cancellation.
- reason: str | None = None
- class pipecat.bus.messages.BusAddWorkerMessage(worker: BaseWorker, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessage,BusLocalMessageRequest to add a worker to the local runner.
Local-only: carries an in-memory worker reference that cannot be serialized over the network.
- Parameters:
worker – The worker instance to add.
- worker: BaseWorker
- class pipecat.bus.messages.BusWorkerRegistryMessage(runner: str, workers: list[WorkerRegistryEntry], *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageSnapshot of workers managed by a runner.
Sent by the runner on startup and when new runners connect, so that remote runners can discover each other’s workers.
- Parameters:
runner – Name of the runner that owns these workers.
workers – List of worker entries with their state.
- runner: str
- workers: list[WorkerRegistryEntry]
- class pipecat.bus.messages.BusWorkerReadyMessage(runner: str, parent: str | None = None, active: bool = False, bridged: bool = False, started_at: float | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageAnnounces that a worker is ready.
Sent when any worker (root or child) becomes ready. Carries the worker’s parent name so observers can reconstruct the full hierarchy.
- Parameters:
runner – Name of the runner managing this worker.
parent – Name of the parent worker, or None for root workers.
active – Whether the worker started active.
bridged – Whether the worker is bridged (receives pipeline frames from the bus).
started_at – Unix timestamp when the worker became ready.
- runner: str
- parent: str | None = None
- active: bool = False
- bridged: bool = False
- started_at: float | None = None
- class pipecat.bus.messages.BusWorkerErrorMessage(error: str, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageReports an error from a root worker.
Sent over the network so remote workers can react. For child worker errors, see BusWorkerLocalErrorMessage.
- Parameters:
error – Description of the error.
- error: str
- class pipecat.bus.messages.BusWorkerLocalErrorMessage(error: str, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessage,BusLocalMessageReports an error from a child worker to its parent.
Local-only: never crosses the network. The parent receives it via on_worker_failed().
- Parameters:
error – Description of the error.
- error: str
- class pipecat.bus.messages.BusJobRequestMessage(job_id: str, job_name: str | None = None, payload: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageRequests a worker worker to start work.
- Parameters:
job_id – Unique identifier for this job.
job_name – Optional job name for routing to named @job handlers.
payload – Optional structured data describing the work.
- job_id: str
- job_name: str | None = None
- payload: dict | None = None
- class pipecat.bus.messages.BusJobResponseMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageResponse from a worker worker when its job completes.
- Parameters:
job_id – The job identifier.
status – Completion status.
response – Optional result data.
- job_id: str
- response: dict | None = None
- class pipecat.bus.messages.BusJobResponseUrgentMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageHigh-priority job response.
Same semantics as BusJobResponseMessage but delivered with system priority, preempting queued data messages.
- Parameters:
job_id – The job identifier.
status – Completion status.
response – Optional result data.
- job_id: str
- response: dict | None = None
- class pipecat.bus.messages.BusJobUpdateMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageProgress update from a worker worker.
- Parameters:
job_id – The job identifier.
update – Optional progress data.
- job_id: str
- update: dict | None = None
- class pipecat.bus.messages.BusJobUpdateUrgentMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageHigh-priority job progress update.
Same semantics as BusJobUpdateMessage but delivered with system priority, preempting queued data messages.
- Parameters:
job_id – The job identifier.
update – Optional progress data.
- job_id: str
- update: dict | None = None
- class pipecat.bus.messages.BusJobUpdateRequestMessage(job_id: str, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageRequest a progress update from a worker worker.
- Parameters:
job_id – The job identifier.
- job_id: str
- class pipecat.bus.messages.BusJobCancelMessage(job_id: str, reason: str | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusSystemMessageCancel a running job.
- Parameters:
job_id – The job identifier.
reason – Optional human-readable reason for cancellation.
- job_id: str
- reason: str | None = None
- class pipecat.bus.messages.BusJobStreamStartMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageSignals the start of a streaming job response.
- Parameters:
job_id – The job identifier.
data – Optional metadata (e.g. content type).
- job_id: str
- data: dict | None = None
- class pipecat.bus.messages.BusJobStreamDataMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageA chunk of streaming job data.
- Parameters:
job_id – The job identifier.
data – The chunk payload.
- job_id: str
- data: dict | None = None
- class pipecat.bus.messages.BusJobStreamEndMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]
Bases:
BusDataMessageSignals the end of a streaming job response.
- Parameters:
job_id – The job identifier.
data – Optional final metadata.
- job_id: str
- data: dict | None = None