proxy
Proxy workers for forwarding bus messages over network transports.
- class pipecat.workers.proxy.WebSocketProxyClient(name: str, *, url: str, remote_worker_name: str, local_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), headers: dict[str, str] | None = None, serializer: MessageSerializer | None = None, active: bool = False)
Bases: BaseWorker
Forwards bus messages to a remote worker over WebSocket.
Connects to a WebSocket URL and forwards messages between a local worker and a remote worker. Only messages targeted at the remote worker are sent. Only messages targeted at the local worker are accepted.
Event handlers available:
on_connected: Fired when the WebSocket connection is established.
on_disconnected: Fired when the WebSocket connection is closed.
Example:
proxy = WebSocketProxyClient(
    "proxy",
    url="ws://remote-server:8765/ws",
    remote_worker_name="worker",
    local_worker_name="voice",
)

@proxy.event_handler("on_connected")
async def on_connected(worker, websocket):
    logger.info("Connected to remote server")

@proxy.event_handler("on_disconnected")
async def on_disconnected(worker, websocket):
    logger.info("Disconnected from remote server")

await runner.add_workers(proxy)
- __init__(name: str, *, url: str, remote_worker_name: str, local_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), headers: dict[str, str] | None = None, serializer: MessageSerializer | None = None, active: bool = False)
Initialize the WebSocketProxyClient.
- Parameters:
name – Unique name for this worker.
url – The WebSocket URL to connect to.
remote_worker_name – Name of the worker on the remote server. Only messages targeted at this worker are forwarded.
local_worker_name – Name of the local worker that should receive responses. Only inbound messages targeted at this worker are accepted.
forward_messages – Additional message types to forward from the local worker (e.g. (BusFrameMessage,) for frame routing). These are forwarded based on source worker name only, regardless of target.
headers – Optional HTTP headers sent with the WebSocket handshake (e.g. for authentication).
serializer – Serializer for bus messages. Defaults to JSONMessageSerializer.
active – Whether the worker starts active. Defaults to False because on_activated opens the WebSocket connection, which is almost always a deliberate action triggered by an upstream event (e.g. the local client connecting). Pass True to connect as soon as the worker starts.
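The outbound rules described by remote_worker_name and forward_messages can be sketched as a small predicate. This is a minimal illustration, not the library's implementation: Message and FrameMessage are hypothetical stand-ins for BusMessage and a forwarded message type, and the source and target field names are assumptions, since the real attribute names are not shown in this reference.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for BusMessage (field names are assumptions)."""

    source: str
    target: str | None = None


@dataclass
class FrameMessage(Message):
    """Hypothetical stand-in for a type listed in forward_messages."""


def should_forward(
    message: Message,
    *,
    local_worker_name: str,
    remote_worker_name: str,
    forward_messages: tuple[type[Message], ...] = (),
) -> bool:
    """Return True if a local bus message should be sent to the remote side."""
    # Rule 1: messages addressed to the remote worker are sent.
    if message.target == remote_worker_name:
        return True
    # Rule 2: listed types are sent when they originate from the local
    # worker, whatever their target. isinstance() with an empty tuple is
    # False, so the default forwards nothing extra.
    return isinstance(message, forward_messages) and message.source == local_worker_name
```

With the defaults, only messages whose target equals remote_worker_name pass. Listing a type in forward_messages widens the filter for that type alone, which is why the parameter is described as matching on source worker name only.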
- async on_bus_message(message: BusMessage) → None
Forward messages targeted at the remote worker.
- Parameters:
message – The bus message to process.
- class pipecat.workers.proxy.WebSocketProxyServer(name: str, *, websocket: WebSocket, worker_name: str, remote_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), serializer: MessageSerializer | None = None)
Bases: BaseWorker
Receives bus messages from a remote client over WebSocket.
Accepts a FastAPI/Starlette WebSocket connection and forwards messages between the remote client and a local worker. Only messages from the local worker targeted at the remote worker are sent. Only inbound messages targeted at the local worker are accepted.
Event handlers available:
on_client_connected: Fired when the WebSocket client connects and the proxy is ready.
on_client_disconnected: Fired when the WebSocket client disconnects.
Example:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    proxy = WebSocketProxyServer(
        "gateway",
        websocket=websocket,
        worker_name="worker",
        remote_worker_name="voice",
    )

    @proxy.event_handler("on_client_connected")
    async def on_client_connected(worker, websocket):
        logger.info("Client connected")

    @proxy.event_handler("on_client_disconnected")
    async def on_client_disconnected(worker, websocket):
        logger.info("Client disconnected")

    await runner.add_workers(proxy)
- __init__(name: str, *, websocket: WebSocket, worker_name: str, remote_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), serializer: MessageSerializer | None = None)
Initialize the WebSocketProxyServer.
- Parameters:
name – Unique name for this worker.
websocket – An accepted FastAPI/Starlette WebSocket connection.
worker_name – Name of the local worker to route messages to/from. Only messages from this worker are forwarded to the client.
remote_worker_name – Name of the worker on the remote client. Only outbound messages targeted at this worker are sent. Only inbound messages targeted at the local worker are accepted.
forward_messages – Additional message types to forward from the local worker (e.g. (BusFrameMessage,) for frame routing). These are forwarded based on source worker name only, regardless of target.
serializer – Serializer for bus messages. Defaults to JSONMessageSerializer.
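The inbound side of the filter can be illustrated in the same spirit. The sketch below assumes a hypothetical JSON wire format with a top-level "target" key; the actual format produced by JSONMessageSerializer is not specified in this reference, and accept_inbound is an illustrative helper, not part of the library.

```python
from __future__ import annotations

import json


def accept_inbound(raw: str, *, worker_name: str) -> dict | None:
    """Decode one text frame and return it only if it targets the local worker."""
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None  # Malformed frame: drop it.
    if not isinstance(payload, dict):
        return None  # Not a message object in this assumed format.
    if payload.get("target") != worker_name:
        return None  # Addressed to another worker: not accepted.
    return payload
```

Dropping anything that is not addressed to worker_name keeps a remote peer from injecting messages for unrelated workers on the local bus, which appears to be the intent of the acceptance rule.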
- async on_worker_ready(data: WorkerReadyData) → None
Notify the remote client that the local worker is ready.
- async on_bus_message(message: BusMessage) → None
Forward messages from the local worker to the remote client.
- Parameters:
message – The bus message to process.