llm
LLM worker package – LLMWorker, LLMContextWorker, and the @tool decorator.
- class pipecat.workers.llm.LLMWorker(name: str, *, llm: LLMService[Any], pipeline: Pipeline | None = None, active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True)[source]
Bases:
PipelineWorkerWorker with an LLM pipeline and automatic tool registration.
Methods decorated with
@toolare registered as direct functions on the LLM and tracked so that frames queued during tool execution can be deferred until all tools complete.Example:
class MyTask(LLMWorker): @tool async def my_function(self, params, arg: str): ... worker = MyTask("worker", bus=bus, llm=OpenAILLMService(api_key="..."))
- __init__(name: str, *, llm: LLMService[Any], pipeline: Pipeline | None = None, active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True)[source]
Initialize the LLMWorker.
- Parameters:
name – Unique name for this worker.
llm – The LLM service.
@tooldecorated methods are automatically registered on it.pipeline – Optional pipeline override. When
None, defaults toPipeline([llm]). Subclasses can pass a custom pipeline that wraps the LLM with additional processors.active – Whether the worker starts active. Defaults to False.
bridged – Bridge configuration forwarded to
PipelineWorker. Pass()to wrap the LLM pipeline with bus edge processors so it can exchange frames with another bridged worker.defer_tool_frames – Whether to defer frames queued during tool execution until all tools complete. Defaults to True.
- property llm: LLMService
The LLM service this worker wraps.
- property tool_call_active: bool
True when one or more
@toolmethods are executing.
- async on_activated(args: dict | None) None[source]
Configure the LLM with tools and activation messages.
- Parameters:
args – Optional activation arguments with messages to append.
- async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM) None[source]
Queue a frame, deferring delivery until all tools complete (if any).
When tool calls are in progress, the frame is held in an internal queue and delivered automatically once the last tool finishes. When no tools are active, the frame is queued immediately.
- Parameters:
frame – Any
Frameto deliver.direction – Direction the frame should travel. Defaults to
FrameDirection.DOWNSTREAM.
- build_tools() list[source]
Return the tools for this worker’s LLM.
By default, returns all methods decorated with
@tool. Override to provide additional or different tools.- Returns:
List of tool functions.
- async end(*, reason: str | None = None, messages: list | None = None, result_callback: Callable[[...], Any] | None = None) None[source]
Request a graceful end of the session.
When called from a
@toolhandler, passparams.result_callbackto ensure any pending LLM output is fully delivered before ending.- Parameters:
reason – Optional human-readable reason for ending.
messages – Optional LLM messages to inject and speak before ending. The LLM runs immediately so the output is delivered before the session terminates.
result_callback – The
result_callbackfrom FunctionCallParams.
- async activate_worker(worker_name: str, *, args: WorkerActivationArgs | None = None, deactivate_self: bool = False, messages: list | None = None, result_callback: Callable[[...], Any] | None = None) None[source]
Activate another worker, optionally finishing an in-progress tool call.
When called from a
@toolhandler, passparams.result_callbackto ensure any pending LLM output is fully delivered before the target is activated.- Parameters:
worker_name – The name of the worker to activate.
args – Optional
WorkerActivationArgsforwarded to the target worker’son_activatedhandler.deactivate_self – Whether to deactivate this worker before activating the target.
messages – Optional LLM messages to inject and speak before activating the target. The LLM runs immediately so the output is delivered before the transfer completes.
result_callback – The
result_callbackfrom FunctionCallParams.
- async process_deferred_tool_frames(frames: list[tuple[Frame, FrameDirection]]) list[tuple[Frame, FrameDirection]][source]
Process deferred frames before they are flushed.
Called after all in-flight tools complete, before the deferred frames are queued into the pipeline. Override to inspect, modify, reorder, or filter the frames.
- Parameters:
frames – The deferred frames collected during tool execution.
- Returns:
The frames to queue. Return the list as-is for default behavior.
- class pipecat.workers.llm.LLMWorkerActivationArgs(metadata: dict | None = None, messages: list | None = None, run_llm: bool | None = None)[source]
Bases:
WorkerActivationArgsActivation arguments for LLM workers.
- messages
LLM context messages to inject on activation.
- Type:
list | None
- run_llm
Whether to run the LLM after appending messages. Defaults to True when
messagesis set.- Type:
bool | None
- messages: list | None = None
- run_llm: bool | None = None
- class pipecat.workers.llm.LLMContextWorker(name: str, *, llm: LLMService[Any], active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True, context: LLMContext | None = None, user_params: LLMUserAggregatorParams | None = None, assistant_params: LLMAssistantAggregatorParams | None = None)[source]
Bases:
LLMWorkerLLM worker that owns an LLMContext and a context aggregator pair.
Useful for workers that need to track their own conversation history, typically workers that run their own LLM pipeline outside of a shared transport pipeline. Subclasses do not need to instantiate the context or aggregators themselves; the pipeline is built as
[user_aggregator, llm, assistant_aggregator]automatically.Example:
worker = LLMContextWorker( "worker", llm=OpenAILLMService(...), ) @worker.assistant_aggregator.event_handler("on_assistant_turn_stopped") async def _on_stopped(aggregator, message): ...
- __init__(name: str, *, llm: LLMService[Any], active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True, context: LLMContext | None = None, user_params: LLMUserAggregatorParams | None = None, assistant_params: LLMAssistantAggregatorParams | None = None)[source]
Initialize the LLMContextWorker.
- Parameters:
name – Unique name for this worker.
llm – The LLM service.
active – Whether the worker starts active. Defaults to False.
bridged – Bridge configuration forwarded to
PipelineWorker. Pass()to wrap the pipeline with bus edges so it can exchange frames with another bridged worker.defer_tool_frames – Whether to defer frames queued during tool execution until all tools complete. Defaults to True.
context – Optional pre-built LLMContext. When omitted, a fresh empty context is created.
user_params – Optional parameters for the user aggregator.
assistant_params – Optional parameters for the assistant aggregator.
- property context: LLMContext
The LLMContext owned by this worker.
- property user_aggregator: LLMUserAggregator
The user-side context aggregator.
- property assistant_aggregator: LLMAssistantAggregator
The assistant-side context aggregator.
- pipecat.workers.llm.tool(fn=None, *, cancel_on_interruption=True, timeout=None)[source]
Mark a method as a tool.
On
LLMWorkersubclasses, decorated methods are automatically registered with the LLM viaregister_direct_functionand included inbuild_tools().Can be used with or without arguments:
@tool async def my_tool(self, params, arg: str): ... @tool(cancel_on_interruption=False, timeout=60) async def my_tool(self, params, arg: str): ...
- Parameters:
fn – The function to decorate (when used without arguments).
cancel_on_interruption – Whether to cancel this tool call when an interruption occurs. Defaults to True. Only applies to
LLMWorkertools.timeout – Optional timeout in seconds for this tool call. Defaults to None (uses the LLM service default).