Source code for pipecat.bus.ui.messages

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Bus carriers for the UI Worker protocol.

These dataclasses are the on-the-bus shape that ``UIWorker`` (see
``pipecat.workers.ui``) and ``PipelineWorker`` exchange. They are NOT
the on-the-wire format the client sees; that lives in
``pipecat.processors.frameworks.rtvi.models`` (``UIEventMessage``,
``UICommandMessage``, ``UIJobGroupMessage``, ...). When RTVI is enabled,
``PipelineWorker`` translates between the two: its ``on_ui_message``
handler republishes inbound client messages onto the bus, and its
``on_bus_message`` handler turns outbound carriers into RTVI frames.

All carriers subclass ``BusUIDataMessage``, which ``PipelineWorker``
dispatches on to translate outbound ones into RTVI frames.

- ``BusUIEventMessage`` and ``BusUICommandMessage`` carry client
  events and server commands respectively.
- ``BusUIJobGroupStartedMessage``, ``BusUIJobUpdateMessage``,
  ``BusUIJobCompletedMessage``, and ``BusUIJobGroupCompletedMessage``
  carry the four phases of a user-facing job group's lifecycle (see
  ``UIWorker.ui_job_group``).

The carriers live in the ``bus`` layer (rather than alongside
``UIWorker``) because both ``PipelineWorker`` (in ``pipecat.pipeline``)
and ``UIWorker`` (in ``pipecat.workers``) reference them, and
``pipeline`` must not import from ``workers``.
"""

from dataclasses import dataclass
from typing import Any

from pipecat.bus.messages import BusDataMessage

#: Internal ``event_name`` used by ``PipelineWorker`` when republishing a
#: ``ui-snapshot`` wire message onto the bus as a
#: ``BusUIEventMessage``. ``UIWorker``'s bus dispatch matches on this
#: name to route the snapshot into ``_latest_snapshot`` storage. The
#: leading double underscore marks the name as internal so app-defined
#: ``@ui_event`` handlers can't collide with it.
_UI_SNAPSHOT_BUS_EVENT_NAME = "__ui_snapshot"

#: Internal ``event_name`` used by ``PipelineWorker`` when republishing a
#: ``ui-cancel-job-group`` wire message onto the bus as a
#: ``BusUIEventMessage``. ``UIWorker``'s bus dispatch matches on this
#: name to route to ``cancel_job_group``. Internal; not part of the
#: public wire format.
_UI_CANCEL_JOB_GROUP_BUS_EVENT_NAME = "__cancel_job_group"


[docs] @dataclass class BusUIDataMessage(BusDataMessage): """Base for all UI Worker protocol bus carriers. ``PipelineWorker.on_bus_message`` dispatches on this type to translate a worker's outbound UI carriers into RTVI frames, so every UI bus message below subclasses it. """ pass
[docs] @dataclass class BusUIEventMessage(BusUIDataMessage): """A UI event sent from the client to a server-side worker. Emitted by ``PipelineWorker`` when the client dispatches an event via ``PipecatClient.sendUIEvent(event, payload)``. ``UIWorker`` subclasses dispatch these to ``@ui_event(name)`` handlers. Parameters: event_name: App-defined event name. payload: App-defined payload. Schemaless by design. """ event_name: str = "" payload: Any = None
[docs] @dataclass class BusUICommandMessage(BusUIDataMessage): """A UI command sent from a server-side worker to the client. Published by ``UIWorker.send_command(name, payload)``. ``PipelineWorker`` (in ``on_bus_message``) translates this to an ``RTVIUICommandFrame(command=command_name, payload=payload)`` and pushes it through the pipeline. Parameters: command_name: App-defined command name. payload: App-defined payload (already a plain dict by the time it lands on the bus). """ command_name: str = "" payload: Any = None
# --------------------------------------------------------------------------- # UI job-group lifecycle # ---------------------------------------------------------------------------
[docs] @dataclass class BusUIJobGroupStartedMessage(BusUIDataMessage): """A user-facing job group has been dispatched. Published by ``UIWorker.ui_job_group(...)`` on entry. ``PipelineWorker`` forwards it to the client as a ``ui-job-group`` envelope with ``kind = "group_started"``. Parameters: job_id: Shared job-group identifier for the group. workers: Names of the workers the work was dispatched to. label: Optional human-readable label for the group. cancellable: Whether the client may request cancellation. at: Epoch milliseconds when the group started. """ job_id: str = "" workers: list[str] | None = None label: str | None = None cancellable: bool = True at: int = 0
[docs] @dataclass class BusUIJobUpdateMessage(BusUIDataMessage): """Per-worker progress for a user-facing job group. Forwarded by the ``UIWorker`` whenever a worker emits a ``BusJobUpdateMessage`` whose ``job_id`` matches a registered user job group. ``PipelineWorker`` forwards to the client as a ``ui-job-group`` envelope with ``kind = "job_update"``. Parameters: job_id: The shared job-group identifier. worker_name: The worker that produced the update. data: The worker's update payload, forwarded verbatim. at: Epoch milliseconds when the update was emitted on the bus. """ job_id: str = "" worker_name: str = "" data: Any = None at: int = 0
[docs] @dataclass class BusUIJobCompletedMessage(BusUIDataMessage): """A worker in a user-facing job group has completed. Forwarded by the ``UIWorker`` whenever a worker's ``BusJobResponseMessage`` arrives for a registered user job group. ``PipelineWorker`` forwards to the client as a ``ui-job-group`` envelope with ``kind = "job_completed"``. Parameters: job_id: The shared job-group identifier. worker_name: The worker that produced the response. status: Completion status as a string (``JobStatus`` value). response: The worker's response payload. at: Epoch milliseconds when the response was received. """ job_id: str = "" worker_name: str = "" status: str = "" response: Any = None at: int = 0
[docs] @dataclass class BusUIJobGroupCompletedMessage(BusUIDataMessage): """A user-facing job group has completed. Published when ``UIWorker.ui_job_group(...)`` exits, after every worker has responded (or the group has been cancelled). ``PipelineWorker`` forwards to the client as a ``ui-job-group`` envelope with ``kind = "group_completed"``. Parameters: job_id: The shared job-group identifier. at: Epoch milliseconds when the group completed. """ job_id: str = "" at: int = 0