llm

LLM worker package – LLMWorker, LLMContextWorker, and the @tool decorator.

class pipecat.workers.llm.LLMWorker(name: str, *, llm: LLMService[Any], pipeline: Pipeline | None = None, active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True)[source]

Bases: PipelineWorker

Worker with an LLM pipeline and automatic tool registration.

Methods decorated with @tool are registered as direct functions on the LLM and tracked so that frames queued during tool execution can be deferred until all tools complete.

Example:

class MyTask(LLMWorker):
    @tool
    async def my_function(self, params, arg: str):
        ...

worker = MyTask("worker", bus=bus, llm=OpenAILLMService(api_key="..."))
__init__(name: str, *, llm: LLMService[Any], pipeline: Pipeline | None = None, active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True)[source]

Initialize the LLMWorker.

Parameters:
  • name – Unique name for this worker.

  • llm – The LLM service. @tool decorated methods are automatically registered on it.

  • pipeline – Optional pipeline override. When None, defaults to Pipeline([llm]). Subclasses can pass a custom pipeline that wraps the LLM with additional processors.

  • active – Whether the worker starts active. Defaults to False.

  • bridged – Bridge configuration forwarded to PipelineWorker. Pass () to wrap the LLM pipeline with bus edge processors so it can exchange frames with another bridged worker.

  • defer_tool_frames – Whether to defer frames queued during tool execution until all tools complete. Defaults to True.

property llm: LLMService

The LLM service this worker wraps.

property tool_call_active: bool

True when one or more @tool methods are executing.

async on_activated(args: dict | None) None[source]

Configure the LLM with tools and activation messages.

Parameters:

args – Optional activation arguments with messages to append.

async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM) None[source]

Queue a frame, deferring delivery until all tools complete (if any).

When tool calls are in progress, the frame is held in an internal queue and delivered automatically once the last tool finishes. When no tools are active, the frame is queued immediately.

Parameters:
  • frame – Any Frame to deliver.

  • direction – Direction the frame should travel. Defaults to FrameDirection.DOWNSTREAM.

build_tools() list[source]

Return the tools for this worker’s LLM.

By default, returns all methods decorated with @tool. Override to provide additional or different tools.

Returns:

List of tool functions.

async end(*, reason: str | None = None, messages: list | None = None, result_callback: Callable[[...], Any] | None = None) None[source]

Request a graceful end of the session.

When called from a @tool handler, pass params.result_callback to ensure any pending LLM output is fully delivered before ending.

Parameters:
  • reason – Optional human-readable reason for ending.

  • messages – Optional LLM messages to inject and speak before ending. The LLM runs immediately so the output is delivered before the session terminates.

  • result_callback – The result_callback from FunctionCallParams.

async activate_worker(worker_name: str, *, args: WorkerActivationArgs | None = None, deactivate_self: bool = False, messages: list | None = None, result_callback: Callable[[...], Any] | None = None) None[source]

Activate another worker, optionally finishing an in-progress tool call.

When called from a @tool handler, pass params.result_callback to ensure any pending LLM output is fully delivered before the target is activated.

Parameters:
  • worker_name – The name of the worker to activate.

  • args – Optional WorkerActivationArgs forwarded to the target worker’s on_activated handler.

  • deactivate_self – Whether to deactivate this worker before activating the target.

  • messages – Optional LLM messages to inject and speak before activating the target. The LLM runs immediately so the output is delivered before the transfer completes.

  • result_callback – The result_callback from FunctionCallParams.

async process_deferred_tool_frames(frames: list[tuple[Frame, FrameDirection]]) list[tuple[Frame, FrameDirection]][source]

Process deferred frames before they are flushed.

Called after all in-flight tools complete, before the deferred frames are queued into the pipeline. Override to inspect, modify, reorder, or filter the frames.

Parameters:

frames – The deferred frames collected during tool execution.

Returns:

The frames to queue. Return the list as-is for default behavior.

class pipecat.workers.llm.LLMWorkerActivationArgs(metadata: dict | None = None, messages: list | None = None, run_llm: bool | None = None)[source]

Bases: WorkerActivationArgs

Activation arguments for LLM workers.

messages

LLM context messages to inject on activation.

Type:

list | None

run_llm

Whether to run the LLM after appending messages. Defaults to True when messages is set.

Type:

bool | None

messages: list | None = None
run_llm: bool | None = None
class pipecat.workers.llm.LLMContextWorker(name: str, *, llm: LLMService[Any], active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True, context: LLMContext | None = None, user_params: LLMUserAggregatorParams | None = None, assistant_params: LLMAssistantAggregatorParams | None = None)[source]

Bases: LLMWorker

LLM worker that owns an LLMContext and a context aggregator pair.

Useful for workers that need to track their own conversation history, typically workers that run their own LLM pipeline outside of a shared transport pipeline. Subclasses do not need to instantiate the context or aggregators themselves; the pipeline is built as [user_aggregator, llm, assistant_aggregator] automatically.

Example:

worker = LLMContextWorker(
    "worker",
    llm=OpenAILLMService(...),
)

@worker.assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def _on_stopped(aggregator, message):
    ...
__init__(name: str, *, llm: LLMService[Any], active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True, context: LLMContext | None = None, user_params: LLMUserAggregatorParams | None = None, assistant_params: LLMAssistantAggregatorParams | None = None)[source]

Initialize the LLMContextWorker.

Parameters:
  • name – Unique name for this worker.

  • llm – The LLM service.

  • active – Whether the worker starts active. Defaults to False.

  • bridged – Bridge configuration forwarded to PipelineWorker. Pass () to wrap the pipeline with bus edges so it can exchange frames with another bridged worker.

  • defer_tool_frames – Whether to defer frames queued during tool execution until all tools complete. Defaults to True.

  • context – Optional pre-built LLMContext. When omitted, a fresh empty context is created.

  • user_params – Optional parameters for the user aggregator.

  • assistant_params – Optional parameters for the assistant aggregator.

property context: LLMContext

The LLMContext owned by this worker.

property user_aggregator: LLMUserAggregator

The user-side context aggregator.

property assistant_aggregator: LLMAssistantAggregator

The assistant-side context aggregator.

pipecat.workers.llm.tool(fn=None, *, cancel_on_interruption=True, timeout=None)[source]

Mark a method as a tool.

On LLMWorker subclasses, decorated methods are automatically registered with the LLM via register_direct_function and included in build_tools().

Can be used with or without arguments:

@tool
async def my_tool(self, params, arg: str):
    ...

@tool(cancel_on_interruption=False, timeout=60)
async def my_tool(self, params, arg: str):
    ...
Parameters:
  • fn – The function to decorate (when used without arguments).

  • cancel_on_interruption – Whether to cancel this tool call when an interruption occurs. Defaults to True. Only applies to LLMWorker tools.

  • timeout – Optional timeout in seconds for this tool call. Defaults to None (uses the LLM service default).

Submodules