pgmq_backends

Backends for pipecat.bus.network.pgmq.PgmqBus.

A backend owns the wire-side of bus operations: allocating a queue when a bus instance joins a channel, fanning a published message out to channel peers, long-polling for incoming messages, archiving them, and dropping the queue when the bus stops. PgmqBus is the orchestrator on top of this surface.

Two backends ship in this module:

  • DirectPgmqBackend calls pgmq.async_queue.PGMQueue directly. Peer discovery is via list_queues() filtered by a channel prefix. The channel name is encoded in the queue name and is therefore enumerable by any role that can read pg_class. Choose this backend when bus peers trust each other (single-tenant deployments, internal services).

  • IsolatedPgmqBackend calls a small set of SECURITY DEFINER Postgres wrappers (public.bus_join, bus_publish, bus_subscribe, bus_archive, bus_leave) over an asyncpg pool. Queue names are server-allocated opaque UUIDs and a server-side peer registry replaces list_queues discovery. Choose this backend when bus peers should be isolated from each other and the channel name itself is the bus capability.

class pipecat.bus.network.pgmq_backends.BackendMessage(msg_id: int, message: dict)

Bases: object

A message returned by a backend’s read call.

Backends normalize whatever shape they get off the wire into this minimal record so the bus orchestrator can stay backend-agnostic.

msg_id: int
message: dict
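
The normalization step described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the dataclass is a stand-in that mirrors the two documented fields, and normalize_row is a hypothetical helper. It assumes each wire row exposes "msg_id" and "message" keys and that the payload may arrive either as a dict or as JSON text (asyncpg, for example, returns jsonb as a string unless a codec is registered).

```python
import json
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class BackendMessage:
    """Stand-in mirroring the two documented fields."""

    msg_id: int
    message: dict


def normalize_row(row: Mapping[str, Any]) -> BackendMessage:
    """Hypothetical helper: coerce one wire row into a BackendMessage.

    Assumes the row has "msg_id" and "message" keys and that the payload
    is either a mapping or a JSON-encoded string/bytes value.
    """
    payload = row["message"]
    if isinstance(payload, (str, bytes)):
        payload = json.loads(payload)
    return BackendMessage(msg_id=int(row["msg_id"]), message=dict(payload))
```

With a helper like this, the orchestrator only ever sees msg_id and message, whichever backend produced the row.
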
class pipecat.bus.network.pgmq_backends.DirectPgmqBackend(pgmq: PGMQueue)

Bases: object

Backend that calls pgmq.async_queue.PGMQueue directly.

  • Queue names are constructed client-side as {safe_channel}_{uuid12}.

  • Peer discovery uses pgmq.list_queues() filtered by channel prefix, cached for _PEER_LIST_TTL_S seconds.

  • Per-peer send failures are caught; the cache is invalidated and the fanout continues. The publish does not raise.
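
The caching and fanout behavior in the list above could look roughly like the sketch below. It is an illustration under stated assumptions, not the backend's implementation: PeerCache and fanout are hypothetical names, PEER_LIST_TTL_S is a made-up value (the real _PEER_LIST_TTL_S is not documented here), and list_queues and send are injected callables standing in for the PGMQueue client. Skipping the sender's own queue follows the PgmqBackend.publish contract.

```python
import time
from typing import Awaitable, Callable

PEER_LIST_TTL_S = 5.0  # assumed value, for illustration only


class PeerCache:
    """Caches the prefix-filtered queue list per channel prefix for a fixed TTL."""

    def __init__(
        self,
        list_queues: Callable[[], Awaitable[list[str]]],
        ttl_s: float = PEER_LIST_TTL_S,
    ) -> None:
        self._list_queues = list_queues
        self._ttl_s = ttl_s
        self._entries: dict[str, tuple[float, list[str]]] = {}

    async def peers(self, prefix: str) -> list[str]:
        now = time.monotonic()
        cached = self._entries.get(prefix)
        if cached is not None and now - cached[0] < self._ttl_s:
            return cached[1]
        # Cache miss or expired entry: re-list and filter by channel prefix.
        names = [q for q in await self._list_queues() if q.startswith(prefix)]
        self._entries[prefix] = (now, names)
        return names

    def invalidate(self, prefix: str) -> None:
        self._entries.pop(prefix, None)


async def fanout(
    cache: PeerCache,
    prefix: str,
    my_queue: str,
    payload: dict,
    send: Callable[[str, dict], Awaitable[object]],
) -> int:
    """Send payload to every cached peer except my_queue.

    A failing peer does not abort the loop or raise; it only causes the
    cached peer list to be dropped so the next publish re-discovers peers.
    Returns the number of successful sends.
    """
    delivered = 0
    failed = False
    for queue in await cache.peers(prefix):
        if queue == my_queue:
            continue
        try:
            await send(queue, payload)
            delivered += 1
        except Exception:
            failed = True
    if failed:
        cache.invalidate(prefix)
    return delivered
```

Invalidating on failure means a peer that has dropped its queue costs at most one failed send before the next discovery pass removes it.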

The provided PGMQueue must already be initialized (await pgmq.init()). The backend does not own the client’s lifetime.

Use this when bus peers trust each other. The channel name appears in queue names visible to any role that can read pg_class, so channels are not secret.

__init__(pgmq: PGMQueue)

Initialize the backend with an already-initialized PGMQueue client.

async join(channel: str) → str

Create a queue named {safe_channel}_{uuid12} for this bus.
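
As a rough illustration of the {safe_channel}_{uuid12} naming scheme, the sketch below builds such a name. The sanitization rule (lowercase, anything outside [a-z0-9_] replaced by "_") and the reading of uuid12 as the first 12 hex characters of a random UUID are assumptions; the helper names are hypothetical and the backend's actual rules may differ.

```python
import re
import uuid


def safe_channel(channel: str) -> str:
    """Assumed sanitization: lowercase, non [a-z0-9_] characters become "_"."""
    return re.sub(r"[^a-z0-9_]", "_", channel.lower())


def make_queue_name(channel: str) -> str:
    """Build {safe_channel}_{uuid12}: sanitized channel plus 12 hex characters."""
    return f"{safe_channel(channel)}_{uuid.uuid4().hex[:12]}"


def channel_prefix(channel: str) -> str:
    """Prefix that peer discovery would use to filter list_queues() results."""
    return f"{safe_channel(channel)}_"
```

Because the channel is recoverable from the queue name, anyone who can enumerate queues can enumerate channels, which is the trust trade-off noted above. In this sketch a channel whose sanitized name is a prefix of another (for example "room" and "room_42") would also match the other channel's queues under a plain prefix filter.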

async publish(channel: str, my_queue: str, payload: dict) → None

Send payload to every cached peer queue for channel.

async read(queue: str, *, channel: str, vt: int, qty: int, max_poll_seconds: int, poll_interval_ms: int) → list[BackendMessage]

Long-poll queue via pgmq.read_with_poll.

async archive(queue: str, *, channel: str, msg_id: int) → bool

Archive a processed message via pgmq.delete.

async leave(queue: str, *, channel: str) → None

Drop the queue and invalidate the cached peer list for channel.

class pipecat.bus.network.pgmq_backends.IsolatedPgmqBackend(pool: Any)

Bases: object

Backend that calls SECURITY DEFINER Postgres wrappers over asyncpg.

Use this when bus peers should be isolated from each other and the channel name is the bus capability. The backend never issues raw pgmq.* calls; every operation goes through public.bus_* wrappers, which enforce (queue_name, channel) membership against a server-side peer registry table.

Wire format (server-side SQL, defined out-of-band in the deployer’s migrations):

bus_join(p_channel text) RETURNS text
bus_publish(p_channel text, p_my_queue text, p_message jsonb) RETURNS bigint[]
bus_subscribe(p_my_queue text, p_channel text, p_vt int,
              p_qty int, p_max_seconds int)
    RETURNS TABLE(msg_id bigint, message jsonb)
bus_archive(p_my_queue text, p_channel text, p_msg_id bigint) RETURNS boolean
bus_leave(p_my_queue text, p_channel text) RETURNS void

The asyncpg.Pool must allow at least two concurrent connections: one is held by the reader loop’s long-poll and one is needed for publishes. Pool lifetime is owned by the caller; this backend does not close it.

Notes

  • bus_subscribe long-polls inside a SECURITY DEFINER function. Use a session-mode pooler; transaction-mode poolers may drop the connection mid-poll.

  • Payload serialization: the caller hands PgmqBus a json-encodable dict; the backend forwards it as jsonb via json.dumps because asyncpg does not auto-coerce dict to jsonb.
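
A minimal sketch of how the wrapper calls and the json.dumps step might look from the client side. The SQL text, the explicit ::jsonb cast, and the helper names bus_publish and bus_subscribe are illustrative assumptions, not the backend's actual code; pool is expected to behave like an asyncpg pool exposing fetchval and fetch.

```python
import json
from typing import Any


async def bus_publish(pool: Any, channel: str, my_queue: str, payload: dict) -> list[int]:
    """Call public.bus_publish, passing the payload as JSON text cast to jsonb."""
    msg_ids = await pool.fetchval(
        "SELECT public.bus_publish($1, $2, $3::jsonb)",
        channel,
        my_queue,
        json.dumps(payload),  # a dict is not coerced to jsonb, so encode it here
    )
    return list(msg_ids or [])


async def bus_subscribe(
    pool: Any, my_queue: str, channel: str, vt: int, qty: int, max_seconds: int
) -> list[tuple[int, dict]]:
    """Long-poll through public.bus_subscribe and decode each jsonb payload."""
    rows = await pool.fetch(
        "SELECT msg_id, message FROM public.bus_subscribe($1, $2, $3, $4, $5)",
        my_queue,
        channel,
        vt,
        qty,
        max_seconds,
    )
    decoded = []
    for row in rows:
        message = row["message"]
        if isinstance(message, str):  # jsonb comes back as text without a codec
            message = json.loads(message)
        decoded.append((row["msg_id"], message))
    return decoded
```

The subscribe call occupies one pooled connection for up to max_seconds, which is why the pool needs a second connection for publishes that happen while a poll is in flight.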

__init__(pool: Any)

Initialize the backend.

Parameters:

pool – An asyncpg.pool.Pool that the bus will use for all wrapper calls. Typed as Any to keep asyncpg an optional import.

async join(channel: str) → str

Call public.bus_join(channel) and return the server-allocated queue name.

async publish(channel: str, my_queue: str, payload: dict) → None

Call public.bus_publish to fan payload out to channel peers.

async read(queue: str, *, channel: str, vt: int, qty: int, max_poll_seconds: int, poll_interval_ms: int) → list[BackendMessage]

Long-poll queue via public.bus_subscribe.

poll_interval_ms is not forwarded: the bus_subscribe wrapper does not expose it, so pgmq.read_with_poll’s server-side default poll interval applies.

async archive(queue: str, *, channel: str, msg_id: int) → bool

Call public.bus_archive to acknowledge a processed message.

async leave(queue: str, *, channel: str) → None

Call public.bus_leave to drop the queue and unregister it.

class pipecat.bus.network.pgmq_backends.PgmqBackend(*args, **kwargs)

Bases: Protocol

Wire-side interface PgmqBus delegates to.

A backend instance is shared across the lifetime of a bus and may be shared across multiple buses (e.g. one process running both an upstream and downstream bus on different channels against the same database).

async join(channel: str) → str

Allocate a queue for this bus and register it on channel.

Returns:

The opaque queue name the bus should read from.

async publish(channel: str, my_queue: str, payload: dict) → None

Fan payload out to every peer queue on channel except my_queue.

async read(queue: str, *, channel: str, vt: int, qty: int, max_poll_seconds: int, poll_interval_ms: int) → list[BackendMessage]

Long-poll for messages on queue. Returns an empty list on timeout.

async archive(queue: str, *, channel: str, msg_id: int) → bool

Acknowledge / archive a processed message.

async leave(queue: str, *, channel: str) → None

Drop the queue and unregister it from channel.
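
Because PgmqBackend is a Protocol, any object with these five methods can stand in for a real backend. The sketch below is a hypothetical in-memory test double, not something shipped in this module. It ignores visibility timeouts and polling entirely, and its BackendMessage is a local stand-in for the documented record. The demo coroutine walks the lifecycle in order: join, publish, read, archive, leave.

```python
import itertools
import uuid
from dataclasses import dataclass


@dataclass
class BackendMessage:
    msg_id: int
    message: dict


class InMemoryBackend:
    """Hypothetical test double exposing the five PgmqBackend methods."""

    def __init__(self) -> None:
        # channel -> queue name -> pending messages
        self._channels: dict[str, dict[str, list[BackendMessage]]] = {}
        self._ids = itertools.count(1)

    async def join(self, channel: str) -> str:
        queue = uuid.uuid4().hex  # opaque name, unrelated to the channel
        self._channels.setdefault(channel, {})[queue] = []
        return queue

    async def publish(self, channel: str, my_queue: str, payload: dict) -> None:
        for name, pending in self._channels.get(channel, {}).items():
            if name != my_queue:  # the publisher does not receive its own message
                pending.append(BackendMessage(next(self._ids), dict(payload)))

    async def read(self, queue: str, *, channel: str, vt: int, qty: int,
                   max_poll_seconds: int, poll_interval_ms: int) -> list[BackendMessage]:
        # No long-poll here: return up to qty messages that are pending right now.
        return list(self._channels.get(channel, {}).get(queue, [])[:qty])

    async def archive(self, queue: str, *, channel: str, msg_id: int) -> bool:
        pending = self._channels.get(channel, {}).get(queue, [])
        for i, msg in enumerate(pending):
            if msg.msg_id == msg_id:
                del pending[i]
                return True
        return False

    async def leave(self, queue: str, *, channel: str) -> None:
        self._channels.get(channel, {}).pop(queue, None)


async def demo() -> list[dict]:
    """Join two peers, publish from one, then read and archive on the other."""
    backend = InMemoryBackend()
    a = await backend.join("room")
    b = await backend.join("room")
    await backend.publish("room", a, {"text": "hello"})
    received = []
    for msg in await backend.read(b, channel="room", vt=30, qty=10,
                                  max_poll_seconds=1, poll_interval_ms=100):
        received.append(msg.message)
        await backend.archive(b, channel="room", msg_id=msg.msg_id)
    await backend.leave(a, channel="room")
    await backend.leave(b, channel="room")
    return received
```

Running asyncio.run(demo()) exercises the whole surface without a database, which can be handy for unit-testing code that sits on top of the bus.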