pgmq
PGMQ (PostgreSQL Message Queue) worker bus for distributed workers.
- class pipecat.bus.network.pgmq.PgmqBus(*, pgmq: PGMQueue | None = None, backend: PgmqBackend | None = None, serializer: MessageSerializer | None = None, channel: str = 'pipecat_bus', visibility_timeout: int = 30, batch_size: int = 10, poll_interval_ms: int = 100, max_poll_seconds: int = 5, **kwargs)
Bases: WorkerBus
Distributed worker bus backed by PGMQ (PostgreSQL Message Queue).
Pub/sub fan-out is implemented on top of PGMQ’s point-to-point queue semantics by giving each PgmqBus instance its own queue and broadcasting on publish. A reader long-polls the instance’s queue and dispatches received messages to local subscribers. BusLocalMessage messages bypass the network entirely and are delivered directly to local subscribers.
Two backends are supported (see pipecat.bus.network.pgmq_backends):
- DirectPgmqBackend: calls pgmq.* directly and discovers peers by queue-name prefix. Suitable when bus peers trust each other.
- IsolatedPgmqBackend: calls SECURITY DEFINER Postgres wrappers over an asyncpg pool. Suitable when peers should be isolated and the channel name is the bus capability.
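The fan-out scheme described above can be illustrated with a minimal in-memory sketch. It does not use PGMQ or pipecat. InMemoryQueues and FanOutBus are hypothetical names introduced only for this illustration, and the prefix-based peer lookup is an assumption loosely modeled on the DirectPgmqBackend description. In this sketch the publisher's own queue also receives a copy. Whether the real bus does the same is not stated here.

```python
import asyncio
from collections import defaultdict


class InMemoryQueues:
    """Hypothetical stand-in for PGMQ: a set of named point-to-point queues."""

    def __init__(self) -> None:
        self._queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    def create(self, name: str) -> None:
        self._queues[name]  # touching the key creates the queue

    def list_queues(self, prefix: str) -> list[str]:
        return sorted(n for n in self._queues if n.startswith(prefix))

    async def send(self, name: str, payload: str) -> None:
        await self._queues[name].put(payload)

    async def read(self, name: str) -> str:
        return await self._queues[name].get()


class FanOutBus:
    """Hypothetical bus: one queue per instance, publish copies to every peer queue."""

    def __init__(self, queues: InMemoryQueues, channel: str, instance_id: str) -> None:
        self._queues = queues
        self._prefix = f"{channel}_"
        self.queue_name = f"{self._prefix}{instance_id}"
        queues.create(self.queue_name)

    async def publish(self, payload: str) -> None:
        # Peers are discovered by queue-name prefix (assumption for this sketch).
        for name in self._queues.list_queues(self._prefix):
            await self._queues.send(name, payload)

    async def read_once(self) -> str:
        # Each instance only ever reads from its own queue.
        return await self._queues.read(self.queue_name)


async def demo() -> list[str]:
    queues = InMemoryQueues()
    a = FanOutBus(queues, "pipecat_acme", "a")
    b = FanOutBus(queues, "pipecat_acme", "b")
    await a.publish("hello")
    return [await a.read_once(), await b.read_once()]


result = asyncio.run(demo())
```

Because every instance reads only its own queue, one publish turns into one point-to-point send per peer, which is how broadcast semantics are obtained from a queue that normally delivers each message to a single consumer.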
Construct with either pgmq=PGMQueue (uses DirectPgmqBackend) or backend=PgmqBackend (any backend). The two are mutually exclusive.
Requires the pgmq extra. Install with pip install pipecat-ai[pgmq].
Example:
    from pgmq.async_queue import PGMQueue
    from pipecat.bus.network.pgmq import PgmqBus

    # Run inside an async function: init() is awaited.
    pgmq = PGMQueue(
        host="...",
        port="5432",
        database="postgres",
        username="postgres",
        password="...",
        pool_size=4,
    )
    await pgmq.init()
    bus = PgmqBus(pgmq=pgmq, channel="pipecat_acme")
Notes
Prefer a session-mode pooler when available. Transaction-mode pooling works for direct pgmq.* calls but is fragile around the long-poll inside the SECURITY DEFINER bus_subscribe wrapper used by IsolatedPgmqBackend. The underlying connection pool must allow at least two concurrent connections (one for the reader’s long-poll, one for publishes).
- __init__(*, pgmq: PGMQueue | None = None, backend: PgmqBackend | None = None, serializer: MessageSerializer | None = None, channel: str = 'pipecat_bus', visibility_timeout: int = 30, batch_size: int = 10, poll_interval_ms: int = 100, max_poll_seconds: int = 5, **kwargs)
Initialize the PgmqBus.
- Parameters:
pgmq – An initialized PGMQueue client. Selects DirectPgmqBackend. Mutually exclusive with backend.
backend – A PgmqBackend instance (e.g. IsolatedPgmqBackend, or a custom backend). Mutually exclusive with pgmq.
serializer – The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.
channel – Channel name. With DirectPgmqBackend this is sanitized into a queue-name prefix. With IsolatedPgmqBackend it is the bus capability passed to every wrapper call.
visibility_timeout – Seconds a read message stays invisible before redelivery. Defaults to 30.
batch_size – Maximum messages to fetch per read. Defaults to 10.
poll_interval_ms – Long-poll check interval in milliseconds. Defaults to 100. (A backend may ignore this if it does not expose the knob.)
max_poll_seconds – Maximum seconds the reader blocks per poll cycle. Defaults to 5.
**kwargs – Additional arguments passed to WorkerBus.
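To make the visibility_timeout and batch_size semantics concrete, the following is a rough in-memory sketch, not PGMQ itself. VisibilityQueue is a hypothetical class, and time is passed in explicitly as now so the behavior is deterministic. It assumes a read message is hidden for visibility_timeout seconds and reappears if it has not been deleted by then.

```python
import itertools


class VisibilityQueue:
    """Hypothetical in-memory queue with a visibility timeout and a manual clock."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        # msg_id -> (payload, time at which the message becomes visible again)
        self._messages: dict[int, tuple[str, float]] = {}

    def send(self, payload: str) -> int:
        msg_id = next(self._ids)
        self._messages[msg_id] = (payload, 0.0)
        return msg_id

    def read(
        self, now: float, visibility_timeout: float, batch_size: int
    ) -> list[tuple[int, str]]:
        batch: list[tuple[int, str]] = []
        for msg_id, (payload, visible_at) in sorted(self._messages.items()):
            if len(batch) == batch_size:
                break
            if visible_at <= now:
                # Hide the message until the timeout elapses.
                self._messages[msg_id] = (payload, now + visibility_timeout)
                batch.append((msg_id, payload))
        return batch

    def delete(self, msg_id: int) -> None:
        # Deleting acknowledges the message so it is never redelivered.
        self._messages.pop(msg_id, None)


q = VisibilityQueue()
for payload in ["m1", "m2", "m3"]:
    q.send(payload)

first = q.read(now=0, visibility_timeout=30, batch_size=2)
q.delete(1)  # acknowledge m1 only; m2 stays hidden until t=30
second = q.read(now=10, visibility_timeout=30, batch_size=2)
third = q.read(now=30, visibility_timeout=30, batch_size=2)
```

In this run the first read returns m1 and m2 (capped by batch_size), the second returns only m3 because m2 is still invisible, and the third redelivers m2 because it was never deleted.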
- async publish(message: BusMessage) → None
Broadcast a message to every peer on this channel.
The backend handles peer discovery and fan-out. It is also responsible for absorbing per-peer failures, so publish does not raise when delivery to an individual peer fails.
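One way a backend could absorb per-peer failures is sketched below. This is an illustration under stated assumptions, not the actual pipecat backend code: send_to_peer and publish_to_peers are hypothetical helpers, and the peer named "down" is hard-coded to fail so the behavior can be observed.

```python
import asyncio
import logging

logger = logging.getLogger("fanout_sketch")


async def send_to_peer(peer: str, payload: str) -> None:
    """Hypothetical per-peer send; the peer named 'down' always fails."""
    if peer == "down":
        raise ConnectionError(f"cannot reach {peer}")


async def publish_to_peers(peers: list[str], payload: str) -> list[str]:
    """Send to every peer, logging failures instead of raising."""
    results = await asyncio.gather(
        *(send_to_peer(peer, payload) for peer in peers),
        return_exceptions=True,  # collect exceptions rather than propagating them
    )
    delivered: list[str] = []
    for peer, result in zip(peers, results):
        if isinstance(result, BaseException):
            logger.warning("publish to %s failed: %s", peer, result)
        else:
            delivered.append(peer)
    return delivered


delivered = asyncio.run(publish_to_peers(["a", "down", "b"], "hello"))
```

With return_exceptions=True, a failure for one peer neither cancels the other sends nor reaches the caller, which matches the contract that publish itself does not raise.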