Source code for pipecat.bus.bridge_processor

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Bus bridge and edge processors for inter-worker frame routing.

Provides:

- `BusBridgeProcessor`: a mid-pipeline processor that exchanges frames
  with other workers through the bus, consuming local frames.
- `_BusEdgeProcessor`: a pipeline-edge processor used internally by
  `PipelineWorker` when ``bridged`` is set. Tees frames between the local
  pipeline and the bus (frames continue locally and are also forwarded
  to the bus).
"""

from typing import TYPE_CHECKING

from pipecat.bus.bus import WorkerBus
from pipecat.bus.messages import BusFrameMessage, BusMessage
from pipecat.bus.subscriber import BusSubscriber
from pipecat.frames.frames import (
    CancelFrame,
    EndFrame,
    Frame,
    OutputTransportMessageUrgentFrame,
    StartFrame,
    StopFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup

if TYPE_CHECKING:
    from pipecat.workers.base_worker import BaseWorker

_LIFECYCLE_FRAMES = (StartFrame, EndFrame, CancelFrame, StopFrame)
_PASSTHROUGH_FRAMES = (OutputTransportMessageUrgentFrame,)


[docs] class BusBridgeProcessor(FrameProcessor, BusSubscriber): """Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus. Placed in a transport or session worker's pipeline to exchange frames with other workers via the `WorkerBus`. Lifecycle and excluded frames pass through locally without crossing the bus. """
[docs] def __init__( self, *, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs, ): """Initialize the BusBridgeProcessor. Args: bus: The `WorkerBus` to exchange frames with. worker_name: Name of the owning worker, used as message source. target_task: When set, only exchange frames with this worker. bridge: Optional bridge name for routing. When set, outgoing frames are tagged with this name and only incoming frames with the same bridge name are accepted. exclude_frames: Extra frame types that should never cross the bus (on top of lifecycle frames which are always excluded). **kwargs: Additional arguments passed to `FrameProcessor`. """ super().__init__(**kwargs) self._bus = bus self._worker_name = worker_name self._target_task = target_task self._bridge = bridge self._exclude_frames = exclude_frames or ()
[docs] async def setup(self, setup: FrameProcessorSetup): """Subscribe to the bus during processor setup.""" await super().setup(setup) await self._bus.subscribe(self)
[docs] async def cleanup(self): """Unsubscribe from the bus on cleanup.""" await super().cleanup() await self._bus.unsubscribe(self)
[docs] async def process_frame(self, frame: Frame, direction: FrameDirection): """Process a frame: send to bus, or pass through locally if excluded. Args: frame: The frame to process. direction: The direction the frame is traveling. """ await super().process_frame(frame, direction) # Lifecycle frames never cross the bus if isinstance(frame, _LIFECYCLE_FRAMES): await self.push_frame(frame, direction) return # Urgent transport frames pass through directly. They need to # reach the transport even when no child worker is active yet. if isinstance(frame, _PASSTHROUGH_FRAMES): await self.push_frame(frame, direction) return # Excluded frames never cross the bus if self._exclude_frames and isinstance(frame, self._exclude_frames): await self.push_frame(frame, direction) return # Send to bus msg = BusFrameMessage( source=self._worker_name, frame=frame, direction=direction, bridge=self._bridge, ) await self._bus.send(msg)
[docs] async def on_bus_message(self, message: BusMessage) -> None: """Handle an incoming bus message by pushing its frame into the pipeline. Args: message: The bus message to handle. """ if not isinstance(message, BusFrameMessage): return # Skip own frames if message.source == self._worker_name: return # Filter by bridge name if self._bridge and message.bridge != self._bridge: return # If target_task set, only accept from that worker if self._target_task and message.source != self._target_task: return # If message targeted at someone else, skip if message.target and message.target != self._worker_name: return await self.push_frame(message.frame, message.direction)
class _BusEdgeProcessor(FrameProcessor, BusSubscriber): """Pipeline-edge tee between a local pipeline and the bus. Placed by `PipelineWorker` at the source and sink of a bridged pipeline. Frames always continue through the local pipeline; in addition, frames travelling in ``direction`` are forwarded to the bus, and frames received from the bus in the opposite direction are injected into the pipeline. """ def __init__( self, *, worker: "BaseWorker", direction: FrameDirection, bridges: tuple[str, ...] = (), exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs, ): """Initialize the edge processor. Args: worker: The owning worker; the edge reads ``worker.bus`` lazily so the bus only needs to be set (via :meth:`BaseWorker.attach`) by the time the processor is set up. ``worker.name`` is the message source and ``worker.active`` gates inbound frames. direction: Direction this edge captures and forwards to the bus. Inbound frames from the bus travelling in the opposite direction are injected here. bridges: Bridge names this edge accepts. Empty tuple accepts frames from all bridges. exclude_frames: Extra frame types that should never cross the bus (lifecycle frames are always excluded). **kwargs: Additional arguments passed to ``FrameProcessor``. """ super().__init__(**kwargs) self._task = worker self._direction = direction self._bridges = bridges self._exclude_frames = exclude_frames or () async def setup(self, setup: FrameProcessorSetup): """Subscribe to the bus during processor setup.""" await super().setup(setup) await self._task.bus.subscribe(self) async def cleanup(self): """Unsubscribe from the bus on cleanup.""" await super().cleanup() await self._task.bus.unsubscribe(self) async def process_frame(self, frame: Frame, direction: FrameDirection): """Pass the frame through locally and forward matching ones to the bus.""" await super().process_frame(frame, direction) await self.push_frame(frame, direction) if direction != self._direction: return if isinstance(frame, _LIFECYCLE_FRAMES): return if self._exclude_frames and isinstance(frame, self._exclude_frames): return await self._task.bus.send( BusFrameMessage(source=self._task.name, frame=frame, direction=direction) ) async def on_bus_message(self, message: BusMessage) -> None: """Inject incoming bus frames into the pipeline.""" if not isinstance(message, BusFrameMessage): return if message.source == self._task.name: return if message.direction == self._direction: return if not self._task.active: return if message.target and message.target != self._task.name: return if self._bridges and message.bridge not in self._bridges: return await self.push_frame(message.frame, message.direction)