pgmq_backends
Backends for pipecat.bus.network.pgmq.PgmqBus.
A backend owns the wire-side of bus operations: allocating a queue when a
bus instance joins a channel, fanning a published message out to channel
peers, long-polling for incoming messages, archiving them, and dropping
the queue when the bus stops. PgmqBus is the orchestrator on top
of this surface.
Two backends ship in this module:
- DirectPgmqBackend calls pgmq.async_queue.PGMQueue directly. Peer discovery is via list_queues() filtered by a channel prefix. The channel name is encoded in the queue name and is therefore enumerable by any role that can read pg_class. Choose this backend when bus peers trust each other (single-tenant deployments, internal services).
- IsolatedPgmqBackend calls a small set of SECURITY DEFINER Postgres wrappers (public.bus_join, bus_publish, bus_subscribe, bus_archive, bus_leave) over an asyncpg pool. Queue names are server-allocated opaque UUIDs and a server-side peer registry replaces list_queues discovery. Choose this backend when bus peers should be isolated from each other and the channel name itself is the bus capability.
- class pipecat.bus.network.pgmq_backends.BackendMessage(msg_id: int, message: dict)
Bases: object
A message returned by a backend’s read call.
Backends normalize whatever shape they get off the wire into this minimal record so the bus orchestrator can stay backend-agnostic.
- msg_id: int
- message: dict
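The normalization step described above can be sketched roughly as follows. This is an illustration only: the `BackendMessage` dataclass is a local stand-in that mirrors the two documented fields, and `normalize_row` is a hypothetical helper, not a function from the module.

```python
import json
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass
class BackendMessage:
    """Local stand-in mirroring the documented fields (msg_id, message)."""
    msg_id: int
    message: dict


def normalize_row(row: Mapping[str, Any]) -> BackendMessage:
    """Hypothetical helper: coerce one wire-level row into a BackendMessage."""
    body = row["message"]
    # Some drivers return jsonb as text; decode it so callers always see a dict.
    if isinstance(body, (str, bytes)):
        body = json.loads(body)
    return BackendMessage(msg_id=int(row["msg_id"]), message=body)


# Two rows in different wire shapes end up as the same kind of record.
rows = [
    {"msg_id": 1, "message": {"type": "frame", "seq": 1}},
    {"msg_id": "2", "message": '{"type": "frame", "seq": 2}'},
]
messages = [normalize_row(r) for r in rows]
```

Whatever the backend receives, the orchestrator only ever sees an integer `msg_id` and a `dict` body.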
- class pipecat.bus.network.pgmq_backends.DirectPgmqBackend(pgmq: PGMQueue)
Bases: object
Backend that calls pgmq.async_queue.PGMQueue directly.
Queue names are constructed client-side as {safe_channel}_{uuid12}.
Peer discovery uses pgmq.list_queues() filtered by channel prefix, cached for _PEER_LIST_TTL_S seconds.
Per-peer send failures are caught; the cache is invalidated and the fanout continues. The publish does not raise.
The provided PGMQueue must already be initialized (await pgmq.init()). The backend does not own the client’s lifetime.
Use this when bus peers trust each other. The channel name appears in queue names visible to any role that can read pg_class, so channels are not secret.
- __init__(pgmq: PGMQueue)
Initialize the backend with an already-initialized PGMQueue client.
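To make the naming and discovery scheme concrete, here is a minimal sketch of client-side queue naming plus a TTL-cached, prefix-filtered peer list. Everything in it is an assumption for illustration: the sanitizing rule in `safe_channel`, the `PeerCache` class, the injected `list_queues` callable (synchronous here, whereas the real client call is awaited), and the TTL value. The actual backend may differ in all of these.

```python
import re
import time
import uuid
from typing import Callable, List, Optional


def safe_channel(channel: str) -> str:
    """Assumed sanitizer: lowercase, anything outside [a-z0-9_] becomes '_'."""
    return re.sub(r"[^a-z0-9_]", "_", channel.lower())


def make_queue_name(channel: str) -> str:
    """Build a name of the documented shape {safe_channel}_{uuid12}."""
    return f"{safe_channel(channel)}_{uuid.uuid4().hex[:12]}"


class PeerCache:
    """Hypothetical cache of queue names, refreshed after ttl_s seconds."""

    def __init__(self, list_queues: Callable[[], List[str]], ttl_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._list_queues = list_queues
        self._ttl_s = ttl_s
        self._clock = clock
        self._names: Optional[List[str]] = None
        self._fetched_at = 0.0

    def peers(self, channel: str) -> List[str]:
        now = self._clock()
        if self._names is None or now - self._fetched_at >= self._ttl_s:
            self._names = self._list_queues()
            self._fetched_at = now
        prefix = safe_channel(channel) + "_"
        # Require exactly 12 trailing characters so "room_1" does not also
        # match queues belonging to a longer channel such as "room_1_extra".
        return [n for n in self._names
                if n.startswith(prefix) and len(n) == len(prefix) + 12]

    def invalidate(self) -> None:
        self._names = None


calls = []

def fake_list_queues() -> List[str]:
    calls.append(1)
    return ["room_1_aaaaaaaaaaaa", "room_1_bbbbbbbbbbbb", "other_cccccccccccc"]

now = [0.0]
cache = PeerCache(fake_list_queues, ttl_s=5.0, clock=lambda: now[0])
first = cache.peers("room-1")   # fetches the queue list
second = cache.peers("room-1")  # served from the cache
now[0] = 6.0
third = cache.peers("room-1")   # TTL elapsed, fetches again
```

The sketch also shows why channels are not secret with this backend: anyone who can list queue names can recover the sanitized channel from the prefix.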
- async publish(channel: str, my_queue: str, payload: dict) -> None
Send payload to every cached peer queue for channel.
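The failure-tolerant fanout described for this backend might look roughly like the sketch below. The `fan_out` function, its `send` and `invalidate` callables, and the queue names are all hypothetical. Skipping `my_queue` is taken from the PgmqBackend protocol contract rather than from anything stated for DirectPgmqBackend itself.

```python
import asyncio
from typing import Awaitable, Callable, List


async def fan_out(
    peers: List[str],
    my_queue: str,
    payload: dict,
    send: Callable[[str, dict], Awaitable[None]],
    invalidate: Callable[[], None],
) -> int:
    """Send payload to each peer; a failing peer never aborts the loop."""
    delivered = 0
    for peer in peers:
        if peer == my_queue:
            continue  # do not echo the message back to the sender
        try:
            await send(peer, payload)
            delivered += 1
        except Exception:
            # The peer list is probably stale (e.g. the queue was dropped),
            # so force a fresh lookup on the next publish and keep going.
            invalidate()
    return delivered


async def demo():
    sent: List[str] = []
    invalidations: List[bool] = []

    async def fake_send(queue: str, payload: dict) -> None:
        if queue == "room_gone":
            raise RuntimeError("queue does not exist")
        sent.append(queue)

    delivered = await fan_out(
        ["room_me", "room_gone", "room_peer"],
        "room_me",
        {"hello": "world"},
        fake_send,
        lambda: invalidations.append(True),
    )
    return delivered, sent, len(invalidations)


result = asyncio.run(demo())
```

In this run one peer fails, the cache is invalidated once, and the remaining peer still receives the payload.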
- async read(queue: str, *, channel: str, vt: int, qty: int, max_poll_seconds: int, poll_interval_ms: int) -> list[BackendMessage]
Long-poll queue via pgmq.read_with_poll.
- class pipecat.bus.network.pgmq_backends.IsolatedPgmqBackend(pool: Any)
Bases: object
Backend that calls SECURITY DEFINER Postgres wrappers over asyncpg.
Use this when bus peers should be isolated from each other and the channel name is the bus capability. The backend never issues raw pgmq.* calls; every operation goes through public.bus_* wrappers, which enforce (queue_name, channel) membership against a server-side peer registry table.
Wire format (server-side SQL, defined out-of-band in the deployer’s migrations):
    bus_join(p_channel text) RETURNS text
    bus_publish(p_channel text, p_my_queue text, p_message jsonb) RETURNS bigint[]
    bus_subscribe(p_my_queue text, p_channel text, p_vt int, p_qty int, p_max_seconds int) RETURNS TABLE(msg_id bigint, message jsonb)
    bus_archive(p_my_queue text, p_channel text, p_msg_id bigint) RETURNS boolean
    bus_leave(p_my_queue text, p_channel text) RETURNS void
The asyncpg.Pool must allow at least two concurrent connections (one held by the reader loop’s long-poll, one for publishes). Pool lifetime is owned by the caller; this backend does not close it.
Notes
bus_subscribe long-polls inside a SECURITY DEFINER function. Use a session-mode pooler; transaction-mode poolers may drop the connection mid-poll.
Payload serialization: the caller hands PgmqBus a JSON-encodable dict; the backend forwards it as jsonb via json.dumps because asyncpg does not auto-coerce dict to jsonb.
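As a rough illustration of the serialization note, the sketch below encodes the payload with `json.dumps` before handing it to the wrapper. `FakePool` is a stand-in that records calls instead of talking to Postgres, and the SQL text and the free-standing `publish` function are assumptions, not the backend's actual query or method.

```python
import asyncio
import json


class FakePool:
    """Stand-in for an asyncpg pool: records calls, touches no database."""

    def __init__(self) -> None:
        self.calls = []

    async def fetchval(self, query, *args):
        self.calls.append((query, args))
        return [101, 102]  # pretend two peer queues received the message


async def publish(pool, channel: str, my_queue: str, payload: dict):
    # The dict is encoded to JSON text client-side and cast to jsonb in SQL,
    # since the driver will not convert a dict on its own.
    return await pool.fetchval(
        "SELECT public.bus_publish($1, $2, $3::jsonb)",  # assumed query text
        channel,
        my_queue,
        json.dumps(payload),
    )


pool = FakePool()
msg_ids = asyncio.run(
    publish(pool, "room-1", "q-opaque", {"type": "frame", "seq": 7})
)
query, args = pool.calls[0]
```

The third bound argument is a plain string at this point; the `::jsonb` cast in the assumed query is what turns it back into JSON on the server.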
- __init__(pool: Any)
Initialize the backend.
- Parameters:
pool – An asyncpg.pool.Pool that the bus will use for all wrapper calls. Typed as Any to keep asyncpg an optional import.
- async join(channel: str) -> str
Call public.bus_join(channel) and return the server-allocated queue name.
- async publish(channel: str, my_queue: str, payload: dict) -> None
Call public.bus_publish to fan payload out to channel peers.
- async read(queue: str, *, channel: str, vt: int, qty: int, max_poll_seconds: int, poll_interval_ms: int) -> list[BackendMessage]
Long-poll queue via public.bus_subscribe.
poll_interval_ms is not forwarded: the wrapper does not expose it, so the server-side default of pgmq.read_with_poll applies.
- class pipecat.bus.network.pgmq_backends.PgmqBackend(*args, **kwargs)
Bases: Protocol
Wire-side interface PgmqBus delegates to.
A backend instance is shared across the lifetime of a bus and may be shared across multiple buses (e.g. one process running both an upstream and downstream bus on different channels against the same database).
- async join(channel: str) -> str
Allocate a queue for this bus and register it on channel.
- Returns:
The opaque queue name the bus should read from.
- async publish(channel: str, my_queue: str, payload: dict) -> None
Fan payload out to every peer queue on channel except my_queue.
- async read(queue: str, *, channel: str, vt: int, qty: int, max_poll_seconds: int, poll_interval_ms: int) -> list[BackendMessage]
Long-poll for messages on queue. Returns an empty list on timeout.
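Because the interface is a Protocol, any object with matching methods can stand in for a real backend. The in-memory class below is a hypothetical test double, not part of the module. It follows the three method signatures listed here and ignores vt and poll_interval_ms. Archiving and leaving are mentioned in the module overview but not shown in this listing, so they are left out.

```python
import asyncio
import itertools
import uuid
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class BackendMessage:
    """Local stand-in mirroring the documented fields."""
    msg_id: int
    message: dict


class InMemoryBackend:
    """Hypothetical double with the documented join/publish/read shape."""

    def __init__(self) -> None:
        self._queues: Dict[str, asyncio.Queue] = {}
        self._channels: Dict[str, List[str]] = {}
        self._ids = itertools.count(1)

    async def join(self, channel: str) -> str:
        name = uuid.uuid4().hex  # opaque queue name
        self._queues[name] = asyncio.Queue()
        self._channels.setdefault(channel, []).append(name)
        return name

    async def publish(self, channel: str, my_queue: str, payload: dict) -> None:
        for peer in self._channels.get(channel, []):
            if peer != my_queue:  # every peer except the sender
                msg = BackendMessage(next(self._ids), dict(payload))
                self._queues[peer].put_nowait(msg)

    async def read(self, queue: str, *, channel: str, vt: int, qty: int,
                   max_poll_seconds: int, poll_interval_ms: int) -> List[BackendMessage]:
        q = self._queues[queue]
        try:
            first = await asyncio.wait_for(q.get(), timeout=max_poll_seconds)
        except asyncio.TimeoutError:
            return []  # empty list on timeout, as the protocol specifies
        batch = [first]
        while len(batch) < qty and not q.empty():
            batch.append(q.get_nowait())
        return batch


async def demo():
    backend = InMemoryBackend()
    a = await backend.join("room-1")
    b = await backend.join("room-1")
    await backend.publish("room-1", a, {"text": "hi"})
    opts = dict(channel="room-1", vt=30, qty=10, poll_interval_ms=100)
    got_b = await backend.read(b, max_poll_seconds=1, **opts)
    got_a = await backend.read(a, max_poll_seconds=0, **opts)
    return got_b, got_a


got_b, got_a = asyncio.run(demo())
```

The sender's own queue stays empty while the peer receives the message, which matches the "except my_queue" contract on publish.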