pgmq

PGMQ (PostgreSQL Message Queue) worker bus for distributed workers.

class pipecat.bus.network.pgmq.PgmqBus(*, pgmq: PGMQueue | None = None, backend: PgmqBackend | None = None, serializer: MessageSerializer | None = None, channel: str = 'pipecat_bus', visibility_timeout: int = 30, batch_size: int = 10, poll_interval_ms: int = 100, max_poll_seconds: int = 5, **kwargs)[source]

Bases: WorkerBus

Distributed worker bus backed by PGMQ (PostgreSQL Message Queue).

Pub/sub fan-out is implemented on top of PGMQ’s point-to-point queue semantics by giving each PgmqBus instance its own queue and broadcasting on publish. A reader long-polls the instance’s queue and dispatches received messages to local subscribers.

BusLocalMessage messages bypass the network entirely and are delivered directly to local subscribers.

Two backends are supported (see pipecat.bus.network.pgmq_backends):

  • DirectPgmqBackend — calls pgmq.* directly and discovers peers by queue-name prefix. Suitable when bus peers trust each other.

  • IsolatedPgmqBackend — calls SECURITY DEFINER Postgres wrappers over an asyncpg pool. Suitable when peers should be isolated and the channel name is the bus capability.

Construct with either pgmq=PGMQueue (uses DirectPgmqBackend) or backend=PgmqBackend (any backend). The two are mutually exclusive.

Requires the pgmq extra. Install with pip install pipecat-ai[pgmq].

Example:

from pgmq.async_queue import PGMQueue

pgmq = PGMQueue(
    host="...",
    port="5432",
    database="postgres",
    username="postgres",
    password="...",
    pool_size=4,
)
await pgmq.init()
bus = PgmqBus(pgmq=pgmq, channel="pipecat_acme")

Notes

Prefer a session-mode pooler when available. Transaction-mode pooling works for direct pgmq.* calls but is fragile around the long-poll inside the SECURITY DEFINER bus_subscribe wrapper used by IsolatedPgmqBackend. The underlying connection pool must allow at least two concurrent connections (one for the reader’s long-poll, one for publishes).

__init__(*, pgmq: PGMQueue | None = None, backend: PgmqBackend | None = None, serializer: MessageSerializer | None = None, channel: str = 'pipecat_bus', visibility_timeout: int = 30, batch_size: int = 10, poll_interval_ms: int = 100, max_poll_seconds: int = 5, **kwargs)[source]

Initialize the PgmqBus.

Parameters:
  • pgmq – An initialized PGMQueue client. Selects DirectPgmqBackend. Mutually exclusive with backend.

  • backend – A PgmqBackend instance (e.g. IsolatedPgmqBackend, or a custom backend). Mutually exclusive with pgmq.

  • serializer – The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.

  • channel – Channel name. With DirectPgmqBackend this is sanitized into a queue-name prefix. With IsolatedPgmqBackend it is the bus capability passed to every wrapper call.

  • visibility_timeout – Seconds a read message stays invisible before redelivery. Defaults to 30.

  • batch_size – Maximum messages to fetch per read. Defaults to 10.

  • poll_interval_ms – Long-poll check interval in milliseconds. Defaults to 100. (Backend may ignore if it doesn’t expose this knob.)

  • max_poll_seconds – Maximum seconds the reader blocks per poll cycle. Defaults to 5.

  • **kwargs – Additional arguments passed to WorkerBus.

async start()[source]

Join the channel via the backend and start the reader task.

async stop()[source]

Stop the reader task and leave the channel.

async publish(message: BusMessage) None[source]

Broadcast a message to every peer on this channel.

The backend handles peer discovery and fan-out; per-peer failures are the backend’s responsibility to absorb so the publish does not raise.