#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Shared registry for tracking known workers across runners."""
from collections import defaultdict
from collections.abc import Callable, Coroutine
from loguru import logger
from pipecat.registry.types import WorkerReadyData
WatchHandler = Callable[[WorkerReadyData], Coroutine]
[docs]
class WorkerRegistry:
"""Tracks all known workers across local and remote runners.
Owned by a runner and shared with its workers. Organizes workers into
local (this runner) and remote (other runners) so they are easy to
distinguish. Deduplication is built in: each worker name is
registered at most once.
Notifications use a targeted watch mechanism: call
``watch(worker_name, handler)`` to be notified when a specific worker
registers.
"""
[docs]
def __init__(self, runner_name: str):
"""Initialize the WorkerRegistry.
Args:
runner_name: Name of the runner that owns this registry.
"""
self._runner_name = runner_name
self._local_workers: dict[str, WorkerReadyData] = {}
self._remote_workers: dict[str, dict[str, WorkerReadyData]] = defaultdict(dict)
self._watches: dict[str, list[WatchHandler]] = defaultdict(list)
@property
def runner_name(self) -> str:
"""The name of the runner that owns this registry."""
return self._runner_name
@property
def local_workers(self) -> list[str]:
"""Names of workers registered under this runner."""
return list(self._local_workers.keys())
@property
def remote_workers(self) -> list[str]:
"""Names of workers registered under remote runners."""
result: list[str] = []
for workers in self._remote_workers.values():
result.extend(workers.keys())
return result
[docs]
def get(self, worker_name: str) -> WorkerReadyData | None:
"""Look up a registered worker by name.
Args:
worker_name: The worker name to look up.
Returns:
The worker's ``WorkerReadyData``, or None if not found.
"""
if worker_name in self._local_workers:
return self._local_workers[worker_name]
for workers in self._remote_workers.values():
if worker_name in workers:
return workers[worker_name]
return None
def __contains__(self, worker_name: str) -> bool:
return self.get(worker_name) is not None
[docs]
async def watch(self, worker_name: str, handler: WatchHandler) -> None:
"""Watch for a specific worker's registration.
Idempotent: registering the same ``(worker_name, handler)`` pair
more than once is a no-op (otherwise the handler would fire
multiple times when the worker registers — e.g. when a parent
both calls ``add_workers(child)`` (which auto-watches) and
declares a ``@worker_ready(name=child.name)`` handler that the
framework also installs).
If the worker is already registered, the handler fires immediately.
Args:
worker_name: The worker name to watch for.
handler: Async callable invoked with the worker's data.
"""
handlers = self._watches[worker_name]
if handler in handlers:
return
handlers.append(handler)
existing = self.get(worker_name)
if existing:
await handler(existing)
[docs]
async def register(self, worker_data: WorkerReadyData) -> bool:
"""Register a worker. Returns True if the worker was new.
If the worker is already registered, this is a no-op and returns
False. Otherwise the worker is added and watchers are notified.
Args:
worker_data: Information about the worker to register.
Returns:
True if the worker was newly registered, False if already known.
"""
is_local = worker_data.runner == self._runner_name
target = self._local_workers if is_local else self._remote_workers[worker_data.runner]
if worker_data.worker_name in target:
return False
# Warn if the same name exists on a different runner
existing = self.get(worker_data.worker_name)
if existing and existing.runner != worker_data.runner:
logger.warning(
f"Worker '{worker_data.worker_name}' registered on both "
f"'{existing.runner}' and '{worker_data.runner}'"
)
target[worker_data.worker_name] = worker_data
locality = "local" if is_local else worker_data.runner
logger.debug(f"Worker '{worker_data.worker_name}' ready ({locality})")
await self._notify(worker_data)
return True
async def _notify(self, worker_data: WorkerReadyData) -> None:
"""Notify watchers of a new registration."""
for handler in self._watches.get(worker_data.worker_name, []):
await handler(worker_data)