#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Worker runner for managing worker execution and orchestration.

This module provides the :class:`WorkerRunner` class. It runs
:class:`~pipecat.workers.base_worker.BaseWorker` instances (including
:class:`~pipecat.pipeline.worker.PipelineWorker`) to completion, owning the
shared :class:`~pipecat.bus.WorkerBus`, the worker registry, and the task
manager that backs the entire session.

For a typical single-pipeline bot, register the worker with
:meth:`WorkerRunner.add_workers` and then call :meth:`WorkerRunner.run`:

.. code-block:: python

    runner = WorkerRunner()
    await runner.add_workers(worker)
    await runner.run()

For multi-worker setups, register every worker the same way:

.. code-block:: python

    runner = WorkerRunner()
    await runner.add_workers(CodeWorker("code-worker", ...), worker)
    await runner.run()

By default, ``run()`` ends once every root worker has finished, so a
single-pipeline bot naturally ends when its pipeline does. Multi-worker
bots whose helpers run forever (e.g. waiting for bus messages) end by
calling :meth:`WorkerRunner.end` / :meth:`WorkerRunner.cancel` from
an event handler (typically on transport disconnect). For long-lived
hosts that add and remove workers over many sessions (e.g. a FastAPI
server), pass ``auto_end=False`` to ``run()`` so the runner does not
exit when no workers are left.
"""
import asyncio
import gc
import signal
import uuid
import warnings
from dataclasses import dataclass, field
from loguru import logger
from pipecat.bus import (
AsyncQueueBus,
BusAddWorkerMessage,
BusCancelMessage,
BusCancelWorkerMessage,
BusEndMessage,
BusEndWorkerMessage,
BusMessage,
BusWorkerRegistryMessage,
WorkerBus,
)
from pipecat.bus.subscriber import BusSubscriber
from pipecat.pipeline.worker import PipelineWorker
from pipecat.registry import WorkerRegistry
from pipecat.registry.types import WorkerReadyData, WorkerRegistryEntry
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
from pipecat.utils.base_object import BaseObject
from pipecat.utils.startup import run_setup_hook
from pipecat.workers.base_worker import BaseWorker, WorkerParams
@dataclass
class _WorkerEntry:
    """A worker registered on the runner and the asyncio task that runs it.

    Attributes:
        worker: The registered worker.
        runner_task: Background task driving the worker; ``None`` until the
            runner starts the worker. Excluded from ``repr`` to keep log
            output readable.
    """

    worker: BaseWorker
    runner_task: asyncio.Task | None = field(default=None, repr=False)
class WorkerRunner(BaseObject, BusSubscriber):
    """Manages worker execution.

    Provides a high-level interface for running workers with
    automatic signal handling (SIGINT/SIGTERM), optional garbage
    collection, proper cleanup of resources, and a worker bus + registry
    for multi-worker orchestration.

    Two entry points:

    - :meth:`add_workers(*workers)` — register one or more workers on the
      runner's bus and start them in the background. Workers run
      concurrently and remaining workers are cancelled when the runner
      ends.
    - :meth:`run` — block until the runner ends. By default
      (``auto_end=True``) the runner ends once every root worker has
      finished; pass ``auto_end=False`` to keep the runner up until
      :meth:`end` / :meth:`cancel` is called.

    Event handlers available:

    - ``on_ready`` — fired after the runner has finished its
      initialization and any added workers have been started.
    - ``on_error`` — fired when starting an added worker fails.
    """

    def __init__(
        self,
        *,
        name: str | None = None,
        bus: WorkerBus | None = None,
        handle_sigint: bool = True,
        handle_sigterm: bool = False,
        force_gc: bool = False,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """Initialize the worker runner.

        Args:
            name: Optional name for the runner instance. Defaults to a
                UUID-based name. Must be unique across runners in a
                distributed setup.
            bus: Optional :class:`WorkerBus`. Defaults to a new
                in-process :class:`AsyncQueueBus`.
            handle_sigint: Whether to automatically handle SIGINT signals.
            handle_sigterm: Whether to automatically handle SIGTERM signals.
            force_gc: Whether to force garbage collection once the runner
                has finished running.
            loop: Event loop to use. If None, uses the current running loop.
        """
        super().__init__(name=name or f"runner-{uuid.uuid4().hex[:8]}")
        self._bus: WorkerBus = bus or AsyncQueueBus()
        self._registry = WorkerRegistry(runner_name=self.name)
        # Keyed by worker name; insertion order is the start order.
        self._entries: dict[str, _WorkerEntry] = {}
        # Remote runners we have already sent our registry to.
        self._known_runners: set[str] = set()
        self._running: bool = False
        self._auto_end: bool = True
        self._shutdown_event = asyncio.Event()
        self._sig_task: asyncio.Task | None = None
        self._handle_sigint = handle_sigint
        self._handle_sigterm = handle_sigterm
        self._force_gc = force_gc
        self._loop = loop or asyncio.get_running_loop()
        self._register_event_handler("on_ready")
        # NOTE(review): "on_error" is registered but never fired anywhere in
        # this class — confirm where (or whether) it is raised.
        self._register_event_handler("on_error")

    @property
    def bus(self) -> WorkerBus:
        """The bus this runner hosts; shared across launched workers."""
        return self._bus

    @property
    def registry(self) -> WorkerRegistry:
        """The worker registry this runner owns."""
        return self._registry

    async def add_workers(self, *workers: BaseWorker) -> None:
        """Add one or more workers to the runner.

        Adding a worker attaches it to the runner's bus and registry, and
        starts it in the background. If the runner is not yet running
        (``add_workers`` was called before :meth:`run`), workers are
        queued and started during run setup; if the runner is already
        running, each worker starts immediately.

        Workers still running when the runner ends (every root worker
        finished with ``auto_end=True``, or :meth:`end` / :meth:`cancel`
        was called) are cancelled.

        A worker whose name is already registered is logged and skipped.

        Args:
            *workers: One or more workers to add.
        """
        for worker in workers:
            if worker.name in self._entries:
                logger.error(
                    f"WorkerRunner '{self}': worker '{worker.name}' already exists, skipping"
                )
                continue
            # ``attach`` is async because it also subscribes the worker
            # to the bus — eager subscription is required so workers
            # added later are listening before earlier workers emit
            # their first messages.
            await worker.attach(registry=self._registry, bus=self._bus)
            await self._registry.watch(worker.name, self._on_local_worker_ready)
            entry = _WorkerEntry(worker=worker)
            self._entries[worker.name] = entry
            logger.debug(f"WorkerRunner '{self}': added worker '{worker.name}'")
            if self._running:
                await self._start_worker(entry)

    async def run(
        self,
        worker: BaseWorker | None = None,
        *,
        auto_end: bool = True,
    ) -> None:
        """Run all added workers until the runner is stopped.

        By default (``auto_end=True``), the runner ends once every root
        worker has finished — so a single-pipeline bot naturally ends
        when its pipeline does. Multi-worker bots whose helpers run
        forever (e.g. waiting for bus messages) end by calling
        :meth:`end` / :meth:`cancel` from an event handler (typically on
        transport disconnect). For long-lived hosts that add and remove
        workers over many sessions (e.g. a FastAPI server), pass
        ``auto_end=False`` so the runner does not exit when no workers
        are left.

        Args:
            worker: Optional worker to run.

                .. deprecated:: 1.3.0
                    Register the worker with :meth:`add_workers` before
                    calling ``run()`` instead. Passing ``worker`` here
                    will be removed in a future release.

            auto_end: When ``True`` (the default), the runner ends once
                every root worker has finished. When ``False``, the
                runner blocks until :meth:`end` or :meth:`cancel` is
                called.
        """
        if worker is not None:
            warnings.warn(
                "Passing a worker to WorkerRunner.run() is deprecated; "
                "register it with WorkerRunner.add_workers() before calling run() instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        logger.debug(f"WorkerRunner '{self}': started running")
        self._auto_end = auto_end
        self._shutdown_event.clear()
        # Treat the main worker as any other added worker: ``add_workers`` attaches
        # it to the bus and registry, and ``_setup_session`` then starts every
        # entry (main and pre-added) through the same code path.
        if worker is not None:
            await self.add_workers(worker)
        await self._setup_session()
        await self._call_event_handler("on_ready")
        # Wait for shutdown. With ``auto_end=True``, ``_run_worker`` sets
        # ``_shutdown_event`` once every root worker has finished; ``end()``
        # and ``cancel()`` set it explicitly.
        try:
            await self._shutdown_event.wait()
        except asyncio.CancelledError:
            pass
        try:
            # Cancel any remaining launched workers and wait for them to finish.
            await self._cancel_spawned_tasks()
            # Cleanup base object.
            await self.cleanup()
            # If we are cancelling through a signal, make sure we wait for it so
            # everything gets cleaned up nicely.
            if self._sig_task:
                await self._sig_task
        finally:
            await self._bus.stop()
            self._running = False
        if self._force_gc:
            await self._gc_collect()
        logger.debug(f"WorkerRunner '{self}': finished running")

    async def stop_when_done(self) -> None:
        """Schedule all root pipeline workers to stop when their current processing is complete."""
        logger.debug(f"WorkerRunner '{self}': scheduled to stop when all workers are done")
        await asyncio.gather(
            *[
                entry.worker.stop_when_done()
                for entry in self._entries.values()
                if isinstance(entry.worker, PipelineWorker) and entry.worker.parent is None
            ]
        )

    async def end(self, reason: str | None = None) -> None:
        """Gracefully end all running workers.

        Sends a :class:`BusEndWorkerMessage` to every root worker.
        Idempotent; subsequent calls are ignored.

        Args:
            reason: Optional human-readable reason for ending.
        """
        if self._shutdown_event.is_set():
            return
        logger.debug(f"WorkerRunner '{self}': ending gracefully (reason={reason})")
        self._shutdown_event.set()
        # Iterate a snapshot: ``bus.send`` may yield, and a concurrent
        # ``add_workers`` must not mutate the dict we are iterating.
        for name in self._root_worker_names():
            await self._bus.send(BusEndWorkerMessage(source=self.name, target=name, reason=reason))

    async def cancel(self, reason: str | None = None) -> None:
        """Immediately cancel all running workers.

        Sends a :class:`BusCancelWorkerMessage` to every root worker.
        Idempotent; subsequent calls are ignored.

        Args:
            reason: Optional human-readable reason for cancelling.
        """
        if self._shutdown_event.is_set():
            return
        logger.debug(f"WorkerRunner '{self}': cancelling (reason={reason})")
        self._shutdown_event.set()
        # Iterate a snapshot for the same reason as in ``end()``.
        for name in self._root_worker_names():
            await self._bus.send(
                BusCancelWorkerMessage(source=self.name, target=name, reason=reason)
            )

    async def on_bus_message(self, message: BusMessage) -> None:
        """Process incoming bus messages for runner-level concerns.

        Messages sent by this runner itself are ignored. End/cancel requests
        are handled in separate tasks so the bus dispatch is not blocked.
        """
        if message.source == self.name:
            return
        if isinstance(message, BusEndMessage):
            self.create_task(self.end(message.reason), "end")
        elif isinstance(message, BusCancelMessage):
            self.create_task(self.cancel(message.reason), "cancel")
        elif isinstance(message, BusAddWorkerMessage) and message.worker:
            await self.add_workers(message.worker)
        elif isinstance(message, BusWorkerRegistryMessage):
            await self._handle_worker_registry(message)

    def _root_worker_names(self) -> list[str]:
        """Snapshot the names of registered workers that have no parent."""
        return [name for name, entry in self._entries.items() if entry.worker.parent is None]

    async def _setup_session(self) -> None:
        """One-time per-run setup: task manager, bus, signal handlers, launched workers."""
        if self._running:
            return
        task_manager = TaskManager()
        task_manager.setup(TaskManagerParams(loop=self._loop))
        await super().setup(task_manager)
        await self._bus.setup(task_manager)
        if self._handle_sigint:
            self._setup_sigint()
        if self._handle_sigterm:
            self._setup_sigterm()
        await self._bus.subscribe(self)
        await self._bus.start()
        await self._load_setup_files()
        # Starting a worker yields to the event loop, and the bus is already
        # live, so ``add_workers`` may register new entries while we are in
        # this loop. Since ``_running`` is still False, those entries are not
        # started by ``add_workers`` itself — keep going until every entry
        # registered so far has been started, never iterating the live dict.
        started: set[str] = set()
        while pending := [
            (name, entry) for name, entry in self._entries.items() if name not in started
        ]:
            for name, entry in pending:
                started.add(name)
                await self._start_worker(entry)
        # No await between the last check above and this assignment, so no
        # entry can slip in unstarted.
        self._running = True

    async def _cancel_spawned_tasks(self) -> None:
        """Cancel root workers that are still running and wait for all worker tasks."""
        remaining = [
            entry.runner_task
            for entry in self._entries.values()
            if entry.runner_task and not entry.runner_task.done()
        ]
        if not remaining:
            return
        # NOTE(review): after ``end()`` the root workers have only just been
        # sent BusEndWorkerMessage; any still running here are sent a cancel
        # right away, which may preempt a graceful end — confirm intended.
        for name in self._root_worker_names():
            await self._bus.send(
                BusCancelWorkerMessage(source=self.name, target=name, reason="runner exiting")
            )
        await asyncio.gather(*remaining, return_exceptions=True)

    async def _load_setup_files(self) -> None:
        """Run ``setup_worker_runner`` from each file in ``PIPECAT_SETUP_FILES``.

        A setup file may define ``setup_worker_runner(runner)`` to add
        workers, attach event handlers, or wire other runner-level
        configuration.
        """
        await run_setup_hook(target=self, function_name="setup_worker_runner")

    async def _start_worker(self, entry: _WorkerEntry) -> None:
        """Run a registered worker in a background asyncio task."""
        worker = entry.worker
        logger.debug(f"WorkerRunner '{self}': starting worker '{worker.name}'")
        entry.runner_task = self.create_task(
            self._run_worker(worker),
            f"task_{worker.name}",
        )
        # Yield once so the new task is scheduled right away.
        await asyncio.sleep(0)

    async def _run_worker(self, worker: BaseWorker) -> None:
        """Drive a registered worker to completion."""
        try:
            params = WorkerParams(loop=self._loop)
            await worker.run(params)
        except asyncio.CancelledError:
            pass
        finally:
            # End the runner once every root worker has finished. The
            # current worker's task is still "running" (we're inside its
            # body), so exclude it from the check.
            if self._auto_end and worker.parent is None:
                others_running = any(
                    e.runner_task is not None and not e.runner_task.done()
                    for e in self._entries.values()
                    if e.worker.parent is None and e.worker is not worker
                )
                if not others_running:
                    self._shutdown_event.set()

    async def _on_local_worker_ready(self, data: WorkerReadyData) -> None:
        """Called when a local added worker registers as ready.

        Only root workers owned by this runner trigger a registry broadcast.
        """
        if data.runner != self.name:
            return
        entry = self._entries.get(data.worker_name)
        if not entry or entry.worker.parent is not None:
            return
        await self._send_registry()

    async def _send_registry(self) -> None:
        """Broadcast this runner's workers to the bus (no-op when there are none)."""
        workers = [
            WorkerRegistryEntry(
                name=entry.worker.name,
                parent=entry.worker.parent,
                active=entry.worker.active,
                bridged=entry.worker.bridged,
                started_at=entry.worker.started_at,
            )
            for entry in self._entries.values()
        ]
        if workers:
            names = [w.name for w in workers]
            logger.debug(f"WorkerRunner '{self}': broadcasting registry: {names}")
            await self._bus.send(
                BusWorkerRegistryMessage(
                    source=self.name,
                    runner=self.name,
                    workers=workers,
                )
            )

    async def _handle_worker_registry(self, message: BusWorkerRegistryMessage) -> None:
        """Handle a registry message from a remote runner.

        Registers every advertised worker locally and, the first time a
        runner is seen, replies with this runner's own registry.
        """
        worker_names = [w.name for w in message.workers]
        logger.debug(
            f"WorkerRunner '{self}': received registry from '{message.runner}' "
            f"with workers: {worker_names}"
        )
        for entry in message.workers:
            await self._registry.register(
                WorkerReadyData(worker_name=entry.name, runner=message.runner)
            )
        if message.runner not in self._known_runners:
            self._known_runners.add(message.runner)
            logger.debug(
                f"WorkerRunner '{self}': new runner '{message.runner}', sending our registry back"
            )
            await self._send_registry()

    def _setup_sigint(self) -> None:
        """Set up SIGINT handler for graceful shutdown."""
        try:
            loop = asyncio.get_running_loop()
            loop.add_signal_handler(signal.SIGINT, lambda *args: self._sig_handler())
        except NotImplementedError:
            # Windows fallback
            signal.signal(signal.SIGINT, lambda s, f: self._sig_handler())

    def _setup_sigterm(self) -> None:
        """Set up SIGTERM handler for graceful shutdown."""
        try:
            loop = asyncio.get_running_loop()
            loop.add_signal_handler(signal.SIGTERM, lambda *args: self._sig_handler())
        except NotImplementedError:
            # Windows fallback
            signal.signal(signal.SIGTERM, lambda s, f: self._sig_handler())

    def _sig_handler(self) -> None:
        """Handle interrupt signals by cancelling the runner (only once)."""
        if not self._sig_task:
            self._sig_task = asyncio.create_task(self._sig_cancel())

    async def _sig_cancel(self) -> None:
        """Cancel the runner due to signal interruption."""
        logger.warning(f"WorkerRunner '{self}': interruption detected, cancelling.")
        await self.cancel(reason="interrupt signal")

    async def _gc_collect(self) -> None:
        """Force garbage collection (off the event loop thread) and log results."""
        collected = await asyncio.to_thread(gc.collect)
        logger.debug(f"Garbage collector: collected {collected} objects.")
        logger.debug(f"Garbage collector: uncollectable objects {gc.garbage}")