bus
Abstract worker bus for inter-worker pub/sub messaging.
Provides the abstract WorkerBus base class. Concrete implementations (e.g. AsyncQueueBus) live in separate modules.
- class pipecat.bus.bus.BusSubscription(subscriber: BusSubscriber, queue: BusMessageQueue = <factory>, data_queue: Queue = <factory>, router_task: Task | None = None, data_task: Task | None = None)[source]
Bases: object
A single subscriber’s state on the bus.
- Parameters:
subscriber – The subscriber receiving messages.
queue – Priority queue for incoming messages.
data_queue – Secondary queue for data messages dispatched by the router worker.
router_task – Task that reads from the priority queue, handles system messages inline, and routes data messages to the data queue.
data_task – Task that processes data messages sequentially from the data queue.
- subscriber: BusSubscriber
- queue: BusMessageQueue
- data_queue: Queue
- router_task: Task | None = None
- data_task: Task | None = None
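The two-task layout described above (a router task draining a priority queue, and a data task draining a secondary queue) can be sketched with plain asyncio. This is a minimal illustration, not pipecat's implementation: the tuple message format, the `SYSTEM`/`DATA` priority constants, and the `router`, `data_worker`, and `demo` names are assumptions made for the example.

```python
import asyncio

# Assumed convention for this sketch: a lower number means higher priority.
SYSTEM, DATA = 0, 1


async def router(priority_queue, data_queue, handled):
    """Handle system messages inline; forward data messages to data_queue."""
    while True:
        priority, _seq, payload = await priority_queue.get()
        if priority == SYSTEM:
            handled.append(("system", payload))
        else:
            await data_queue.put(payload)
        priority_queue.task_done()


async def data_worker(data_queue, handled):
    """Process data messages one at a time, in arrival order."""
    while True:
        payload = await data_queue.get()
        handled.append(("data", payload))
        data_queue.task_done()


async def demo():
    priority_queue = asyncio.PriorityQueue()
    data_queue = asyncio.Queue()
    handled = []

    # The sequence number breaks ties so equal-priority messages stay FIFO.
    incoming = [(DATA, "frame-1"), (SYSTEM, "cancel"), (DATA, "frame-2")]
    for seq, (priority, payload) in enumerate(incoming):
        priority_queue.put_nowait((priority, seq, payload))

    router_task = asyncio.create_task(router(priority_queue, data_queue, handled))
    data_task = asyncio.create_task(data_worker(data_queue, handled))

    # Wait until both queues are fully processed, then stop the tasks.
    await priority_queue.join()
    await data_queue.join()
    router_task.cancel()
    data_task.cancel()
    await asyncio.gather(router_task, data_task, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the "cancel" message is handled before either data frame even though it was enqueued second, because the router never waits on data processing.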
- class pipecat.bus.bus.WorkerBus(**kwargs)[source]
Bases: BaseObject
Abstract base for inter-worker and runner-worker communication.
Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages.
Subclasses implement publish() for the specific transport. send() handles local-only messages automatically. For network buses, override start()/stop() to manage connections and call on_message_received() when messages arrive from the network.
- __init__(**kwargs)[source]
Initialize the WorkerBus.
- Parameters:
**kwargs – Additional arguments passed to BaseObject.
- async subscribe(subscriber: BusSubscriber) -> None[source]
Register a subscriber to receive messages from the bus.
Idempotent: re-subscribing an already-registered subscriber is a no-op.
- Parameters:
subscriber – The BusSubscriber to register.
- async unsubscribe(subscriber: BusSubscriber) -> None[source]
Remove a subscriber and cancel its dispatch tasks.
- Parameters:
subscriber – The BusSubscriber to remove.
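The subscribe/unsubscribe contract (idempotent registration, task cancellation on removal) might look roughly like the following. This is a stand-in written for illustration only: `SketchBus`, its `_subscriptions` dict, and the single `_dispatch` task per subscriber are hypothetical and do not mirror WorkerBus internals. Treating an unknown subscriber as a no-op in `unsubscribe()` is also an assumption of the sketch.

```python
import asyncio


class SketchBus:
    """Hypothetical stand-in; not the real WorkerBus."""

    def __init__(self):
        self._subscriptions = {}  # subscriber -> dispatch task

    async def subscribe(self, subscriber):
        # Idempotent: a second call for the same subscriber changes nothing.
        if subscriber in self._subscriptions:
            return
        self._subscriptions[subscriber] = asyncio.create_task(
            self._dispatch(subscriber)
        )

    async def unsubscribe(self, subscriber):
        task = self._subscriptions.pop(subscriber, None)
        if task is None:
            return  # assumption: unknown subscribers are ignored
        task.cancel()
        # Wait for the cancellation to finish without re-raising it here.
        await asyncio.gather(task, return_exceptions=True)

    async def _dispatch(self, subscriber):
        # Placeholder for a real dispatch loop: park until cancelled.
        await asyncio.Event().wait()


async def demo():
    bus = SketchBus()
    subscriber = object()

    await bus.subscribe(subscriber)
    first_task = bus._subscriptions[subscriber]
    await bus.subscribe(subscriber)  # no-op
    same_task = bus._subscriptions[subscriber] is first_task

    await bus.unsubscribe(subscriber)
    return same_task, first_task.cancelled(), len(bus._subscriptions)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```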
- async send(message: BusMessage) -> None[source]
Send a message through the bus.
Local-only messages are delivered directly to subscribers. All other messages are passed to publish() for transport.
- Parameters:
message – The bus message to send.
- abstractmethod async publish(message: BusMessage) -> None[source]
Publish a message to the transport.
Subclasses implement this for the specific transport. Called by send() after filtering local-only messages.
- Parameters:
message – The bus message to publish.
- on_message_received(message: BusMessage) -> None[source]
Deliver a message to all local subscribers via their priority queues.
Called by bus implementations when a message arrives (either from a local send() or from a network transport).
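The relationship between send(), publish(), and on_message_received() can be illustrated with a self-contained stand-in. Everything here is hypothetical and chosen for the example: `SketchMessage`, its `local_only` flag, `SketchBus`, `InProcessBus`, and `add_queue()` are not pipecat names, and plain FIFO queues stand in for the per-subscriber priority queues. The sketch also assumes that local-only messages reach subscribers through on_message_received(), which the text above implies but does not state outright.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class SketchMessage:
    payload: str
    local_only: bool = False  # hypothetical flag for this sketch


class SketchBus(ABC):
    """Hypothetical stand-in for an abstract bus; not the real WorkerBus."""

    def __init__(self):
        self._queues = []  # one queue per subscriber

    def add_queue(self):
        queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    async def send(self, message):
        # Local-only messages skip the transport entirely.
        if message.local_only:
            self.on_message_received(message)
        else:
            await self.publish(message)

    @abstractmethod
    async def publish(self, message):
        """Hand the message to the transport."""

    def on_message_received(self, message):
        # Fan out: every subscriber gets the message in its own queue.
        for queue in self._queues:
            queue.put_nowait(message)


class InProcessBus(SketchBus):
    """Trivial transport: published messages loop straight back."""

    def __init__(self):
        super().__init__()
        self.published = []

    async def publish(self, message):
        self.published.append(message)
        self.on_message_received(message)


async def demo():
    bus = InProcessBus()
    q1, q2 = bus.add_queue(), bus.add_queue()
    await bus.send(SketchMessage("hello"))
    await bus.send(SketchMessage("local", local_only=True))
    drained = [
        [q.get_nowait().payload for _ in range(q.qsize())] for q in (q1, q2)
    ]
    return [m.payload for m in bus.published], drained


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch only "hello" passes through publish(), while both subscribers receive both messages. A network-backed subclass would instead send the message over its connection in publish() and call on_message_received() from its receive loop.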