Source code for pipecat.bus.bus

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Abstract worker bus for inter-worker pub/sub messaging.

Provides the abstract `WorkerBus` base class. Concrete implementations
(e.g. `AsyncQueueBus`) live in separate modules.
"""

import asyncio
from abc import abstractmethod
from dataclasses import dataclass, field
from typing import cast

from pipecat.bus.messages import BusLocalMessage, BusMessage, BusSystemMessage
from pipecat.bus.queue import BusMessageQueue
from pipecat.bus.subscriber import BusSubscriber
from pipecat.utils.base_object import BaseObject


[docs] @dataclass class BusSubscription: """A single subscriber's state on the bus. Parameters: subscriber: The subscriber receiving messages. queue: Priority queue for incoming messages. data_queue: Secondary queue for data messages dispatched by the router worker. router_task: Task that reads from the priority queue, handles system messages inline, and routes data messages to the data queue. data_task: Task that processes data messages sequentially from the data queue. """ subscriber: BusSubscriber queue: BusMessageQueue = field(default_factory=BusMessageQueue, repr=False) data_queue: asyncio.Queue = field(default_factory=asyncio.Queue, repr=False) router_task: asyncio.Task | None = field(default=None, repr=False) data_task: asyncio.Task | None = field(default=None, repr=False)
[docs] class WorkerBus(BaseObject): """Abstract base for inter-worker and runner-worker communication. Provides pub/sub messaging where each subscriber receives messages independently through its own priority queue. System messages (e.g. cancel) are delivered before normal data messages. Subclasses implement ``publish()`` for the specific transport. ``send()`` handles local-only messages automatically. For network buses, override ``start()``/``stop()`` to manage connections and call ``on_message_received()`` when messages arrive from the network. """
[docs] def __init__(self, **kwargs): """Initialize the WorkerBus. Args: **kwargs: Additional arguments passed to `BaseObject`. """ super().__init__(**kwargs) self._subscriptions: dict[str, BusSubscription] = {} self._running = False
[docs] async def start(self): """Start dispatch tasks for all registered subscribers.""" if self._running: return self._running = True for sub in self._subscriptions.values(): self._start_dispatch_task(sub) # Schedule tasks right away. await asyncio.sleep(0)
[docs] async def stop(self): """Stop all dispatch tasks.""" if not self._running: return self._running = False for sub in self._subscriptions.values(): if sub.router_task: await self.cancel_task(sub.router_task) sub.router_task = None if sub.data_task: await self.cancel_task(sub.data_task) sub.data_task = None
[docs] async def subscribe(self, subscriber: BusSubscriber) -> None: """Register a subscriber to receive messages from the bus. Idempotent: re-subscribing an already-registered subscriber is a no-op. Args: subscriber: The `BusSubscriber` to register. """ if subscriber.name in self._subscriptions: return sub = BusSubscription(subscriber=subscriber) if self._running: self._start_dispatch_task(sub) # Schedule worker right away. await asyncio.sleep(0) self._subscriptions[subscriber.name] = sub
[docs] async def unsubscribe(self, subscriber: BusSubscriber) -> None: """Remove a subscriber and cancel its dispatch tasks. Args: subscriber: The `BusSubscriber` to remove. """ sub = self._subscriptions.pop(subscriber.name, None) if sub: if sub.router_task: await self.cancel_task(sub.router_task) if sub.data_task: await self.cancel_task(sub.data_task)
[docs] async def send(self, message: BusMessage) -> None: """Send a message through the bus. Local-only messages are delivered directly to subscribers. All other messages are passed to ``publish()`` for transport. Args: message: The bus message to send. """ if isinstance(message, BusLocalMessage): self.on_message_received(message) return await self.publish(message)
[docs] @abstractmethod async def publish(self, message: BusMessage) -> None: """Publish a message to the transport. Subclasses implement this for the specific transport. Called by ``send()`` after filtering local-only messages. Args: message: The bus message to publish. """ pass
[docs] def on_message_received(self, message: BusMessage) -> None: """Deliver a message to all local subscribers via their priority queues. Called by bus implementations when a message arrives (either from a local ``send()`` or from a network transport). """ for sub in self._subscriptions.values(): sub.queue.put_nowait(message)
def _start_dispatch_task(self, sub: BusSubscription) -> None: """Start the router and data dispatch tasks for a subscriber.""" sub.router_task = self.create_task(self._router_task(sub), f"bus_router_{sub.subscriber}") sub.data_task = self.create_task( self._data_dispatch_task(sub), f"bus_data_{sub.subscriber}" ) async def _router_task(self, sub: BusSubscription): """Route system messages inline, data messages to the data queue.""" try: while True: message = await sub.queue.get() if isinstance(message, BusSystemMessage): await sub.subscriber.on_bus_message(cast(BusMessage, message)) else: sub.data_queue.put_nowait(message) except asyncio.CancelledError: pass async def _data_dispatch_task(self, sub: BusSubscription): """Process data messages sequentially from the data queue.""" try: while True: message = await sub.data_queue.get() await sub.subscriber.on_bus_message(message) except asyncio.CancelledError: pass