Source code for pipecat.workers.llm.llm_context_worker

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""LLM worker with a built-in `LLMContext` and aggregator pair.

Provides the `LLMContextWorker` class that extends `LLMWorker` with a
self-contained conversation context, removing the need for subclasses
to manually wire `LLMContextAggregatorPair`.
"""

from typing import Any

from pipecat.pipeline.pipeline import Pipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
    LLMAssistantAggregator,
    LLMAssistantAggregatorParams,
    LLMContextAggregatorPair,
    LLMUserAggregator,
    LLMUserAggregatorParams,
)
from pipecat.services.llm_service import LLMService
from pipecat.workers.llm.llm_worker import LLMWorker


[docs] class LLMContextWorker(LLMWorker): """LLM worker that owns an `LLMContext` and a context aggregator pair. Useful for workers that need to track their own conversation history, typically workers that run their own LLM pipeline outside of a shared transport pipeline. Subclasses do not need to instantiate the context or aggregators themselves; the pipeline is built as ``[user_aggregator, llm, assistant_aggregator]`` automatically. Example:: worker = LLMContextWorker( "worker", llm=OpenAILLMService(...), ) @worker.assistant_aggregator.event_handler("on_assistant_turn_stopped") async def _on_stopped(aggregator, message): ... """
[docs] def __init__( self, name: str, *, llm: LLMService[Any], active: bool = False, bridged: tuple[str, ...] | None = None, defer_tool_frames: bool = True, context: LLMContext | None = None, user_params: LLMUserAggregatorParams | None = None, assistant_params: LLMAssistantAggregatorParams | None = None, ): """Initialize the LLMContextWorker. Args: name: Unique name for this worker. llm: The LLM service. active: Whether the worker starts active. Defaults to False. bridged: Bridge configuration forwarded to ``PipelineWorker``. Pass ``()`` to wrap the pipeline with bus edges so it can exchange frames with another bridged worker. defer_tool_frames: Whether to defer frames queued during tool execution until all tools complete. Defaults to True. context: Optional pre-built `LLMContext`. When omitted, a fresh empty context is created. user_params: Optional parameters for the user aggregator. assistant_params: Optional parameters for the assistant aggregator. """ self._context = context or LLMContext() self._aggregators = LLMContextAggregatorPair( self._context, user_params=user_params, assistant_params=assistant_params, ) pipeline = Pipeline( [ self._aggregators.user(), llm, self._aggregators.assistant(), ] ) super().__init__( name, llm=llm, pipeline=pipeline, active=active, bridged=bridged, defer_tool_frames=defer_tool_frames, )
@property def context(self) -> LLMContext: """The `LLMContext` owned by this worker.""" return self._context @property def user_aggregator(self) -> LLMUserAggregator: """The user-side context aggregator.""" return self._aggregators.user() @property def assistant_aggregator(self) -> LLMAssistantAggregator: """The assistant-side context aggregator.""" return self._aggregators.assistant()