#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""LLM worker with tool registration.
Provides the `LLMWorker` class that extends `PipelineWorker` with an LLM
pipeline and automatic tool registration.
"""
import asyncio
import functools
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import (
ControlFrame,
Frame,
FunctionCallResultProperties,
LLMMessagesAppendFrame,
LLMSetToolsFrame,
UninterruptibleFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.worker import PipelineParams, PipelineWorker
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.llm_service import LLMService
from pipecat.workers.base_worker import WorkerActivationArgs
from pipecat.workers.llm.tool_decorator import _collect_tools
# Type alias for the ``result_callback`` accepted by ``LLMWorker.end`` and
# ``LLMWorker.activate_worker``. The worker awaits it with a result value
# and a ``properties`` keyword argument.
FunctionCallResultCallback = Callable[..., Any]
[docs]
@dataclass
class PipelineFlushFrame(ControlFrame, UninterruptibleFrame):
    """Data-less marker frame used to detect that the pipeline has drained.

    ``LLMWorker`` queues it upstream, queues a fresh one once it reaches the
    source, and treats its arrival at the sink as the signal that all
    in-flight frames have been processed.
    """
[docs]
@dataclass
class LLMWorkerActivationArgs(WorkerActivationArgs):
    """Activation arguments for LLM workers.

    Attributes:
        messages: LLM context messages to append when the worker is
            activated. Nothing is appended when unset or empty.
        run_llm: Whether to run the LLM after appending ``messages``.
            ``None`` (the default) is treated as ``True``. Has no effect
            when ``messages`` is unset or empty.
    """

    # Sent as an ``LLMMessagesAppendFrame`` by ``LLMWorker.on_activated``.
    messages: list | None = None
    # Tri-state: ``None`` means "not specified" and resolves to ``True``.
    run_llm: bool | None = None
[docs]
class LLMWorker(PipelineWorker):
    """Worker with an LLM pipeline and automatic tool registration.

    Methods decorated with ``@tool`` are registered as direct functions
    on the LLM and tracked so that frames queued during tool execution
    can be deferred until all tools complete.

    Example::

        class MyTask(LLMWorker):
            @tool
            async def my_function(self, params, arg: str):
                ...

        worker = MyTask("worker", llm=OpenAILLMService(api_key="..."))
    """
[docs]
def __init__(
    self,
    name: str,
    *,
    llm: LLMService[Any],
    pipeline: Pipeline | None = None,
    active: bool = False,
    bridged: tuple[str, ...] | None = None,
    defer_tool_frames: bool = True,
) -> None:
    """Initialize the LLMWorker.

    Sets up the tool-tracking state, registers the ``@tool`` methods on
    ``llm``, builds the pipeline, and wires the flush-probe handlers.

    Args:
        name: Unique name for this worker.
        llm: The LLM service. ``@tool`` decorated methods are
            automatically registered on it.
        pipeline: Optional pipeline override. When ``None``,
            defaults to ``Pipeline([llm])``. Subclasses can pass a
            custom pipeline that wraps the LLM with additional
            processors.
        active: Whether the worker starts active. Defaults to False.
        bridged: Bridge configuration forwarded to ``PipelineWorker``.
            Pass ``()`` to wrap the LLM pipeline with bus edge
            processors so it can exchange frames with another
            bridged worker.
        defer_tool_frames: Whether to defer frames queued during
            tool execution until all tools complete. Defaults to True.
    """
    # State referenced by tool wrapper closures; must be set before
    # _register_tools wraps any handlers.
    self._defer_tool_frames = defer_tool_frames
    # Number of ``@tool`` calls currently executing.
    self._tool_call_inflight: int = 0
    # Frames held back by queue_frame while tools run, in arrival order.
    self._deferred_frames: deque[tuple[Frame, FrameDirection]] = deque()
    # Set by end(); disables deferral and the post-tool flush.
    self._closing: bool = False
    # Set when a PipelineFlushFrame probe reaches the downstream end.
    self._flush_done: asyncio.Event = asyncio.Event()
    self._llm = llm
    self._register_tools(llm)
    pipeline = pipeline if pipeline is not None else Pipeline([self._llm])
    super().__init__(
        pipeline,
        name=name,
        bridged=bridged,
        exclude_frames=(PipelineFlushFrame,),
        enable_rtvi=False,
        idle_timeout_secs=None,
        params=PipelineParams(
            enable_metrics=True,
            enable_usage_metrics=True,
        ),
    )
    # PipelineWorker's __init__ doesn't accept active; configure after.
    self._active = active
    self._pending_activation = active
    # Pipeline flush wiring: the probe frame travels up to the source
    # and back down to the sink, signalling that all in-flight frames
    # have been processed.
    self.add_reached_upstream_filter((PipelineFlushFrame,))
    self.add_reached_downstream_filter((PipelineFlushFrame,))

    @self.event_handler("on_frame_reached_upstream")
    async def _on_flush_upstream(worker, frame):
        if isinstance(frame, PipelineFlushFrame):
            # Zero-argument super() binds to this function's first
            # parameter (``worker``), not to the enclosing ``self``.
            # NOTE(review): assumes the event system passes this worker
            # as ``worker`` - confirm. Calling the parent implementation
            # bypasses the deferral logic in LLMWorker.queue_frame.
            await super().queue_frame(PipelineFlushFrame())

    @self.event_handler("on_frame_reached_downstream")
    async def _on_flush_downstream(worker, frame):
        if isinstance(frame, PipelineFlushFrame):
            # Wake up whoever is waiting in _flush_pipeline.
            self._flush_done.set()
@property
def llm(self) -> LLMService:
    """The LLM service passed to the constructor, on which tools are registered."""
    return self._llm
@property
def tool_call_active(self) -> bool:
    """Whether at least one ``@tool`` method is currently executing."""
    in_flight = self._tool_call_inflight
    return in_flight > 0
[docs]
async def on_activated(self, args: dict | None) -> None:
    """Push the tool schema and any activation messages into the pipeline.

    Args:
        args: Optional activation arguments in dict form; parsed as
            ``LLMWorkerActivationArgs``. May carry messages to append.
    """
    await super().on_activated(args)

    if args:
        activation = LLMWorkerActivationArgs.from_dict(args)
    else:
        activation = LLMWorkerActivationArgs()

    # Only announce tools when there is something to announce.
    tools = self.build_tools()
    if tools:
        schema = ToolsSchema(standard_tools=tools)
        await self.queue_frame(LLMSetToolsFrame(tools=schema))

    if not activation.messages:
        return

    # An unspecified ``run_llm`` means "run the LLM".
    run_llm = True if activation.run_llm is None else activation.run_llm
    await self.queue_frame(
        LLMMessagesAppendFrame(messages=activation.messages, run_llm=run_llm)
    )
[docs]
async def queue_frame(
    self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
) -> None:
    """Queue a frame, holding it back while tool calls are running.

    The frame is parked in an internal queue only when deferral is
    enabled, at least one tool call is in flight, and the worker is not
    closing; parked frames are delivered once the last tool finishes.
    In every other case the frame is queued right away.

    Args:
        frame: Any ``Frame`` to deliver.
        direction: Direction the frame should travel. Defaults to
            ``FrameDirection.DOWNSTREAM``.
    """
    deliver_now = (
        not self._defer_tool_frames
        or self._tool_call_inflight <= 0
        or self._closing
    )
    if deliver_now:
        await super().queue_frame(frame, direction)
        return

    self._deferred_frames.append((frame, direction))
[docs]
async def end(
    self,
    *,
    reason: str | None = None,
    messages: list | None = None,
    result_callback: FunctionCallResultCallback | None = None,
) -> None:
    """Request a graceful end of the session.

    When called from a ``@tool`` handler, pass ``params.result_callback`` to
    ensure any pending LLM output is fully delivered before ending.

    Args:
        reason: Optional human-readable reason for ending.
        messages: Optional LLM messages to inject and speak before
            ending. The LLM runs immediately so the output is
            delivered before the session terminates.
        result_callback: The ``result_callback`` from
            `FunctionCallParams`. When given, it is invoked with a
            ``None`` result and ``run_llm=False`` before the session ends.
    """
    # Mark the worker as closing first: from here on queue_frame no longer
    # defers frames, and finishing tool wrappers skip the deferred-frame
    # flush.
    self._closing = True
    # Deliver any final messages and complete the pending function call
    # before handing over to the parent implementation.
    await self._finish_function_call(result_callback, messages=messages)
    await super().end(reason=reason)
[docs]
async def activate_worker(
    self,
    worker_name: str,
    *,
    args: WorkerActivationArgs | None = None,
    deactivate_self: bool = False,
    messages: list | None = None,
    result_callback: FunctionCallResultCallback | None = None,
) -> None:
    """Activate another worker, optionally finishing an in-progress tool call.

    When called from a ``@tool`` handler, pass ``params.result_callback`` to
    ensure any pending LLM output is fully delivered before the target is
    activated.

    Args:
        worker_name: The name of the worker to activate.
        args: Optional ``WorkerActivationArgs`` forwarded to the target
            worker's ``on_activated`` handler.
        deactivate_self: Whether to deactivate this worker before activating
            the target.
        messages: Optional LLM messages to inject and speak before
            activating the target. The LLM runs immediately so the output
            is delivered before the transfer completes.
        result_callback: The ``result_callback`` from `FunctionCallParams`.
            When given, it is invoked with a ``None`` result and
            ``run_llm=False`` before the target is activated.
    """
    # Finish our side (messages + function-call result) before the handoff;
    # the actual activation is delegated to the parent implementation.
    await self._finish_function_call(result_callback, messages=messages)
    await super().activate_worker(worker_name, args=args, deactivate_self=deactivate_self)
def _register_tools(self, llm: LLMService) -> None:
    """Register every collected ``@tool`` method of this worker on ``llm``.

    Each method is wrapped by ``_track_tool_call`` before registration,
    while the registration options are read from the collected method
    itself.
    """
    for tool_method in _collect_tools(self):
        wrapped = self._track_tool_call(tool_method)
        options = {
            "cancel_on_interruption": tool_method.cancel_on_interruption,
            "timeout_secs": tool_method.timeout,
        }
        llm.register_direct_function(wrapped, **options)
def _track_tool_call(self, method: Callable) -> Callable:
@functools.wraps(method)
async def wrapper(params, *args, **kwargs):
self._tool_call_inflight += 1
try:
return await method(params, *args, **kwargs)
finally:
self._tool_call_inflight = max(0, self._tool_call_inflight - 1)
if not self._closing and self._tool_call_inflight == 0:
await self._flush_deferred_frames()
return wrapper
async def _flush_deferred_frames(self) -> None:
# Wait until the function result frame is really processed.
await self._flush_pipeline()
frames = list(self._deferred_frames)
self._deferred_frames.clear()
for frame, direction in await self.process_deferred_tool_frames(frames):
await self.queue_frame(frame, direction)
async def _flush_pipeline(self) -> None:
    """Send a flush probe through the pipeline and wait until it comes back.

    Returns once ``_flush_done`` has been set, which happens when a
    ``PipelineFlushFrame`` reaches the downstream end of the pipeline.
    """
    self._flush_done.clear()

    # Bypass our deferral override by going to PipelineWorker directly.
    probe = PipelineFlushFrame()
    await super().queue_frame(probe, FrameDirection.UPSTREAM)

    await self._flush_done.wait()
async def _finish_function_call(
    self,
    result_callback: FunctionCallResultCallback | None,
    *,
    messages: list | None = None,
) -> None:
    """Wrap up an in-progress function call before handing off or ending.

    Runs up to two steps, each followed by a pipeline flush so its effect
    is fully processed before continuing: first the optional messages are
    appended (with the LLM run immediately), then the function call is
    completed through ``result_callback`` with a ``None`` result and
    ``run_llm=False``.

    Args:
        result_callback: The callback from `FunctionCallParams`, or None.
        messages: Optional LLM messages to inject before completing.
    """
    if messages:
        # Bypass our deferral override: this runs inside a tool call, so
        # self.queue_frame would defer the frame and the flush below would
        # return before the LLM output is delivered.
        append_frame = LLMMessagesAppendFrame(messages=messages, run_llm=True)
        await super().queue_frame(append_frame)
        await self._flush_pipeline()

    if result_callback:
        no_llm_run = FunctionCallResultProperties(run_llm=False)
        await result_callback(None, properties=no_llm_run)
        # Wait until the function result frame is really processed.
        await self._flush_pipeline()