# Source code for pipecat.bus.messages

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Bus message types for inter-worker communication.

Defines the message hierarchy used by the `WorkerBus` for pub/sub messaging
between workers and the runner.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection
from pipecat.registry.types import WorkerRegistryEntry

if TYPE_CHECKING:
    from pipecat.pipeline.job_context import JobStatus
    from pipecat.workers.base_worker import BaseWorker

# ---------------------------------------------------------------------------
# Base types and mixins
# ---------------------------------------------------------------------------


[docs] @dataclass(kw_only=True) class BusMessage: """Base class for messages carried by the `WorkerBus`. Bus messages are independent of pipeline `Frame`s — if a worker needs to ship a frame between pipelines it wraps it in a `BusFrameMessage`. Subclasses choose delivery priority by extending :class:`BusDataMessage` (normal priority, FIFO) or :class:`BusSystemMessage` (high priority, delivered ahead of queued data messages). Parameters: source: Name of the worker or component that sent this message. target: Name of the intended recipient worker, or None for broadcast. """ source: str target: str | None = None def __str__(self): return f"{type(self).__name__} (source={self.source}, target={self.target})"
class BusLocalMessage:
    """Marker mixin for messages that must stay on the local bus.

    A message that mixes this class in is never forwarded to remote buses.
    The mixin itself declares no fields or methods.
    """
@dataclass(kw_only=True)
class BusDataMessage(BusMessage):
    """Bus message with normal priority.

    Messages of this kind are delivered in FIFO order on the subscriber's
    data queue.
    """
@dataclass(kw_only=True)
class BusSystemMessage(BusMessage):
    """Bus message with high priority.

    Messages of this kind go on the subscriber's priority queue and are
    delivered ahead of any queued :class:`BusDataMessage`.
    """
# ---------------------------------------------------------------------------
# Frame transport
# ---------------------------------------------------------------------------
@dataclass
class BusFrameMessage(BusDataMessage):
    """Carries a Pipecat `Frame` across the bus.

    Parameters:
        frame: The Pipecat frame being transported.
        direction: Direction the frame should travel in the recipient's
            pipeline.
        bridge: Bridge name used for routing in multi-bridge setups, if any.
    """

    frame: Frame
    direction: FrameDirection
    bridge: str | None = None
# ---------------------------------------------------------------------------
# Pipeline commands
# ---------------------------------------------------------------------------
@dataclass
class BusTTSSpeakMessage(BusDataMessage):
    """Requests that a `PipelineWorker` speak some text through its TTS service.

    When the worker receives this message it queues a `TTSSpeakFrame` into its
    pipeline. In a pipeline that has no TTS service the frame simply flows
    through harmlessly.

    Parameters:
        text: The text to be spoken.
        append_to_context: Whether the spoken text should also be appended to
            the conversation context (forwarded to `TTSSpeakFrame`).
    """

    text: str
    append_to_context: bool | None = None
# ---------------------------------------------------------------------------
# Worker lifecycle
# ---------------------------------------------------------------------------
@dataclass
class BusActivateWorkerMessage(BusDataMessage):
    """Instructs the targeted worker to become active and start processing.

    Parameters:
        args: Optional activation arguments forwarded to `on_activated`.
    """

    args: dict | None = None
@dataclass
class BusDeactivateWorkerMessage(BusDataMessage):
    """Instructs the targeted worker to become inactive and stop processing."""
@dataclass
class BusEndMessage(BusDataMessage):
    """Asks for the session to be ended gracefully.

    A worker sends this to the runner; the runner reacts by sending a
    `BusEndWorkerMessage` to each worker.

    Parameters:
        reason: Optional human-readable reason for ending.
    """

    reason: str | None = None
@dataclass
class BusEndWorkerMessage(BusDataMessage):
    """Instructs the targeted worker to end its pipeline gracefully.

    The runner sends this to individual workers during shutdown.

    Parameters:
        reason: Optional human-readable reason for ending.
    """

    reason: str | None = None
@dataclass
class BusCancelMessage(BusSystemMessage):
    """Asks for a hard cancel of the session.

    A worker sends this to the runner; the runner reacts by sending a
    `BusCancelWorkerMessage` to each worker.

    Parameters:
        reason: Optional human-readable reason for the cancellation.
    """

    reason: str | None = None
@dataclass
class BusCancelWorkerMessage(BusSystemMessage):
    """Instructs the targeted worker to cancel its pipeline.

    The runner sends this to individual workers during cancellation.

    Parameters:
        reason: Optional human-readable reason for the cancellation.
    """

    reason: str | None = None
# ---------------------------------------------------------------------------
# Worker registry and errors
# ---------------------------------------------------------------------------
@dataclass
class BusAddWorkerMessage(BusSystemMessage, BusLocalMessage):
    """Asks the local runner to add a worker.

    This message is local-only because it holds an in-memory worker reference
    that cannot be serialized over the network.

    Parameters:
        worker: The worker instance to add.
    """

    worker: BaseWorker
@dataclass
class BusWorkerRegistryMessage(BusSystemMessage):
    """Snapshot of the workers a runner manages.

    The runner sends it on startup and whenever new runners connect, which
    lets remote runners discover each other's workers.

    Parameters:
        runner: Name of the runner that owns these workers.
        workers: List of worker entries with their state.
    """

    runner: str
    workers: list[WorkerRegistryEntry]
@dataclass
class BusWorkerReadyMessage(BusDataMessage):
    """Announcement that a worker has become ready.

    Emitted for any worker, root or child, once it is ready. The parent name
    travels with the message so observers can reconstruct the full hierarchy.

    Parameters:
        runner: Name of the runner managing this worker.
        parent: Name of the parent worker, or None for root workers.
        active: Whether the worker started active.
        bridged: Whether the worker is bridged (receives pipeline frames from
            the bus).
        started_at: Unix timestamp when the worker became ready.
    """

    runner: str
    parent: str | None = None
    active: bool = False
    bridged: bool = False
    started_at: float | None = None
@dataclass
class BusWorkerErrorMessage(BusSystemMessage):
    """Error report originating from a root worker.

    It is sent over the network so that remote workers can react. Errors from
    child workers are reported with `BusWorkerLocalErrorMessage` instead.

    Parameters:
        error: Description of the error.
    """

    error: str
@dataclass
class BusWorkerLocalErrorMessage(BusSystemMessage, BusLocalMessage):
    """Error report sent from a child worker to its parent.

    This message is local-only and never crosses the network. The parent
    receives it via `on_worker_failed()`.

    Parameters:
        error: Description of the error.
    """

    error: str
# ---------------------------------------------------------------------------
# Jobs
# ---------------------------------------------------------------------------
@dataclass
class BusJobRequestMessage(BusDataMessage):
    """Asks a worker to start a job.

    Parameters:
        job_id: Unique identifier for this job.
        job_name: Optional job name for routing to named `@job` handlers.
        payload: Optional structured data describing the work.
    """

    job_id: str
    job_name: str | None = None
    payload: dict | None = None
@dataclass
class BusJobResponseMessage(BusDataMessage):
    """Reply sent by a worker once its job completes.

    Parameters:
        job_id: The job identifier.
        status: Completion status.
        response: Optional result data.
    """

    job_id: str
    status: JobStatus
    response: dict | None = None
@dataclass
class BusJobResponseUrgentMessage(BusSystemMessage):
    """Job response delivered with high priority.

    It means the same thing as `BusJobResponseMessage`, but it is delivered
    with system priority and therefore preempts queued data messages.

    Parameters:
        job_id: The job identifier.
        status: Completion status.
        response: Optional result data.
    """

    job_id: str
    status: JobStatus
    response: dict | None = None
@dataclass
class BusJobUpdateMessage(BusDataMessage):
    """Progress report sent by a worker for a job.

    Parameters:
        job_id: The job identifier.
        update: Optional progress data.
    """

    job_id: str
    update: dict | None = None
@dataclass
class BusJobUpdateUrgentMessage(BusSystemMessage):
    """Job progress update delivered with high priority.

    It means the same thing as `BusJobUpdateMessage`, but it is delivered with
    system priority and therefore preempts queued data messages.

    Parameters:
        job_id: The job identifier.
        update: Optional progress data.
    """

    job_id: str
    update: dict | None = None
@dataclass
class BusJobUpdateRequestMessage(BusDataMessage):
    """Asks a worker for a progress update on a job.

    Parameters:
        job_id: The job identifier.
    """

    job_id: str
@dataclass
class BusJobCancelMessage(BusSystemMessage):
    """Cancels a job that is currently running.

    Parameters:
        job_id: The job identifier.
        reason: Optional human-readable reason for cancellation.
    """

    job_id: str
    reason: str | None = None
# ---------------------------------------------------------------------------
# Job streaming
# ---------------------------------------------------------------------------
@dataclass
class BusJobStreamStartMessage(BusDataMessage):
    """Marks the beginning of a streaming job response.

    Parameters:
        job_id: The job identifier.
        data: Optional metadata (e.g. content type).
    """

    job_id: str
    data: dict | None = None
@dataclass
class BusJobStreamDataMessage(BusDataMessage):
    """One chunk of streaming job data.

    Parameters:
        job_id: The job identifier.
        data: The chunk payload.
    """

    job_id: str
    data: dict | None = None
@dataclass
class BusJobStreamEndMessage(BusDataMessage):
    """Marks the end of a streaming job response.

    Parameters:
        job_id: The job identifier.
        data: Optional final metadata.
    """

    job_id: str
    data: dict | None = None