llm_worker

LLM worker with tool registration.

Provides the LLMWorker class that extends PipelineWorker with an LLM pipeline and automatic tool registration.

class pipecat.workers.llm.llm_worker.PipelineFlushFrame

Bases: ControlFrame, UninterruptibleFrame

Probe frame used to flush all in-flight frames from the pipeline.

class pipecat.workers.llm.llm_worker.LLMWorkerActivationArgs(metadata: dict | None = None, messages: list | None = None, run_llm: bool | None = None)

Bases: WorkerActivationArgs

Activation arguments for LLM workers.

messages

LLM context messages to inject on activation.

Type:

list | None

run_llm

Whether to run the LLM after appending messages. Defaults to True when messages is set.

Type:

bool | None

messages: list | None = None
run_llm: bool | None = None
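
The defaulting rule for run_llm can be illustrated with a small stand-in. This is a sketch only: it does not import pipecat, the names ActivationArgsSketch and should_run_llm are hypothetical, and the behavior when messages is unset is an assumption rather than something stated above.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ActivationArgsSketch:
    """Hypothetical stand-in mirroring the documented fields (not the pipecat class)."""

    metadata: dict | None = None
    messages: list | None = None
    run_llm: bool | None = None


def should_run_llm(args: ActivationArgsSketch) -> bool:
    """Resolve run_llm: an explicit value wins, otherwise follow messages."""
    if args.run_llm is not None:
        return args.run_llm
    # Documented default: True when messages is set.
    # Assumption: False otherwise, since there is nothing new to respond to.
    return args.messages is not None
```

Under these assumptions, passing messages alone is enough to trigger an LLM run, while run_llm=False appends the messages without running the LLM.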
class pipecat.workers.llm.llm_worker.LLMWorker(name: str, *, llm: LLMService[Any], pipeline: Pipeline | None = None, active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True)

Bases: PipelineWorker

Worker with an LLM pipeline and automatic tool registration.

Methods decorated with @tool are registered as direct functions on the LLM and tracked so that frames queued during tool execution can be deferred until all tools complete.

Example:

class MyTask(LLMWorker):
    @tool
    async def my_function(self, params, arg: str):
        ...

worker = MyTask("worker", llm=OpenAILLMService(api_key="..."))
__init__(name: str, *, llm: LLMService[Any], pipeline: Pipeline | None = None, active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True)

Initialize the LLMWorker.

Parameters:
  • name – Unique name for this worker.

  • llm – The LLM service. @tool decorated methods are automatically registered on it.

  • pipeline – Optional pipeline override. When None, defaults to Pipeline([llm]). Subclasses can pass a custom pipeline that wraps the LLM with additional processors.

  • active – Whether the worker starts active. Defaults to False.

  • bridged – Bridge configuration forwarded to PipelineWorker. Pass () to wrap the LLM pipeline with bus edge processors so it can exchange frames with another bridged worker.

  • defer_tool_frames – Whether to defer frames queued during tool execution until all tools complete. Defaults to True.

property llm: LLMService

The LLM service this worker wraps.

property tool_call_active: bool

True when one or more @tool methods are executing.

async on_activated(args: dict | None) → None

Configure the LLM with tools and activation messages.

Parameters:

args – Optional activation arguments with messages to append.

async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM) → None

Queue a frame, deferring delivery while any tool calls are in progress.

When tool calls are in progress, the frame is held in an internal queue and delivered automatically once the last tool finishes. When no tools are active, the frame is queued immediately.

Parameters:
  • frame – Any Frame to deliver.

  • direction – Direction the frame should travel. Defaults to FrameDirection.DOWNSTREAM.
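
The deferral behavior described above can be sketched with a plain asyncio stand-in. This is not pipecat's implementation: DeferringQueueSketch, run_tool, and the use of strings as frames are all hypothetical, chosen only to show frames being held while a tool runs and flushed in arrival order afterwards.

```python
import asyncio


class DeferringQueueSketch:
    """Hypothetical stand-in for the documented deferral (not pipecat code)."""

    def __init__(self) -> None:
        self.delivered: list[str] = []  # frames that reached the "pipeline"
        self._deferred: list[str] = []  # frames held while tools are running
        self._active_tools = 0

    @property
    def tool_call_active(self) -> bool:
        return self._active_tools > 0

    async def queue_frame(self, frame: str) -> None:
        # Hold the frame while any tool is running, otherwise deliver it now.
        if self.tool_call_active:
            self._deferred.append(frame)
        else:
            self.delivered.append(frame)

    async def run_tool(self, work) -> None:
        self._active_tools += 1
        try:
            await work()
        finally:
            self._active_tools -= 1
            if not self.tool_call_active:
                # Last tool finished: flush held frames in arrival order.
                pending, self._deferred = self._deferred, []
                self.delivered.extend(pending)


async def demo() -> list[str]:
    queue = DeferringQueueSketch()
    await queue.queue_frame("before")

    async def tool_body() -> None:
        await queue.queue_frame("during")  # held, not delivered yet

    await queue.run_tool(tool_body)
    await queue.queue_frame("after")
    return queue.delivered
```

Calling asyncio.run(demo()) in this sketch delivers the frames in the order they were queued, with the middle frame released only once the tool has finished.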

build_tools() → list

Return the tools for this worker’s LLM.

By default, returns all methods decorated with @tool. Override to provide additional or different tools.

Returns:

List of tool functions.
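
An override that extends the default tool list might look like the sketch below. To keep it self-contained, the tool decorator and WorkerSketch base class are hypothetical stand-ins that only imitate the documented behavior (collecting decorated methods); with pipecat installed, the subclass would derive from LLMWorker and use the library's own @tool instead.

```python
def tool(fn):
    """Hypothetical marker decorator standing in for pipecat's @tool."""
    fn._is_tool = True
    return fn


class WorkerSketch:
    """Hypothetical base class imitating the documented default."""

    def build_tools(self) -> list:
        tools = []
        for name in dir(self):
            if name.startswith("__"):
                continue
            attr = getattr(self, name)
            # Bound methods forward attribute lookups to the wrapped function.
            if callable(attr) and getattr(attr, "_is_tool", False):
                tools.append(attr)
        return tools


async def lookup_weather(params, city: str):
    """A standalone function the subclass wants to expose as an extra tool."""
    ...


class WeatherWorker(WorkerSketch):
    @tool
    async def get_time(self, params, zone: str):
        ...

    def build_tools(self) -> list:
        # Keep the decorated methods and append one additional tool.
        return super().build_tools() + [lookup_weather]
```

Calling super().build_tools() first preserves the default set, so the override only has to describe what it adds.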

async end(*, reason: str | None = None, messages: list | None = None, result_callback: Callable[[...], Any] | None = None) → None

Request a graceful end of the session.

When called from a @tool handler, pass params.result_callback to ensure any pending LLM output is fully delivered before ending.

Parameters:
  • reason – Optional human-readable reason for ending.

  • messages – Optional LLM messages to inject and speak before ending. The LLM runs immediately so the output is delivered before the session terminates.

  • result_callback – The result_callback from FunctionCallParams.

async activate_worker(worker_name: str, *, args: WorkerActivationArgs | None = None, deactivate_self: bool = False, messages: list | None = None, result_callback: Callable[[...], Any] | None = None) → None

Activate another worker, optionally finishing an in-progress tool call.

When called from a @tool handler, pass params.result_callback to ensure any pending LLM output is fully delivered before the target is activated.

Parameters:
  • worker_name – The name of the worker to activate.

  • args – Optional WorkerActivationArgs forwarded to the target worker’s on_activated handler.

  • deactivate_self – Whether to deactivate this worker before activating the target.

  • messages – Optional LLM messages to inject and speak before activating the target. The LLM runs immediately so the output is delivered before the transfer completes.

  • result_callback – The result_callback from FunctionCallParams.
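
The call shape for a handoff from inside a tool handler is sketched below. WorkerStub is a hypothetical stub that copies the documented activate_worker signature and simply records the call, so the example runs without pipecat. The worker name "billing", the handler name, and the role/content message format are assumptions for illustration.

```python
import asyncio
from types import SimpleNamespace


class WorkerStub:
    """Hypothetical stub: records the call instead of activating anything."""

    def __init__(self) -> None:
        self.calls: list[dict] = []

    async def activate_worker(self, worker_name, *, args=None,
                              deactivate_self=False, messages=None,
                              result_callback=None) -> None:
        self.calls.append({
            "worker_name": worker_name,
            "deactivate_self": deactivate_self,
            "messages": messages,
            "result_callback": result_callback,
        })


class TriageSketch(WorkerStub):
    async def transfer_to_billing(self, params, reason: str):
        # In a real LLMWorker subclass this method would carry @tool.
        await self.activate_worker(
            "billing",  # hypothetical target worker name
            deactivate_self=True,
            # Assumed message format: role/content dicts.
            messages=[{"role": "system",
                       "content": f"Tell the user about the transfer: {reason}"}],
            # Forward the callback so pending LLM output is delivered first.
            result_callback=params.result_callback,
        )


async def demo() -> dict:
    async def fake_result_callback(result):
        return None

    params = SimpleNamespace(result_callback=fake_result_callback)
    worker = TriageSketch()
    await worker.transfer_to_billing(params, "invoice question")
    return worker.calls[0]
```

The same pattern applies to end(): forward params.result_callback rather than invoking it in the handler.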

async process_deferred_tool_frames(frames: list[tuple[Frame, FrameDirection]]) → list[tuple[Frame, FrameDirection]]

Process deferred frames before they are flushed.

Called after all in-flight tools complete, before the deferred frames are queued into the pipeline. Override to inspect, modify, reorder, or filter the frames.

Parameters:

frames – The deferred frames collected during tool execution.

Returns:

The frames to queue. Return the list as-is for default behavior.
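
As an illustration of filtering in this hook, the sketch below drops duplicate deferred frames while keeping the original order. Direction and TextFrameSketch are hypothetical stand-ins for FrameDirection and a text-carrying Frame, and DedupingWorkerSketch is not an LLMWorker subclass; only the method name and the list-of-tuples shape come from the signature above.

```python
import asyncio
from dataclasses import dataclass
from enum import Enum


class Direction(Enum):
    """Hypothetical stand-in for FrameDirection."""

    DOWNSTREAM = 1
    UPSTREAM = 2


@dataclass
class TextFrameSketch:
    """Hypothetical stand-in for a frame that carries text."""

    text: str


class DedupingWorkerSketch:
    async def process_deferred_tool_frames(self, frames):
        seen = set()
        kept = []
        for frame, direction in frames:
            key = (frame.text, direction)
            if key in seen:
                continue  # drop repeats of the same text and direction
            seen.add(key)
            kept.append((frame, direction))
        return kept
```

Returning a new list rather than mutating the argument keeps the hook easy to test in isolation.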