# Source code for pipecat.bus.network.pgmq

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""PGMQ (PostgreSQL Message Queue) worker bus for distributed workers."""

import asyncio
import json

from loguru import logger

from pipecat.bus.bus import WorkerBus
from pipecat.bus.messages import BusMessage
from pipecat.bus.network.pgmq_backends import (
    DirectPgmqBackend,
    PgmqBackend,
)
from pipecat.bus.serializers import JSONMessageSerializer
from pipecat.bus.serializers.base import MessageSerializer

# PGMQueue comes from the optional ``pgmq`` extra. If the module is missing,
# log an installation hint and re-raise as ImportError (chained to the
# original ModuleNotFoundError) so importing this module fails loudly.
try:
    from pgmq.async_queue import PGMQueue
except ModuleNotFoundError as e:  # pragma: no cover - exercised only when extra is missing
    logger.error(f"Exception: {e}")
    logger.error("In order to use PgmqBus, you need to `pip install pipecat-ai[pgmq]`.")
    raise ImportError(f"Missing module: {e}") from e


class PgmqBus(WorkerBus):
    """Worker bus that fans messages out across processes through PGMQ.

    PGMQ (PostgreSQL Message Queue) queues are point-to-point, so pub/sub
    fan-out is layered on top: every :class:`PgmqBus` instance owns one queue
    and a publish is broadcast to the peers on the channel. A reader task
    long-polls this instance's queue and hands each received message to the
    local subscribers.

    ``BusLocalMessage`` messages never touch the network; they are delivered
    directly to local subscribers.

    Two backends are available (see :mod:`pipecat.bus.network.pgmq_backends`):

    - :class:`DirectPgmqBackend` — calls ``pgmq.*`` directly and finds peers
      by queue-name prefix. Appropriate when bus peers trust each other.
    - :class:`IsolatedPgmqBackend` — goes through SECURITY DEFINER Postgres
      wrappers over an asyncpg pool. Appropriate when peers should be isolated
      and the channel name acts as the bus capability.

    Pass either ``pgmq=PGMQueue`` (wrapped in a :class:`DirectPgmqBackend`) or
    ``backend=PgmqBackend`` (any backend). Supplying both, or neither, is an
    error.

    Requires the ``pgmq`` extra. Install with ``pip install pipecat-ai[pgmq]``.

    Example::

        from pgmq.async_queue import PGMQueue

        pgmq = PGMQueue(
            host="...",
            port="5432",
            database="postgres",
            username="postgres",
            password="...",
            pool_size=4,
        )
        await pgmq.init()
        bus = PgmqBus(pgmq=pgmq, channel="pipecat_acme")

    Notes:
        Prefer a session-mode pooler when available. Transaction-mode pooling
        works for direct ``pgmq.*`` calls but is fragile around the long-poll
        inside the SECURITY DEFINER ``bus_subscribe`` wrapper used by
        :class:`IsolatedPgmqBackend`.

        The underlying connection pool must allow at least two concurrent
        connections (one for the reader's long-poll, one for publishes).
    """

    def __init__(
        self,
        *,
        pgmq: PGMQueue | None = None,
        backend: PgmqBackend | None = None,
        serializer: MessageSerializer | None = None,
        channel: str = "pipecat_bus",
        visibility_timeout: int = 30,
        batch_size: int = 10,
        poll_interval_ms: int = 100,
        max_poll_seconds: int = 5,
        **kwargs,
    ):
        """Set up the bus around a PGMQ client or an explicit backend.

        Args:
            pgmq: An initialized ``PGMQueue`` client; it is wrapped in a
                :class:`DirectPgmqBackend`. Cannot be combined with
                ``backend``.
            backend: A ready-made :class:`PgmqBackend` (for example
                :class:`IsolatedPgmqBackend`, or a custom one). Cannot be
                combined with ``pgmq``.
            serializer: :class:`MessageSerializer` used to encode and decode
                messages. A :class:`JSONMessageSerializer` is created when
                omitted.
            channel: Channel name. :class:`DirectPgmqBackend` sanitizes it
                into a queue-name prefix; :class:`IsolatedPgmqBackend` passes
                it to every wrapper call as the bus capability.
            visibility_timeout: Seconds a read message stays invisible before
                it is redelivered. Defaults to 30.
            batch_size: Upper bound on messages fetched per read. Defaults
                to 10.
            poll_interval_ms: Long-poll check interval in milliseconds.
                Defaults to 100. A backend may ignore it if it has no such
                knob.
            max_poll_seconds: Longest time, in seconds, the reader blocks in
                one poll cycle. Defaults to 5.
            **kwargs: Forwarded unchanged to :class:`WorkerBus`.

        Raises:
            ValueError: If both ``pgmq`` and ``backend`` are given, or if
                neither is.
        """
        super().__init__(**kwargs)

        client_given = pgmq is not None
        backend_given = backend is not None
        if client_given and backend_given:
            raise ValueError("PgmqBus accepts pgmq= or backend=, not both")
        if not (client_given or backend_given):
            raise ValueError("PgmqBus requires pgmq= or backend=")

        # An explicit backend is used as-is; a bare client gets wrapped.
        self._backend: PgmqBackend = backend if backend_given else DirectPgmqBackend(pgmq)
        self._serializer = serializer or JSONMessageSerializer()

        # Channel and read tuning, kept verbatim for the reader loop.
        self._channel = channel
        self._visibility_timeout = visibility_timeout
        self._batch_size = batch_size
        self._poll_interval_ms = poll_interval_ms
        self._max_poll_seconds = max_poll_seconds

        # Populated by start(), cleared again by stop().
        self._queue_name: str | None = None
        self._reader_task: asyncio.Task | None = None

    async def start(self):
        """Join the channel through the backend, then launch the reader task."""
        await super().start()

        joined_queue = await self._backend.join(self._channel)
        self._queue_name = joined_queue
        logger.debug(f"{self}: joined channel via backend; queue='{joined_queue}'")

        self._reader_task = self.create_task(self._reader_loop())
        # Yield to the event loop once so the new task gets a chance to run.
        await asyncio.sleep(0)

    async def stop(self):
        """Cancel the reader task and leave the channel.

        A failing backend ``leave`` is logged rather than raised; the stored
        queue name is cleared either way.
        """
        await super().stop()

        if self._reader_task:
            await self.cancel_task(self._reader_task)
            self._reader_task = None

        if not self._queue_name:
            return

        try:
            await self._backend.leave(self._queue_name, channel=self._channel)
        except Exception:
            logger.exception(f"{self}: backend leave failed for queue '{self._queue_name}'")
        self._queue_name = None

    async def publish(self, message: BusMessage) -> None:
        """Send a message to every peer on this channel.

        Peer discovery and fan-out are the backend's job, and so is absorbing
        per-peer failures. Errors while re-encoding the payload or while
        calling the backend are logged here and the message is dropped, as is
        any message published before :meth:`start`.

        Args:
            message: The bus message to broadcast.
        """
        encoded = self._serializer.serialize(message)

        # The backend is handed a decoded JSON value, not the raw bytes.
        try:
            body = json.loads(encoded)
        except Exception:
            logger.exception(f"{self}: failed to encode payload for pgmq")
            return

        if self._queue_name is None:
            logger.warning(f"{self}: publish called before start(); dropping message")
            return

        try:
            await self._backend.publish(self._channel, self._queue_name, body)
        except Exception:
            logger.exception(f"{self}: backend publish failed")

    async def _reader_loop(self) -> None:
        """Read batches from this instance's queue and dispatch them locally.

        Runs until the queue name is cleared (or the task is cancelled). A
        failed read is logged and retried after one second. Every message in
        a batch is decoded and passed to ``on_message_received``; an archive
        call is attempted for each message whether or not that succeeded.
        """
        # start() assigns the queue name before spawning this task; the loop
        # condition also covers the case where stop() clears it mid-loop.
        while self._queue_name is not None:
            try:
                batch = await self._backend.read(
                    self._queue_name,
                    channel=self._channel,
                    vt=self._visibility_timeout,
                    qty=self._batch_size,
                    max_poll_seconds=self._max_poll_seconds,
                    poll_interval_ms=self._poll_interval_ms,
                )
            except Exception:
                logger.exception(f"{self}: backend read failed; backing off")
                await asyncio.sleep(1)
                continue

            if not batch:
                # Nothing arrived in this poll cycle.
                continue

            for envelope in batch:
                try:
                    # The serializer works on bytes, so re-encode the payload.
                    encoded = json.dumps(envelope.message).encode("utf-8")
                    decoded = self._serializer.deserialize(encoded)
                    if decoded:
                        self.on_message_received(decoded)
                except Exception:
                    logger.exception(
                        f"{self}: failed to deserialize message {envelope.msg_id}"
                    )
                finally:
                    # Archive regardless of the outcome above.
                    try:
                        await self._backend.archive(
                            self._queue_name,
                            channel=self._channel,
                            msg_id=envelope.msg_id,
                        )
                    except Exception:
                        logger.exception(
                            f"{self}: failed to archive message {envelope.msg_id}"
                        )