bus

Abstract worker bus for inter-worker pub/sub messaging.

Provides the abstract WorkerBus base class. Concrete implementations (e.g. AsyncQueueBus) live in separate modules.

class pipecat.bus.bus.BusSubscription(subscriber: BusSubscriber, queue: BusMessageQueue = <factory>, data_queue: Queue = <factory>, router_task: Task | None = None, data_task: Task | None = None)[source]

Bases: object

A single subscriber’s state on the bus.

Parameters:
  • subscriber – The subscriber receiving messages.

  • queue – Priority queue for incoming messages.

  • data_queue – Secondary queue for data messages dispatched by the router task.

  • router_task – Task that reads from the priority queue, handles system messages inline, and routes data messages to the data queue.

  • data_task – Task that processes data messages sequentially from the data queue.
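The two-stage dispatch described by these fields can be sketched with plain asyncio. This is an illustration under assumptions, not the pipecat implementation: SketchSubscription, the SYSTEM/DATA priority constants, and the tuple layout of queue entries are all hypothetical stand-ins.

```python
import asyncio
import itertools
from dataclasses import dataclass, field

SYSTEM, DATA = 0, 1  # hypothetical priority levels; lower is served first


@dataclass
class SketchSubscription:
    """Stand-in for a per-subscriber record; not the pipecat class."""

    handled: list = field(default_factory=list)
    queue: asyncio.PriorityQueue = field(default_factory=asyncio.PriorityQueue)
    data_queue: asyncio.Queue = field(default_factory=asyncio.Queue)


async def router(sub):
    # Read from the priority queue; handle system messages inline and
    # forward data messages to the secondary queue.
    while True:
        priority, _, payload = await sub.queue.get()
        if priority == SYSTEM:
            sub.handled.append(("system", payload))
        else:
            await sub.data_queue.put(payload)
        sub.queue.task_done()


async def data_worker(sub):
    # Process data messages one at a time, in arrival order.
    while True:
        payload = await sub.data_queue.get()
        sub.handled.append(("data", payload))
        sub.data_queue.task_done()


async def demo():
    sub = SketchSubscription()
    seq = itertools.count()  # tie-breaker keeps FIFO order within a priority
    for priority, payload in [(DATA, "a"), (DATA, "b"), (SYSTEM, "cancel")]:
        sub.queue.put_nowait((priority, next(seq), payload))
    tasks = [
        asyncio.create_task(router(sub)),
        asyncio.create_task(data_worker(sub)),
    ]
    await sub.queue.join()
    await sub.data_queue.join()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return sub.handled
```

Calling asyncio.run(demo()) shows the system message being handled ahead of the two data messages even though it was enqueued last. Splitting the work across two tasks means a slow data handler does not delay the handling of a later system message.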

subscriber: BusSubscriber
queue: BusMessageQueue
data_queue: Queue
router_task: Task | None = None
data_task: Task | None = None
class pipecat.bus.bus.WorkerBus(**kwargs)[source]

Bases: BaseObject

Abstract base for inter-worker and runner-worker communication.

Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.

Subclasses implement publish() for the specific transport. send() handles local-only messages automatically. For network buses, override start()/stop() to manage connections and call on_message_received() when messages arrive from the network.

__init__(**kwargs)[source]

Initialize the WorkerBus.

Parameters:

**kwargs – Additional arguments passed to BaseObject.

async start()[source]

Start dispatch tasks for all registered subscribers.

async stop()[source]

Stop all dispatch tasks.

async subscribe(subscriber: BusSubscriber) → None[source]

Register a subscriber to receive messages from the bus.

Idempotent: re-subscribing an already-registered subscriber is a no-op.

Parameters:

subscriber – The BusSubscriber to register.
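As a rough illustration of the idempotency guarantee, the sketch below keeps one queue per subscriber and ignores repeat registrations. SketchRegistry and its internal dictionary are hypothetical, and tolerating removal of an unknown subscriber is an assumption of this sketch rather than documented behavior.

```python
import asyncio


class SketchRegistry:
    """Hypothetical stand-in for a bus's subscriber bookkeeping."""

    def __init__(self):
        self._subscriptions = {}  # subscriber -> its queue

    async def subscribe(self, subscriber):
        # Re-subscribing is a no-op: keep the existing queue untouched.
        if subscriber in self._subscriptions:
            return
        self._subscriptions[subscriber] = asyncio.Queue()

    async def unsubscribe(self, subscriber):
        # Assumption: removing an unknown subscriber is silently ignored.
        self._subscriptions.pop(subscriber, None)


async def demo():
    registry = SketchRegistry()
    await registry.subscribe("worker-a")
    first_queue = registry._subscriptions["worker-a"]
    await registry.subscribe("worker-a")  # second call changes nothing
    same = registry._subscriptions["worker-a"] is first_queue
    count = len(registry._subscriptions)
    await registry.unsubscribe("worker-a")
    return same, count, len(registry._subscriptions)
```

Here the second subscribe() call leaves the original queue in place, so messages already queued for that subscriber would not be lost.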

async unsubscribe(subscriber: BusSubscriber) → None[source]

Remove a subscriber and cancel its dispatch tasks.

Parameters:

subscriber – The BusSubscriber to remove.

async send(message: BusMessage) → None[source]

Send a message through the bus.

Local-only messages are delivered directly to subscribers. All other messages are passed to publish() for transport.

Parameters:

message – The bus message to send.

abstractmethod async publish(message: BusMessage) → None[source]

Publish a message to the transport.

Subclasses implement this for the specific transport. Called by send() after filtering local-only messages.

Parameters:

message – The bus message to publish.
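The division of labor between send() and publish() might look like the following self-contained sketch. It does not import pipecat: SketchMessage, its local_only flag, SketchBus, and RecordingBus are hypothetical names, and the real marker for local-only messages may differ.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class SketchMessage:
    payload: str
    local_only: bool = False  # hypothetical flag; the real marker may differ


class SketchBus(ABC):
    def __init__(self):
        self.delivered = []  # what local subscribers would see

    async def send(self, message):
        if message.local_only:
            # Local-only messages never reach the transport.
            self.on_message_received(message)
        else:
            await self.publish(message)

    @abstractmethod
    async def publish(self, message):
        """Hand the message to the transport."""

    def on_message_received(self, message):
        self.delivered.append(message.payload)


class RecordingBus(SketchBus):
    """Pretend transport: records outgoing messages, then echoes them back."""

    def __init__(self):
        super().__init__()
        self.outbox = []

    async def publish(self, message):
        self.outbox.append(message.payload)
        self.on_message_received(message)  # simulate arrival from the transport


async def demo():
    bus = RecordingBus()
    await bus.send(SketchMessage("status", local_only=True))
    await bus.send(SketchMessage("audio"))
    return bus.outbox, bus.delivered
```

In this sketch only the "audio" message reaches the transport, while both messages are delivered locally. A subclass only has to supply publish(); the local-only filtering stays in the base class.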

on_message_received(message: BusMessage) → None[source]

Deliver a message to all local subscribers via their priority queues.

Called by bus implementations when a message arrives (either from a local send() or from a network transport).
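The fan-out into per-subscriber queues can be approximated as below. This is a sketch under assumptions: SketchFanOut, the SYSTEM/DATA priorities, and the sequence-number tie-breaker are illustrative and do not come from the library.

```python
import asyncio
import itertools

SYSTEM, DATA = 0, 1  # hypothetical priorities; lower is dequeued first


class SketchFanOut:
    def __init__(self):
        self._queues = {}  # subscriber name -> its own PriorityQueue
        self._seq = itertools.count()

    def add(self, name):
        self._queues[name] = asyncio.PriorityQueue()

    def on_message_received(self, priority, payload):
        # Synchronous fan-out: every subscriber gets its own queue entry.
        entry = (priority, next(self._seq), payload)
        for queue in self._queues.values():
            queue.put_nowait(entry)

    def drain(self, name):
        # Empty one subscriber's queue without touching the others.
        queue = self._queues[name]
        out = []
        while not queue.empty():
            out.append(queue.get_nowait()[2])
        return out


async def demo():
    bus = SketchFanOut()
    bus.add("a")
    bus.add("b")
    bus.on_message_received(DATA, "frame-1")
    bus.on_message_received(SYSTEM, "cancel")
    first = bus.drain("a")
    return first, bus.drain("b")
```

Draining subscriber "a" leaves the queue for "b" intact, and each subscriber sees the system message ahead of the earlier data message.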