websocket

WebSocket proxy workers for forwarding bus messages.

class pipecat.workers.proxy.websocket.WebSocketProxyClient(name: str, *, url: str, remote_worker_name: str, local_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), headers: dict[str, str] | None = None, serializer: MessageSerializer | None = None, active: bool = False)[source]

Bases: BaseWorker

Forwards bus messages to a remote worker over WebSocket.

Connects to a WebSocket URL and forwards messages between a local worker and a remote worker. Only messages targeted at the remote worker are sent. Only messages targeted at the local worker are accepted.

Event handlers available:

  • on_connected: Fired when the WebSocket connection is established.

  • on_disconnected: Fired when the WebSocket connection is closed.

Example:

proxy = WebSocketProxyClient(
    "proxy",
    url="ws://remote-server:8765/ws",
    remote_worker_name="worker",
    local_worker_name="voice",
)

@proxy.event_handler("on_connected")
async def on_connected(worker, websocket):
    logger.info("Connected to remote server")

@proxy.event_handler("on_disconnected")
async def on_disconnected(worker, websocket):
    logger.info("Disconnected from remote server")

await runner.add_workers(proxy)
__init__(name: str, *, url: str, remote_worker_name: str, local_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), headers: dict[str, str] | None = None, serializer: MessageSerializer | None = None, active: bool = False)[source]

Initialize the WebSocketProxyClient.

Parameters:
  • name – Unique name for this worker.

  • url – The WebSocket URL to connect to.

  • remote_worker_name – Name of the worker on the remote server. Only messages targeted at this worker are forwarded.

  • local_worker_name – Name of the local worker that should receive responses. Only inbound messages targeted at this worker are accepted.

  • forward_messages – Additional message types to forward from the local worker (e.g. (BusFrameMessage,) for frame routing). These are forwarded based on source worker name only, regardless of target.

  • headers – Optional HTTP headers sent with the WebSocket handshake (e.g. for authentication).

  • serializer – Serializer for bus messages. Defaults to JSONMessageSerializer.

  • active – Whether the worker starts active. Defaults to False because on_activated opens the WebSocket connection, which is almost always a deliberate action triggered by an upstream event (e.g. the local client connecting). Pass True to connect as soon as the worker starts.

async on_activated(args: dict | None) None[source]

Connect to the remote WebSocket server.

async stop() None[source]

Cancel the receive loop and close the WebSocket connection.

async on_bus_message(message: BusMessage) None[source]

Forward messages targeted at the remote worker.

Parameters:

message – The bus message to process.

class pipecat.workers.proxy.websocket.WebSocketProxyServer(name: str, *, websocket: WebSocket, worker_name: str, remote_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), serializer: MessageSerializer | None = None)[source]

Bases: BaseWorker

Receives bus messages from a remote client over WebSocket.

Accepts a FastAPI/Starlette WebSocket connection and forwards messages between the remote client and a local worker. Only messages from the local worker targeted at the remote worker are sent. Only inbound messages targeted at the local worker are accepted.

Event handlers available:

  • on_client_connected: Fired when the WebSocket client connects and the proxy is ready.

  • on_client_disconnected: Fired when the WebSocket client disconnects.

Example:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    proxy = WebSocketProxyServer(
        "gateway",
        websocket=websocket,
        worker_name="worker",
        remote_worker_name="voice",
    )

    @proxy.event_handler("on_client_connected")
    async def on_client_connected(worker, websocket):
        logger.info("Client connected")

    @proxy.event_handler("on_client_disconnected")
    async def on_client_disconnected(worker, websocket):
        logger.info("Client disconnected")

    await runner.add_workers(proxy)
__init__(name: str, *, websocket: WebSocket, worker_name: str, remote_worker_name: str, forward_messages: tuple[type[BusMessage], ...] = (), serializer: MessageSerializer | None = None)[source]

Initialize the WebSocketProxyServer.

Parameters:
  • name – Unique name for this worker.

  • websocket – An accepted FastAPI/Starlette WebSocket connection.

  • worker_name – Name of the local worker to route messages to/from. Only messages from this worker are forwarded to the client.

  • remote_worker_name – Name of the worker on the remote client. Only outbound messages targeted at this worker are sent. Only inbound messages targeted at the local worker are accepted.

  • forward_messages – Additional message types to forward from the local worker (e.g. (BusFrameMessage,) for frame routing). These are forwarded based on source worker name only, regardless of target.

  • serializer – Serializer for bus messages. Defaults to JSONMessageSerializer.

async start() None[source]

Start the WebSocket receive loop and watch the local worker.

async stop() None[source]

Cancel the receive loop and close the WebSocket connection.

async on_worker_ready(data: WorkerReadyData) None[source]

Notify the remote client that the local worker is ready.

async on_bus_message(message: BusMessage) None[source]

Forward messages from the local worker to the remote client.

Parameters:

message – The bus message to process.

Submodules