bridge_processor

Bus bridge and edge processors for inter-worker frame routing.

Provides:

  • BusBridgeProcessor: a mid-pipeline processor that exchanges frames with other workers through the bus, consuming local frames.

  • _BusEdgeProcessor: a pipeline-edge processor used internally by PipelineWorker when bridged is set. Tees frames between the local pipeline and the bus (frames continue locally and are also forwarded to the bus).

class pipecat.bus.bridge_processor.BusBridgeProcessor(*, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs)[source]

Bases: FrameProcessor, BusSubscriber

Bidirectional mid-pipeline bridge between a Pipecat pipeline and the bus.

Placed in a transport or session worker’s pipeline to exchange frames with other workers via the WorkerBus. Lifecycle and excluded frames pass through locally without crossing the bus.

__init__(*, bus: WorkerBus, worker_name: str, target_task: str | None = None, bridge: str | None = None, exclude_frames: tuple[type[Frame], ...] | None = None, **kwargs)[source]

Initialize the BusBridgeProcessor.

Parameters:
  • bus – The WorkerBus to exchange frames with.

  • worker_name – Name of the owning worker, used as message source.

  • target_task – When set, only exchange frames with this worker.

  • bridge – Optional bridge name for routing. When set, outgoing frames are tagged with this name and only incoming frames with the same bridge name are accepted.

  • exclude_frames – Extra frame types that should never cross the bus (on top of lifecycle frames which are always excluded).

  • **kwargs – Additional arguments passed to FrameProcessor.

async setup(setup: FrameProcessorSetup)[source]

Subscribe to the bus during processor setup.

async cleanup()[source]

Unsubscribe from the bus on cleanup.

async process_frame(frame: Frame, direction: FrameDirection)[source]

Process a frame: send to bus, or pass through locally if excluded.

Parameters:
  • frame – The frame to process.

  • direction – The direction the frame is traveling.

async on_bus_message(message: BusMessage) None[source]

Handle an incoming bus message by pushing its frame into the pipeline.

Parameters:

message – The bus message to handle.