#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Worker group types for structured concurrent worker execution."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from enum import StrEnum
from typing import TYPE_CHECKING, ClassVar
if TYPE_CHECKING:
from pipecat.workers.base_worker import BaseWorker
[docs]
class JobStatus(StrEnum):
"""Status of a completed worker.
Inherits from ``str`` so values compare naturally with plain strings
and serialize without extra handling.
Attributes:
COMPLETED: The worker finished successfully.
CANCELLED: The worker was cancelled by the requester.
FAILED: The worker failed due to a logical or business error.
ERROR: The worker encountered an unexpected runtime error.
"""
COMPLETED = "completed"
CANCELLED = "cancelled"
FAILED = "failed"
ERROR = "error"
[docs]
class JobError(Exception):
"""Raised when a worker is cancelled due to a worker error or timeout."""
pass
[docs]
class JobGroupError(Exception):
"""Raised when a worker group is cancelled due to a worker error or timeout."""
pass
[docs]
@dataclass
class JobGroupResponse:
"""Collected results from a completed job group.
Parameters:
job_id: The shared job identifier.
responses: Collected responses keyed by worker name.
"""
job_id: str
responses: dict[str, dict]
[docs]
@dataclass
class JobEvent:
"""An event received from a worker during a single-worker job.
Parameters:
type: The event type.
data: Optional event payload.
"""
UPDATE: ClassVar[str] = "update"
STREAM_START: ClassVar[str] = "stream_start"
STREAM_DATA: ClassVar[str] = "stream_data"
STREAM_END: ClassVar[str] = "stream_end"
type: str
data: dict | None = None
[docs]
@dataclass
class JobGroupEvent:
"""An event received from a worker during job group execution.
Parameters:
type: The event type.
worker_name: The name of the worker that sent the event.
data: Optional event payload.
"""
UPDATE: ClassVar[str] = "update"
STREAM_START: ClassVar[str] = "stream_start"
STREAM_DATA: ClassVar[str] = "stream_data"
STREAM_END: ClassVar[str] = "stream_end"
type: str
worker_name: str
data: dict | None = None
[docs]
@dataclass
class JobGroup:
"""Tracks a group of workers launched together.
Parameters:
job_id: Shared identifier for all workers in this group.
worker_names: Names of the workers in the group.
responses: Collected responses keyed by worker name.
timeout_task: Optional asyncio worker that cancels the group on timeout.
cancel_on_error: Whether to cancel the group if a worker errors.
event_queue: Optional queue for streaming events to a
``JobGroupContext`` async iterator.
"""
job_id: str
worker_names: set[str]
responses: dict[str, dict] = field(default_factory=dict)
timeout_task: asyncio.Task | None = None
cancel_on_error: bool = True
event_queue: asyncio.Queue | None = field(default=None, repr=False)
_done: asyncio.Event = field(default_factory=asyncio.Event, repr=False)
_error: str | None = field(default=None, repr=False)
@property
def is_done(self) -> bool:
"""Whether the group has completed or failed."""
return self._done.is_set()
[docs]
async def wait(self) -> None:
"""Wait for all workers in the group to respond.
Raises:
JobGroupError: If the group was cancelled due to error or timeout.
"""
await self._done.wait()
if self._error:
raise JobGroupError(self._error)
[docs]
def complete(self) -> None:
"""Signal that all workers have responded."""
self._done.set()
if self.event_queue:
self.event_queue.put_nowait(None)
[docs]
def fail(self, reason: str | None = None) -> None:
"""Signal that the group was cancelled.
Args:
reason: Human-readable reason for the failure.
"""
self._error = reason
self._done.set()
if self.event_queue:
self.event_queue.put_nowait(None)
[docs]
class JobGroupContext:
"""Async context manager and iterator for structured job group execution.
Sends job requests on enter, waits for all responses on exit.
Supports ``async for`` to receive intermediate events (updates
and streaming data) from workers while waiting for completion.
On normal completion, results are available via ``responses``.
On worker error (with ``cancel_on_error=True``) or timeout, raises
``JobGroupError``. If the ``async with`` block raises, remaining
jobs are cancelled.
Example::
async with self.job_group("w1", "w2", payload=data) as tg:
async for event in tg:
print(f"{event.worker_name} [{event.type}]: {event.data}")
for name, result in tg.responses.items():
print(name, result)
"""
[docs]
def __init__(
self,
worker: BaseWorker,
worker_names: tuple[str, ...],
*,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
cancel_on_error: bool = True,
):
"""Initialize the JobGroupContext.
Args:
worker: The parent `BaseWorker` that owns this job group.
worker_names: Names of the workers to send the job to.
name: Optional job name for routing to named ``@job`` handlers.
payload: Optional structured data describing the work.
timeout: Optional timeout in seconds covering both the
ready-wait and job execution.
cancel_on_error: Whether to cancel the group if a worker
errors. Defaults to True.
"""
self._worker = worker
self._worker_names = worker_names
self._name = name
self._payload = payload
self._timeout = timeout
self._cancel_on_error = cancel_on_error
self._group: JobGroup | None = None
@property
def job_id(self) -> str:
"""The shared job identifier for this group."""
if not self._group:
raise RuntimeError("Job group has not been started")
return self._group.job_id
@property
def responses(self) -> dict[str, dict]:
"""Collected responses keyed by worker name."""
if not self._group:
raise RuntimeError("Job group has not been started")
return self._group.responses
def __aiter__(self):
return self
async def __anext__(self) -> JobGroupEvent:
if not self._group or not self._group.event_queue:
raise StopAsyncIteration
event = await self._group.event_queue.get()
if event is None:
raise StopAsyncIteration
return event
async def __aenter__(self) -> JobGroupContext:
self._group = await self._worker.create_job_group_and_request_job(
list(self._worker_names),
name=self._name,
payload=self._payload,
timeout=self._timeout,
cancel_on_error=self._cancel_on_error,
)
self._group.event_queue = asyncio.Queue()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
if exc_type is not None:
if self._group and self._group.job_id in self._worker.job_groups:
# Shield the cleanup so it completes even if the
# surrounding worker is being cancelled (e.g. tool
# interruption).
await asyncio.shield(
self._worker.cancel_job_group(
self._group.job_id, reason="context exited with error"
)
)
return False
assert self._group is not None
await self._group.wait()
return False
[docs]
class JobContext:
"""Async context manager and iterator for a single-worker job.
Sends a job request on enter, waits for the response on exit.
Supports ``async for`` to receive intermediate events (updates
and streaming data) from the worker while waiting for completion.
On normal completion, the result is available via ``response``.
On worker error or timeout, raises ``JobError``. If the
``async with`` block raises, the job is cancelled.
Example::
async with self.job("worker", payload=data) as t:
async for event in t:
print(f"[{event.type}]: {event.data}")
print(t.response)
"""
[docs]
def __init__(
self,
worker: BaseWorker,
worker_name: str,
*,
name: str | None = None,
payload: dict | None = None,
timeout: float | None = None,
):
"""Initialize the JobContext.
Args:
worker: The parent `BaseWorker` that owns this job.
worker_name: Name of the worker to send the job to.
name: Optional job name for routing to a named ``@job`` handler.
payload: Optional structured data describing the work.
timeout: Optional timeout in seconds covering both the
ready-wait and job execution.
"""
self._worker = worker
self._worker_name = worker_name
self._name = name
self._payload = payload
self._timeout = timeout
self._group: JobGroup | None = None
@property
def job_id(self) -> str:
"""The job identifier."""
if not self._group:
raise RuntimeError("Job has not been started")
return self._group.job_id
@property
def response(self) -> dict:
"""The worker's response payload."""
if not self._group:
raise RuntimeError("Job has not been started")
return self._group.responses.get(self._worker_name, {})
def __aiter__(self):
return self
async def __anext__(self) -> JobEvent:
if not self._group or not self._group.event_queue:
raise StopAsyncIteration
event = await self._group.event_queue.get()
if event is None:
raise StopAsyncIteration
return JobEvent(type=event.type, data=event.data)
async def __aenter__(self) -> JobContext:
self._group = await self._worker.create_job_group_and_request_job(
[self._worker_name],
name=self._name,
payload=self._payload,
timeout=self._timeout,
cancel_on_error=True,
)
self._group.event_queue = asyncio.Queue()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
if exc_type is not None:
if self._group and self._group.job_id in self._worker.job_groups:
# Shield the cleanup so it completes even if the
# surrounding worker is being cancelled (e.g. tool
# interruption).
await asyncio.shield(
self._worker.cancel_job_group(
self._group.job_id, reason="context exited with error"
)
)
return False
assert self._group is not None
try:
await self._group.wait()
except JobGroupError as e:
raise JobError(str(e)) from e
return False