messages

Bus message types for inter-worker communication.

Defines the message hierarchy used by the WorkerBus for pub/sub messaging between workers and the runner.

class pipecat.bus.messages.BusMessage(*, source: str, target: str | None = None)[source]

Bases: object

Base class for messages carried by the WorkerBus.

Bus messages are independent of pipeline Frame`s — if a worker needs to ship a frame between pipelines it wraps it in a `BusFrameMessage. Subclasses choose delivery priority by extending BusDataMessage (normal priority, FIFO) or BusSystemMessage (high priority, delivered ahead of queued data messages).

Parameters:
  • source – Name of the worker or component that sent this message.

  • target – Name of the intended recipient worker, or None for broadcast.

source: str
target: str | None = None
class pipecat.bus.messages.BusLocalMessage[source]

Bases: object

Mixin: message stays on the local bus, never forwarded to remote buses.

class pipecat.bus.messages.BusDataMessage(*, source: str, target: str | None = None)[source]

Bases: BusMessage

Normal-priority bus message.

Delivered in FIFO order on the subscriber’s data queue.

class pipecat.bus.messages.BusSystemMessage(*, source: str, target: str | None = None)[source]

Bases: BusMessage

High-priority bus message.

Delivered ahead of any queued BusDataMessage on the subscriber’s priority queue.

class pipecat.bus.messages.BusFrameMessage(frame: Frame, direction: FrameDirection, bridge: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Wraps a Pipecat Frame for transport over the bus.

Parameters:
  • frame – The Pipecat frame to transport.

  • direction – Direction the frame should travel in the recipient’s pipeline.

  • bridge – Optional bridge name for routing in multi-bridge setups.

frame: Frame
direction: FrameDirection
bridge: str | None = None
class pipecat.bus.messages.BusTTSSpeakMessage(text: str, append_to_context: bool | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Asks a PipelineWorker to speak the given text via its TTS service.

On receipt, the worker queues a TTSSpeakFrame into its pipeline. Pipelines without a TTS service let the frame flow through harmlessly.

Parameters:
  • text – The text to be spoken.

  • append_to_context – Whether the spoken text should also be appended to the conversation context (forwarded to TTSSpeakFrame).

text: str
append_to_context: bool | None = None
class pipecat.bus.messages.BusActivateWorkerMessage(args: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Tells a targeted worker to become active and start processing.

Parameters:

args – Optional activation arguments forwarded to on_activated.

args: dict | None = None
class pipecat.bus.messages.BusDeactivateWorkerMessage(*, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Tells a targeted worker to become inactive and stop processing.

class pipecat.bus.messages.BusEndMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Request a graceful end of the session.

Sent by a worker to the runner, which responds by sending BusEndWorkerMessage to each worker.

Parameters:

reason – Optional human-readable reason for ending.

reason: str | None = None
class pipecat.bus.messages.BusEndWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Tells a targeted worker to end its pipeline gracefully.

Sent by the runner to individual workers during shutdown.

Parameters:

reason – Optional human-readable reason for ending.

reason: str | None = None
class pipecat.bus.messages.BusCancelMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Request a hard cancel of the session.

Sent by a worker to the runner, which responds by sending BusCancelWorkerMessage to each worker.

Parameters:

reason – Optional human-readable reason for the cancellation.

reason: str | None = None
class pipecat.bus.messages.BusCancelWorkerMessage(reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Tells a targeted worker to cancel its pipeline.

Sent by the runner to individual workers during cancellation.

Parameters:

reason – Optional human-readable reason for the cancellation.

reason: str | None = None
class pipecat.bus.messages.BusAddWorkerMessage(worker: BaseWorker, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage, BusLocalMessage

Request to add a worker to the local runner.

Local-only: carries an in-memory worker reference that cannot be serialized over the network.

Parameters:

worker – The worker instance to add.

worker: BaseWorker
class pipecat.bus.messages.BusWorkerRegistryMessage(runner: str, workers: list[WorkerRegistryEntry], *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Snapshot of workers managed by a runner.

Sent by the runner on startup and when new runners connect, so that remote runners can discover each other’s workers.

Parameters:
  • runner – Name of the runner that owns these workers.

  • workers – List of worker entries with their state.

runner: str
workers: list[WorkerRegistryEntry]
class pipecat.bus.messages.BusWorkerReadyMessage(runner: str, parent: str | None = None, active: bool = False, bridged: bool = False, started_at: float | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Announces that a worker is ready.

Sent when any worker (root or child) becomes ready. Carries the worker’s parent name so observers can reconstruct the full hierarchy.

Parameters:
  • runner – Name of the runner managing this worker.

  • parent – Name of the parent worker, or None for root workers.

  • active – Whether the worker started active.

  • bridged – Whether the worker is bridged (receives pipeline frames from the bus).

  • started_at – Unix timestamp when the worker became ready.

runner: str
parent: str | None = None
active: bool = False
bridged: bool = False
started_at: float | None = None
class pipecat.bus.messages.BusWorkerErrorMessage(error: str, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Reports an error from a root worker.

Sent over the network so remote workers can react. For child worker errors, see BusWorkerLocalErrorMessage.

Parameters:

error – Description of the error.

error: str
class pipecat.bus.messages.BusWorkerLocalErrorMessage(error: str, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage, BusLocalMessage

Reports an error from a child worker to its parent.

Local-only: never crosses the network. The parent receives it via on_worker_failed().

Parameters:

error – Description of the error.

error: str
class pipecat.bus.messages.BusJobRequestMessage(job_id: str, job_name: str | None = None, payload: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Requests a worker worker to start work.

Parameters:
  • job_id – Unique identifier for this job.

  • job_name – Optional job name for routing to named @job handlers.

  • payload – Optional structured data describing the work.

job_id: str
job_name: str | None = None
payload: dict | None = None
class pipecat.bus.messages.BusJobResponseMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Response from a worker worker when its job completes.

Parameters:
  • job_id – The job identifier.

  • status – Completion status.

  • response – Optional result data.

job_id: str
status: JobStatus
response: dict | None = None
class pipecat.bus.messages.BusJobResponseUrgentMessage(job_id: str, status: JobStatus, response: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

High-priority job response.

Same semantics as BusJobResponseMessage but delivered with system priority, preempting queued data messages.

Parameters:
  • job_id – The job identifier.

  • status – Completion status.

  • response – Optional result data.

job_id: str
status: JobStatus
response: dict | None = None
class pipecat.bus.messages.BusJobUpdateMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Progress update from a worker worker.

Parameters:
  • job_id – The job identifier.

  • update – Optional progress data.

job_id: str
update: dict | None = None
class pipecat.bus.messages.BusJobUpdateUrgentMessage(job_id: str, update: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

High-priority job progress update.

Same semantics as BusJobUpdateMessage but delivered with system priority, preempting queued data messages.

Parameters:
  • job_id – The job identifier.

  • update – Optional progress data.

job_id: str
update: dict | None = None
class pipecat.bus.messages.BusJobUpdateRequestMessage(job_id: str, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Request a progress update from a worker worker.

Parameters:

job_id – The job identifier.

job_id: str
class pipecat.bus.messages.BusJobCancelMessage(job_id: str, reason: str | None = None, *, source: str, target: str | None = None)[source]

Bases: BusSystemMessage

Cancel a running job.

Parameters:
  • job_id – The job identifier.

  • reason – Optional human-readable reason for cancellation.

job_id: str
reason: str | None = None
class pipecat.bus.messages.BusJobStreamStartMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Signals the start of a streaming job response.

Parameters:
  • job_id – The job identifier.

  • data – Optional metadata (e.g. content type).

job_id: str
data: dict | None = None
class pipecat.bus.messages.BusJobStreamDataMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

A chunk of streaming job data.

Parameters:
  • job_id – The job identifier.

  • data – The chunk payload.

job_id: str
data: dict | None = None
class pipecat.bus.messages.BusJobStreamEndMessage(job_id: str, data: dict | None = None, *, source: str, target: str | None = None)[source]

Bases: BusDataMessage

Signals the end of a streaming job response.

Parameters:
  • job_id – The job identifier.

  • data – Optional final metadata.

job_id: str
data: dict | None = None