Source code for pipecat.processors.aggregators.llm_response_universal

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""LLM response aggregators for handling conversation context and message aggregation.

This module provides aggregators that process and accumulate LLM responses, user inputs,
and conversation context. These aggregators handle the flow between speech-to-text,
LLM processing, and text-to-speech components in conversational AI pipelines.
"""

import asyncio
import json
import warnings
from abc import abstractmethod
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any, Literal

from loguru import logger

from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.vad_analyzer import VADAnalyzer
from pipecat.audio.vad.vad_controller import VADController
from pipecat.frames.frames import (
    AggregatedTextFrame,
    AssistantImageRawFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    CancelFrame,
    EndFrame,
    Frame,
    FunctionCallCancelFrame,
    FunctionCallInProgressFrame,
    FunctionCallResultFrame,
    FunctionCallsStartedFrame,
    InputAudioRawFrame,
    InterimTranscriptionFrame,
    InterruptionFrame,
    LLMAssistantPushAggregationFrame,
    LLMContextAssistantTimestampFrame,
    LLMContextFrame,
    LLMContextSummaryRequestFrame,
    LLMFullResponseEndFrame,
    LLMFullResponseStartFrame,
    LLMMarkerFrame,
    LLMMessagesAppendFrame,
    LLMMessagesTransformFrame,
    LLMMessagesUpdateFrame,
    LLMRunFrame,
    LLMSetToolChoiceFrame,
    LLMSetToolsFrame,
    LLMThoughtEndFrame,
    LLMThoughtStartFrame,
    LLMThoughtTextFrame,
    RealtimeServiceMetadataFrame,
    StartFrame,
    STTMetadataFrame,
    TextFrame,
    TranscriptionFrame,
    TranslationFrame,
    UserImageRawFrame,
    UserMuteStartedFrame,
    UserMuteStoppedFrame,
    UserSpeakingFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    VADUserStartedSpeakingFrame,
    VADUserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import (
    LLMContext,
    LLMContextMessage,
    LLMSpecificMessage,
    NotGiven,
    is_given,
)
from pipecat.processors.aggregators.llm_context_summarizer import (
    LLMContextSummarizer,
    SummaryAppliedEvent,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.stt_latency import DEFAULT_TTFS_P99
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.turns.user_start import (
    BaseUserTurnStartStrategy,
    ExternalUserTurnStartStrategy,
    TranscriptionUserTurnStartStrategy,
    UserTurnStartedParams,
)
from pipecat.turns.user_stop import (
    BaseUserTurnStopStrategy,
    ExternalUserTurnStopStrategy,
    UserTurnStoppedParams,
)
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_turn_strategies import (
    FilterIncompleteUserTurnStrategies,
    UserTurnStrategies,
)
from pipecat.utils.context.llm_context_summarization import (
    LLMAutoContextSummarizationConfig,
    LLMContextSummarizationConfig,
)
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601


@dataclass
class LLMUserAggregatorParams:
    """Parameters for configuring LLM user aggregation behavior.

    Parameters:
        add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the
            aggregator computes the diff against the currently advertised
            tools and appends a developer-role message to the context
            describing additions/removals. Helps the LLM stay coherent across
            mid-conversation tool changes, mitigating several flavors of
            tool-call-related hallucination: calling tools that have been
            removed, avoiding tools that have been re-added, and hallucinating
            output (made-up answers or tool-call-shaped non-tool-calls) when
            tools are unavailable. Only standard tools are diffed; custom
            (LLM-specific) tools are ignored. When using
            ``LLMContextAggregatorPair``, prefer setting this via its
            ``add_tool_change_messages`` argument instead. Defaults to False.
        audio_idle_timeout: Timeout in seconds to force speech stop when no
            audio frames are received while in SPEAKING state (e.g. user mutes
            mic mid-speech). Set to 0 to disable. Defaults to 1.0.
        user_turn_strategies: User turn start and stop strategies.
        user_mute_strategies: List of user mute strategies.
        user_turn_stop_timeout: Time in seconds to wait before considering the
            user's turn finished.
        user_idle_timeout: Timeout in seconds for detecting user idle state.
            The aggregator will emit an `on_user_turn_idle` event when the user
            has been idle (not speaking) for this duration. Set to 0 to disable
            idle detection.
        vad_analyzer: Voice Activity Detection analyzer instance.
        filter_incomplete_user_turns: When enabled, the LLM outputs a
            turn-completion marker at the start of each response: ✓ (complete),
            ○ (incomplete short), or ◐ (incomplete long). Incomplete responses
            are suppressed and timeouts trigger re-prompting.

            .. deprecated:: 1.2.0
                Use ``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
                instead. Will be removed in version 2.0.0.

        user_turn_completion_config: Configuration for turn completion behavior
            including custom instructions, timeouts, and prompts. Only used
            when filter_incomplete_user_turns is True (deprecated path) — for
            the new strategy-based API, pass the config directly to
            ``FilterIncompleteUserTurnStrategies(config=...)``.

            .. deprecated:: 1.2.0
                Pass the config directly to
                ``FilterIncompleteUserTurnStrategies(config=...)`` instead.
                Will be removed in version 2.0.0.
    """

    add_tool_change_messages: bool = False
    audio_idle_timeout: float = 1.0
    user_turn_strategies: UserTurnStrategies | None = None
    user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list)
    user_turn_stop_timeout: float = 5.0
    user_idle_timeout: float = 0
    vad_analyzer: VADAnalyzer | None = None
    filter_incomplete_user_turns: bool = False
    user_turn_completion_config: UserTurnCompletionConfig | None = None

    def __post_init__(self):
        """Warn about use of the deprecated turn-completion fields.

        Each deprecated field that was supplied triggers exactly one
        ``DeprecationWarning``.
        """
        if self.filter_incomplete_user_turns:
            warnings.warn(
                "LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. "
                "Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.",
                DeprecationWarning,
                stacklevel=2,
            )

        # A single identity check: any supplied config (even one that happens
        # to be falsy) is a use of the deprecated field, and it must be
        # reported once, with the replacement that actually accepts a config.
        if self.user_turn_completion_config is not None:
            warnings.warn(
                "LLMUserAggregatorParams.user_turn_completion_config is deprecated. "
                "Pass the config directly to "
                "FilterIncompleteUserTurnStrategies(config=...) instead.",
                DeprecationWarning,
                stacklevel=2,
            )
@dataclass
class LLMAssistantAggregatorParams:
    """Parameters for configuring LLM assistant aggregation behavior.

    Parameters:
        enable_auto_context_summarization: Enable automatic context
            summarization when token or message-count limits are reached
            (disabled by default). When enabled, older conversation messages
            are automatically compressed into summaries to manage context
            size.
        auto_context_summarization_config: Configuration for automatic context
            summarization. Controls trigger thresholds, message preservation,
            and summarization prompts. If None, uses default
            ``LLMAutoContextSummarizationConfig`` values.
        add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the
            aggregator computes the diff against the currently advertised
            tools and appends a developer-role message to the context
            describing additions/removals. Helps the LLM stay coherent across
            mid-conversation tool changes, mitigating several flavors of
            tool-call-related hallucination: calling tools that have been
            removed, avoiding tools that have been re-added, and hallucinating
            output (made-up answers or tool-call-shaped non-tool-calls) when
            tools are unavailable. Only standard tools are diffed; custom
            (LLM-specific) tools are ignored. When using
            ``LLMContextAggregatorPair``, prefer setting this via its
            ``add_tool_change_messages`` argument instead. Defaults to False.
    """

    enable_auto_context_summarization: bool = False
    auto_context_summarization_config: LLMAutoContextSummarizationConfig | None = None
    add_tool_change_messages: bool = False

    # ---------------------------------------------------------------------------
    # Deprecated field names — kept for backward compatibility.
    # Use enable_auto_context_summarization and auto_context_summarization_config instead.
    #
    # .. deprecated:: 1.2.0
    #     Use ``enable_auto_context_summarization`` and
    #     ``auto_context_summarization_config`` instead. Will be removed in
    #     version 2.0.0.
    # ---------------------------------------------------------------------------
    enable_context_summarization: bool | None = None
    context_summarization_config: LLMContextSummarizationConfig | None = None

    def __post_init__(self):
        """Migrate values given via the deprecated fields onto the new ones.

        A supplied deprecated field emits a ``DeprecationWarning``, has its
        value copied (converted where needed) to the replacement field, and
        is then reset to ``None``.
        """
        legacy_enabled = self.enable_context_summarization
        if legacy_enabled is not None:
            warnings.warn(
                "LLMAssistantAggregatorParams.enable_context_summarization is deprecated. "
                "Use enable_auto_context_summarization instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            self.enable_auto_context_summarization = legacy_enabled
            self.enable_context_summarization = None

        legacy_config = self.context_summarization_config
        if legacy_config is None:
            return

        warnings.warn(
            "LLMAssistantAggregatorParams.context_summarization_config is deprecated. "
            "Use auto_context_summarization_config (LLMAutoContextSummarizationConfig) instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        # A legacy config object is converted; anything else (e.g. an
        # LLMAutoContextSummarizationConfig passed to the deprecated field)
        # is accepted unchanged.
        self.auto_context_summarization_config = (
            legacy_config.to_auto_config()
            if isinstance(legacy_config, LLMContextSummarizationConfig)
            else legacy_config  # type: ignore[assignment]
        )
        self.context_summarization_config = None
[docs] @dataclass class UserTurnStoppedMessage: """A user turn stopped message containing a user transcript update. A message in a conversation transcript containing the user content. This is the aggregated transcript that is then used in the context. Parameters: content: The message content/text. ``None`` in realtime mode (``realtime_service_mode=True``) when fired from a user-turn-stop frame: in realtime mode the user message isn't finalized until the assistant response start acts as the effective end-of-turn signal. Subscribers that need the finalized text should listen to ``on_user_turn_message_added`` instead. timestamp: When the user turn started. user_id: Optional identifier for the user. """ content: str | None timestamp: str user_id: str | None = None
[docs] @dataclass class UserTurnMessageAddedMessage: """A message accompanying ``on_user_turn_message_added``. Fired when a user message is written to the LLM context. In cascade mode (``realtime_service_mode=False``) this coincides with ``on_user_turn_stopped``. In realtime mode (``realtime_service_mode=True``) the write is triggered by the assistant response start (or session end), so this event fires decoupled from user-turn-end frames. ``content`` is always populated. Parameters: content: The aggregated user transcript. timestamp: When the user turn started. user_id: Optional identifier for the user. """ content: str timestamp: str user_id: str | None = None
@dataclass
class AssistantTurnStoppedMessage:
    """Transcript update delivered when an assistant turn stops.

    Represents the assistant's side of a conversation transcript entry: the
    aggregated transcript that is then used in the context.

    Parameters:
        content: The message content/text. May be empty if the LLM returned
            zero tokens (e.g. turn was interrupted before any tokens were
            received or pushed)
        interrupted: Whether the assistant turn was interrupted.
        timestamp: When the assistant turn started.
    """

    content: str
    interrupted: bool
    timestamp: str
@dataclass
class AssistantThoughtMessage:
    """Transcript update carrying an assistant thought.

    Represents a conversation transcript entry holding the assistant's
    thought content.

    Parameters:
        content: The message content/text.
        timestamp: When the thought started.
    """

    content: str
    timestamp: str
class LLMContextAggregator(FrameProcessor):
    """Base LLM aggregator backed by an ``LLMContext``.

    Conversation state lives in the supplied ``LLMContext``; aggregation
    results are published as ``LLMContextFrame`` objects. The class bundles
    the context-management helpers shared by the concrete aggregators.
    """

    # Developer-role messages appended to the context when tools are added/
    # removed via ``LLMSetToolsFrame`` (only when ``add_tool_change_messages``
    # is enabled on the aggregator's params). ``{function_names}`` is
    # substituted with a sorted, comma-separated, backtick-wrapped list.
    TOOL_ACTIVATION_MESSAGE_TEMPLATE = (
        "The following function(s) have just been added and may now be called: "
        "{function_names}. Any previously available functions remain available."
    )
    TOOL_DEACTIVATION_MESSAGE_TEMPLATE = (
        "The following function(s) have just been removed and should not be called: "
        "{function_names}. Any previously available functions remain available. "
        "The removed function(s) may become available again later, in which case "
        "you will be informed."
    )

    def __init__(
        self,
        *,
        context: LLMContext,
        role: str,
        add_tool_change_messages: bool = False,
        **kwargs,
    ):
        """Initialize the context response aggregator.

        Args:
            context: The LLM context to use for conversation storage.
            role: The role this aggregator represents (e.g. "user", "assistant").
            add_tool_change_messages: See the field of the same name on the
                aggregator-specific params dataclasses. Subclasses propagate
                this from their ``params``.
            **kwargs: Additional arguments passed to parent class.
        """
        super().__init__(**kwargs)
        self._context = context
        self._role = role
        self._add_tool_change_messages = add_tool_change_messages
        self._aggregation: list[TextPartForConcatenation] = []

    def _maybe_add_tool_change_messages(self, new_tools: ToolsSchema | NotGiven) -> None:
        """Append a developer message describing tool add/remove deltas.

        Does nothing unless ``add_tool_change_messages`` was enabled on the
        aggregator, and nothing when the set of standard tool names is
        unchanged relative to the context's current tools. Custom
        (LLM-specific) tools are ignored — only standard tools are diffed.

        Both aggregators call this on every ``LLMSetToolsFrame`` they handle.
        Whichever aggregator handles the frame first computes a real diff
        against the shared context and adds the announcement; by the time the
        other aggregator sees it (if at all), the context already reflects
        the new tools, so its diff is empty and no duplicate message is
        added. This is order-independent: it works whether the frame flows
        downstream (user aggregator first) or upstream (assistant aggregator
        first, and consumed without being forwarded).
        """
        if not self._add_tool_change_messages:
            return

        def standard_tool_names(schema: ToolsSchema | NotGiven) -> set[str]:
            # An unset schema contributes no names.
            return {tool.name for tool in schema.standard_tools} if is_given(schema) else set()

        previous = standard_tool_names(self._context.tools)
        incoming = standard_tool_names(new_tools)

        # Additions are announced before removals.
        announcements: list[str] = []
        for changed, template in (
            (incoming - previous, self.TOOL_ACTIVATION_MESSAGE_TEMPLATE),
            (previous - incoming, self.TOOL_DEACTIVATION_MESSAGE_TEMPLATE),
        ):
            if changed:
                listed = ", ".join(f"`{name}`" for name in sorted(changed))
                announcements.append(template.format(function_names=listed))

        if announcements:
            self._context.add_message({"role": "developer", "content": " ".join(announcements)})

    @property
    def messages(self) -> list[LLMContextMessage]:
        """Messages currently held by the LLM context.

        Returns:
            The list returned by the context's ``get_messages()``.
        """
        return self._context.get_messages()

    @property
    def role(self) -> str:
        """Role this aggregator represents.

        Returns:
            The role string passed at construction.
        """
        return self._role

    @property
    def context(self):
        """The LLM context backing this aggregator.

        Returns:
            The LLMContext instance passed at construction.
        """
        return self._context

    def _get_context_frame(self) -> LLMContextFrame:
        """Wrap the current context in a frame.

        Returns:
            An LLMContextFrame referencing this aggregator's context.
        """
        return LLMContextFrame(context=self._context)

    async def push_context_frame(self, direction: FrameDirection = FrameDirection.DOWNSTREAM):
        """Push a context frame in the specified direction.

        Args:
            direction: The direction to push the frame (upstream or downstream).
        """
        await self.push_frame(self._get_context_frame(), direction)

    def add_messages(self, messages):
        """Append messages to the context.

        Args:
            messages: Messages to add to the conversation context.
        """
        self._context.add_messages(messages)

    def set_messages(self, messages):
        """Replace the context messages.

        Args:
            messages: Messages to replace the current context messages.
        """
        self._context.set_messages(messages)

    def transform_messages(
        self, transform: Callable[[list[LLMContextMessage]], list[LLMContextMessage]]
    ):
        """Rewrite the context messages through a caller-supplied function.

        Args:
            transform: A function that takes the current list of messages and
                returns a modified list of messages to set in the context.
        """
        self._context.transform_messages(transform)

    def set_tools(self, tools: ToolsSchema | NotGiven):
        """Set tools in the context.

        Args:
            tools: List of tool definitions to set in the context.
        """
        self._context.set_tools(tools)

    def set_tool_choice(self, tool_choice: Literal["none", "auto", "required"] | dict):
        """Set tool choice in the context.

        Args:
            tool_choice: Tool choice configuration for the context.
        """
        self._context.set_tool_choice(tool_choice)

    async def reset(self):
        """Discard the pending aggregation by rebinding it to a new empty list."""
        self._aggregation = []

    @abstractmethod
    async def push_aggregation(self) -> str:
        """Push the current aggregation downstream.

        Returns:
            The pushed aggregation.
        """
        ...

    def aggregation_string(self) -> str:
        """Render the pending aggregation parts as one string.

        Returns:
            The concatenated aggregation string.
        """
        return concatenate_aggregated_text(self._aggregation)
[docs] class LLMUserAggregator(LLMContextAggregator): """User LLM aggregator that aggregates user input during active user turns. This aggregator uses a turn controller and operates within turn boundaries defined by the controller's configured user turn strategies. User turn start strategies indicate when a user turn begins, while user turn stop strategies signal when the user turn has ended. The aggregator collects and aggregates speech-to-text transcriptions that occur while a user turn is active and pushes the final aggregation when the user turn is finished. Event handlers available: - on_user_turn_started: Called when the user turn starts - on_user_turn_stopped: Called when the user turn ends - on_user_turn_stop_timeout: Called when no user turn stop strategy triggers - on_user_turn_idle: Called when the user has been idle for the configured timeout - on_user_turn_message_added: Called when a user message is written to context. In realtime mode (``realtime_service_mode=True``) the write is triggered by the assistant response start rather than the user-turn-end frame, so this fires decoupled from ``on_user_turn_stopped`` — subscribe here for the finalized user text. - on_user_mute_started: Called when the user becomes muted - on_user_mute_stopped: Called when the user becomes unmuted Example:: @aggregator.event_handler("on_user_turn_started") async def on_user_turn_started(aggregator, strategy: BaseUserTurnStartStrategy): ... @aggregator.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(aggregator, strategy: BaseUserTurnStopStrategy, message: UserTurnStoppedMessage): ... @aggregator.event_handler("on_user_turn_stop_timeout") async def on_user_turn_stop_timeout(aggregator): ... @aggregator.event_handler("on_user_turn_idle") async def on_user_turn_idle(aggregator): ... # In realtime mode (realtime_service_mode=True) the user message # is written when the assistant response starts, not at # user-turn-end — subscribe here for the finalized text. 
@aggregator.event_handler("on_user_turn_message_added") async def on_user_turn_message_added(aggregator, message: UserTurnMessageAddedMessage): ... @aggregator.event_handler("on_user_mute_started") async def on_user_mute_started(aggregator): ... @aggregator.event_handler("on_user_mute_stopped") async def on_user_mute_stopped(aggregator): ... """
def __init__(
    self,
    context: LLMContext,
    *,
    params: LLMUserAggregatorParams | None = None,
    _realtime_service_mode: bool = False,
    **kwargs,
):
    """Initialize the user context aggregator.

    Registers the aggregator's events, resolves the effective user turn
    strategies (including the deprecated ``filter_incomplete_user_turns``
    path and the realtime-mode adjustments), and creates the turn, idle
    and (when a VAD analyzer is configured) VAD controllers with their
    event handlers attached.

    Args:
        context: The LLM context for conversation storage.
        params: Configuration parameters for aggregation behavior. A
            default ``LLMUserAggregatorParams()`` is used when omitted.
        _realtime_service_mode: Pair-internal. Realtime-mode flag
            propagated from ``LLMContextAggregatorPair``. Not intended
            for direct use — construct the aggregators via the pair.
        **kwargs: Additional arguments.
    """
    params = params or LLMUserAggregatorParams()
    super().__init__(
        context=context,
        role="user",
        add_tool_change_messages=params.add_tool_change_messages,
        **kwargs,
    )
    self._params = params

    self._register_event_handler("on_user_turn_started")
    self._register_event_handler("on_user_turn_stopped")
    self._register_event_handler("on_user_turn_stop_timeout")
    self._register_event_handler("on_user_turn_idle")
    self._register_event_handler("on_user_turn_inference_triggered")
    self._register_event_handler("on_user_turn_message_added")
    self._register_event_handler("on_user_mute_started")
    self._register_event_handler("on_user_mute_stopped")

    # Realtime-mode wiring. Default (False) preserves cascade
    # behavior: context writes happen on turn frames, turn-stop
    # strategies wait for transcripts. True flips both behaviors.
    self._realtime_service_mode = _realtime_service_mode

    user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()

    # Deprecated path: translate filter_incomplete_user_turns into
    # the equivalent FilterIncompleteUserTurnStrategies wiring. The
    # DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__.
    if self._params.filter_incomplete_user_turns:
        user_turn_strategies = FilterIncompleteUserTurnStrategies(
            start=user_turn_strategies.start,
            stop=user_turn_strategies.stop,
            config=self._params.user_turn_completion_config,
        )
        # NOTE(review): this writes back onto the params object, which may
        # be the caller's own instance; it also makes the
        # ``user_turn_strategies is not None`` test below true on this
        # deprecated path — confirm both are intended.
        self._params.user_turn_strategies = user_turn_strategies

    # Realtime-mode mutation: drop the transcription-based start
    # strategy and flip the wait_for_transcript flag on stop
    # strategies that expose it, so turn-stop fires as soon as VAD /
    # the turn analyzer / external frames report end-of-speech.
    #
    # The other realtime-mode strategy hook — swapping defaults out
    # for external strategies when the realtime service emits its
    # own turn frames — runs later in
    # ``_handle_realtime_service_metadata`` (we need the broadcast
    # to know whether the service is emitting turn frames at
    # start time).
    if self._realtime_service_mode:
        self._apply_realtime_mode_strategy_mutations(
            user_turn_strategies,
            are_user_provided_custom_strategies=self._params.user_turn_strategies is not None,
        )

    self._user_is_muted = False
    self._user_turn_start_timestamp = ""

    # Tracks whether the realtime-mode recommendation log has already
    # fired for this session — see _handle_realtime_service_metadata.
    self._realtime_recommendation_logged = False

    # Realtime-mode deferred-flush state. Realtime mode treats the
    # assistant response start as the "user turn ended" signal for
    # context-writing purposes, then waits up to this many seconds
    # for any in-flight ``TranscriptionFrame`` to land before
    # flushing — mirroring the cascade-mode pattern of waiting up
    # to ``ttfs_p99_latency`` after ``UserStoppedSpeakingFrame``.
    # Captured from the most recent ``STTMetadataFrame`` (which a
    # realtime LLM service may broadcast for its internal STT
    # pass), falling back to ``DEFAULT_TTFS_P99``.
    self._ttfs_p99_latency: float | None = None
    self._realtime_handoff_flush_task: asyncio.Task | None = None

    # Full transcript across the user turn. Each
    # `_on_user_turn_inference_triggered` push captures only the
    # new segment since the previous push (push_aggregation resets
    # `_aggregation` after writing to context); we accumulate those
    # segments here so the eventual `on_user_turn_stopped` event
    # surfaces the full turn transcript even when several
    # inferences fire before finalization.
    self._full_user_turn_aggregation: str | None = None

    # Turn controller: its events are relayed to this aggregator's handlers.
    self._user_turn_controller = UserTurnController(
        user_turn_strategies=user_turn_strategies,
        user_turn_stop_timeout=self._params.user_turn_stop_timeout,
    )
    self._user_turn_controller.add_event_handler("on_push_frame", self._on_push_frame)
    self._user_turn_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
    self._user_turn_controller.add_event_handler(
        "on_user_turn_started", self._on_user_turn_started
    )
    self._user_turn_controller.add_event_handler(
        "on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
    )
    self._user_turn_controller.add_event_handler(
        "on_user_turn_stopped", self._on_user_turn_stopped
    )
    self._user_turn_controller.add_event_handler(
        "on_user_turn_stop_timeout", self._on_user_turn_stop_timeout
    )
    self._user_turn_controller.add_event_handler(
        "on_reset_aggregation", self._on_reset_aggregation
    )

    # Idle controller, configured from ``user_idle_timeout``.
    self._user_idle_controller = UserIdleController(
        user_idle_timeout=self._params.user_idle_timeout
    )
    self._user_idle_controller.add_event_handler("on_user_turn_idle", self._on_user_turn_idle)

    # VAD controller (only created when a VAD analyzer was supplied)
    self._vad_controller: VADController | None = None
    if self._params.vad_analyzer:
        self._vad_controller = VADController(
            self._params.vad_analyzer,
            audio_idle_timeout=self._params.audio_idle_timeout,
        )
        self._vad_controller.add_event_handler("on_speech_started", self._on_vad_speech_started)
        self._vad_controller.add_event_handler("on_speech_stopped", self._on_vad_speech_stopped)
        self._vad_controller.add_event_handler(
            "on_speech_activity", self._on_vad_speech_activity
        )
        self._vad_controller.add_event_handler("on_push_frame", self._on_push_frame)
        self._vad_controller.add_event_handler("on_broadcast_frame", self._on_broadcast_frame)
async def cleanup(self):
    """Clean up processor resources.

    Runs the parent class's ``cleanup()`` first, then this aggregator's own
    ``_cleanup()``.
    """
    await super().cleanup()
    await self._cleanup()
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """Process frames for user speech aggregation and context management.

    After the parent class has processed the frame, the frame is offered to
    the mute check, then to the VAD controller (when configured), then
    dispatched by type, and finally handed to the user turn and user idle
    controllers.

    Args:
        frame: The frame to process.
        direction: The direction of frame flow in the pipeline.
    """
    await super().process_frame(frame, direction)

    # When the mute check returns a truthy value, handling stops here: the
    # frame is not pushed and is not seen by the VAD, turn or idle
    # controllers.
    if await self._maybe_mute_frame(frame):
        return

    if self._vad_controller:
        await self._vad_controller.process_frame(frame)

    if isinstance(frame, StartFrame):
        # Push StartFrame before start(), because we want StartFrame to be
        # processed by every processor before any other frame is processed.
        await self.push_frame(frame, direction)
        await self._start(frame)
    elif isinstance(frame, EndFrame):
        # Push EndFrame before stop(), because stop() waits on the task to
        # finish and the task finishes when EndFrame is processed.
        await self.push_frame(frame, direction)
        await self._stop(frame)
    elif isinstance(frame, CancelFrame):
        # Unlike Start/End, the local cancel handling runs before the
        # frame is forwarded.
        await self._cancel(frame)
        await self.push_frame(frame, direction)
    elif isinstance(frame, TranscriptionFrame):
        await self._handle_transcription(frame)
    elif isinstance(frame, (InterimTranscriptionFrame, TranslationFrame)):
        # Interim transcriptions and translations are consumed here
        # and not pushed downstream, same as final TranscriptionFrame.
        pass
    elif isinstance(frame, LLMRunFrame):
        await self._handle_llm_run(frame)
    elif isinstance(frame, LLMMessagesAppendFrame):
        await self._handle_llm_messages_append(frame)
    elif isinstance(frame, LLMMessagesUpdateFrame):
        await self._handle_llm_messages_update(frame)
    elif isinstance(frame, LLMMessagesTransformFrame):
        await self._handle_llm_messages_transform(frame)
    elif isinstance(frame, LLMSetToolsFrame):
        # The diff must be computed before set_tools() overwrites the
        # context's current tools.
        self._maybe_add_tool_change_messages(frame.tools)
        self.set_tools(frame.tools)
        # Push the LLMSetToolsFrame as well, since speech-to-speech LLM
        # services (like OpenAI Realtime) may need to know about tool
        # changes; unlike text-based LLM services they won't just "pick up
        # the change" on the next LLM run, as the LLM is continuously
        # running.
        await self.push_frame(frame, direction)
    elif isinstance(frame, LLMSetToolChoiceFrame):
        # Applied to the context only; this branch does not push the frame.
        self.set_tool_choice(frame.tool_choice)
    elif isinstance(frame, RealtimeServiceMetadataFrame):
        await self._handle_realtime_service_metadata(frame)
        await self.push_frame(frame, direction)
    elif isinstance(frame, STTMetadataFrame):
        # Capture the STT TTFS P99 so the realtime-mode deferred
        # handoff flush can size itself to the real transcript-arrival
        # latency. Frame still flows downstream for other consumers.
        self._ttfs_p99_latency = frame.ttfs_p99_latency
        await self.push_frame(frame, direction)
    else:
        await self.push_frame(frame, direction)

    # Every frame that passed the mute check is also given to the turn and
    # idle controllers, regardless of which branch handled it above.
    await self._user_turn_controller.process_frame(frame)
    await self._user_idle_controller.process_frame(frame)
async def push_aggregation(self) -> str:
    """Write the pending user aggregation to the context and announce it.

    When there is pending aggregation, it is rendered to a string, the
    pending state is reset, a message with this aggregator's role is added
    to the context, a context frame is pushed, and the
    ``on_user_turn_message_added`` event is fired.

    Returns:
        The aggregated text that was written, or ``""`` when nothing was
        pending (in which case nothing is written, pushed or fired).
    """
    if not self._aggregation:
        return ""

    text = self.aggregation_string()
    # Reset before writing so the same text is not pending any more once the
    # context frame goes out.
    await self.reset()

    self._context.add_message({"role": self.role, "content": text})
    await self.push_context_frame()

    await self._call_event_handler(
        "on_user_turn_message_added",
        UserTurnMessageAddedMessage(content=text, timestamp=self._user_turn_start_timestamp),
    )
    return text
async def _start(self, frame: StartFrame):
    """Set up the VAD controller (if any), turn/idle controllers and mute strategies."""
    if self._vad_controller:
        await self._vad_controller.setup(self.task_manager)
    await self._user_turn_controller.setup(self.task_manager)
    await self._user_idle_controller.setup(self.task_manager)
    for s in self._params.user_mute_strategies:
        await s.setup(self.task_manager)

async def _stop(self, frame: EndFrame):
    """Flush trailing user content on ``EndFrame`` and clean up."""
    if self._realtime_service_mode:
        # Realtime mode: cancel any pending deferred handoff flush
        # and commit trailing user content directly. The
        # on_user_turn_stopped event already fired (if turn frames
        # were emitted), so don't re-fire it from session end.
        await self._cancel_realtime_handoff_flush_task()
        await self.push_aggregation()
    else:
        await self._maybe_emit_user_turn_stopped(on_session_end=True)
    await self._cleanup()

async def _cancel(self, frame: CancelFrame):
    """Flush trailing user content on ``CancelFrame`` and clean up."""
    # See _stop — same realtime-mode vs cascade dispatch.
    if self._realtime_service_mode:
        await self._cancel_realtime_handoff_flush_task()
        await self.push_aggregation()
    else:
        await self._maybe_emit_user_turn_stopped(on_session_end=True)
    await self._cleanup()

def _apply_realtime_mode_strategy_mutations(
    self, user_turn_strategies: UserTurnStrategies, are_user_provided_custom_strategies: bool
) -> None:
    """Mutate turn strategies for realtime mode.

    Drops ``TranscriptionUserTurnStartStrategy`` from the start strategies
    (transcripts shouldn't start a turn when the realtime service drives the
    conversation) and flips ``wait_for_transcript=False`` on stop strategies
    that expose the flag, so end-of-turn fires as soon as VAD / the turn
    analyzer reports end-of-speech.

    Args:
        user_turn_strategies: The strategies object, modified in place.
        are_user_provided_custom_strategies: Selects the log level used to
            report the mutation: warning when True, debug otherwise.
    """
    start_strategies = user_turn_strategies.start or []
    dropped: list[str] = []
    kept_start: list[BaseUserTurnStartStrategy] = []
    for s in start_strategies:
        if isinstance(s, TranscriptionUserTurnStartStrategy):
            dropped.append(s.__class__.__name__)
        else:
            kept_start.append(s)
    user_turn_strategies.start = kept_start

    flipped: list[str] = []
    for s in user_turn_strategies.stop or []:
        if hasattr(s, "wait_for_transcript"):
            try:
                s.wait_for_transcript = False
                flipped.append(s.__class__.__name__)
            except AttributeError:
                # Strategy exposes the property but no setter — skip.
                pass

    # Nothing was changed, so there is nothing to report.
    if not dropped and not flipped:
        return

    msg = (
        f"{self}: realtime_service_mode=True — mutated turn strategies: "
        f"dropped {dropped or 'no'} start strategy(ies); set "
        f"wait_for_transcript=False on {flipped or 'no'} stop strategy(ies)."
    )
    if are_user_provided_custom_strategies:
        logger.warning(msg)
    else:
        logger.debug(msg)

async def _handle_realtime_service_metadata(self, frame: RealtimeServiceMetadataFrame):
    """Handle a ``RealtimeServiceMetadataFrame`` broadcast by a realtime LLM service.

    When ``realtime_service_mode`` is not enabled, log a one-time WARNING
    recommendation pointing the user at the option and warning about the
    timing change on ``on_user_turn_stopped``. When it is enabled, log a
    confirming debug message and — if the service advertises
    ``emits_user_turn_frames=True`` and the user didn't pass custom
    ``user_turn_strategies`` — swap the default turn strategies for
    ``ExternalUserTurnStart/StopStrategy`` so ``on_user_turn_*`` events fire
    from the server-emitted ``UserStarted/StoppedSpeakingFrame``.

    Fires at most once per session.
    """
    # One-shot latch: only the first metadata frame is acted upon.
    if self._realtime_recommendation_logged:
        return
    self._realtime_recommendation_logged = True

    if not self._realtime_service_mode:
        logger.warning(
            f"{self}: detected realtime service `{frame.service_name}` in the "
            "pipeline. For correct context-write semantics with realtime "
            "services, consider passing "
            "realtime_service_mode=True to LLMContextAggregatorPair. "
            "Note: this changes when user messages are written to context "
            "— they're written when the assistant response starts rather "
            "than when the user-turn-end frame fires. Subscribe to "
            "`on_user_turn_message_added` instead of `on_user_turn_stopped` to "
            "handle new user messages."
        )
        return

    logger.debug(
        f"{self}: detected realtime service `{frame.service_name}`; "
        "realtime_service_mode is enabled."
    )

    # Realtime-mode strategy swap. Only kicks in when (a) the
    # service advertises that it emits its own
    # UserStarted/StoppedSpeakingFrame and (b) the user didn't
    # pass custom strategies — explicit user choice wins, and
    # services that don't emit their own turn frames need to keep
    # the default strategies so locally-driven turns (e.g. local
    # VAD) can fire on_user_turn_* events.
    if frame.emits_user_turn_frames and self._params.user_turn_strategies is None:
        new_strategies = UserTurnStrategies(
            start=[ExternalUserTurnStartStrategy()],
            stop=[ExternalUserTurnStopStrategy()],
        )
        self._apply_realtime_mode_strategy_mutations(
            new_strategies, are_user_provided_custom_strategies=False
        )
        await self._user_turn_controller.update_strategies(new_strategies)
        logger.debug(
            f"{self}: replaced default turn strategies with "
            f"ExternalUserTurnStart/StopStrategy for realtime service "
            f"`{frame.service_name}` (emits_user_turn_frames=True)."
        )

async def _realtime_handoff_flush(self) -> None:
    """Commit the user message in realtime mode, allowing for late transcripts.

    Called by the paired assistant half when the assistant response starts —
    realtime mode treats that as the user turn's end signal for
    context-writing purposes (in place of ``UserStoppedSpeakingFrame``).

    Realtime services may deliver the finalized user ``TranscriptionFrame``
    in pieces, and some deliver it only after the assistant starts
    responding, so we always defer the flush by ``ttfs_p99_latency`` to let
    any in-flight transcript land. The ``_realtime_handoff_flush_immediate``
    failsafe on ``LLMFullResponseEndFrame`` catches any stragglers.
    """
    # Cancel any prior deferred flush task — a new turn supersedes it.
    await self._cancel_realtime_handoff_flush_task()
    self._realtime_handoff_flush_task = self.create_task(
        self._realtime_deferred_handoff_flush(),
        name=f"{self}::realtime_handoff_flush",
    )
    # Yield so the task's wrapper coroutine starts running before
    # any immediate cancellation by the failsafe path — otherwise
    # asyncio GCs the inner coroutine without ever entering it and
    # emits a "coroutine was never awaited" warning.
    await asyncio.sleep(0)

async def _realtime_deferred_handoff_flush(self) -> None:
    """Wait one ``ttfs_p99_latency`` window, then flush whatever has arrived."""
    # Fall back to the module default until an STTMetadataFrame supplied a value.
    wait = self._ttfs_p99_latency if self._ttfs_p99_latency is not None else DEFAULT_TTFS_P99
    await asyncio.sleep(wait)
    # NOTE(review): indentation was lost in the extracted source; the
    # timestamp reset is assumed to sit inside this `if` — confirm upstream.
    if self._aggregation:
        await self.push_aggregation()
        self._user_turn_start_timestamp = ""

async def _realtime_handoff_flush_immediate(self) -> None:
    """Failsafe flush invoked when the assistant response ends.

    By the time the assistant response has fully completed, even slow
    transcripts should have landed. Cancel any pending deferred flush and
    write the user message now so it precedes the assistant message in
    context.
    """
    await self._cancel_realtime_handoff_flush_task()
    # NOTE(review): indentation was lost in the extracted source; the
    # timestamp reset is assumed to sit inside this `if` — confirm upstream.
    if self._aggregation:
        await self.push_aggregation()
        self._user_turn_start_timestamp = ""

async def _cancel_realtime_handoff_flush_task(self) -> None:
    """Cancel the deferred flush task if it is still running and forget it."""
    if self._realtime_handoff_flush_task is None:
        return
    if not self._realtime_handoff_flush_task.done():
        await self.cancel_task(self._realtime_handoff_flush_task)
    self._realtime_handoff_flush_task = None

async def _cleanup(self):
    """Clean up the VAD controller (if any), turn/idle controllers and mute strategies."""
    if self._vad_controller:
        await self._vad_controller.cleanup()
    await self._user_turn_controller.cleanup()
    await self._user_idle_controller.cleanup()
    for s in self._params.user_mute_strategies:
        await s.cleanup()

async def _maybe_mute_frame(self, frame: Frame):
    """Decide whether ``frame`` must be suppressed and refresh the mute state.

    The decision for ``frame`` uses the mute state as it was before this
    call; the mute strategies are then evaluated and their combined result
    becomes the state for subsequent frames. A state change fires the
    matching ``on_user_mute_*`` event and broadcasts the matching frame.

    Args:
        frame: The frame being processed.

    Returns:
        True if the frame should be dropped, False otherwise.
    """
    # Lifecycle frames should never be muted and should not trigger mute
    # state changes. Evaluating mute strategies on StartFrame would
    # broadcast UserMuteStartedFrame before StartFrame reaches downstream
    # processors.
    if isinstance(frame, (StartFrame, EndFrame, CancelFrame)):
        return False

    should_mute_frame = self._user_is_muted and isinstance(
        frame,
        (
            InterruptionFrame,
            VADUserStartedSpeakingFrame,
            VADUserStoppedSpeakingFrame,
            UserStartedSpeakingFrame,
            UserStoppedSpeakingFrame,
            InputAudioRawFrame,
            InterimTranscriptionFrame,
            TranscriptionFrame,
        ),
    )

    if should_mute_frame:
        logger.trace(f"{frame.name} suppressed - user currently muted")

    # Every strategy sees the frame; the user is muted if any of them says so.
    should_mute_next_time = False
    for s in self._params.user_mute_strategies:
        should_mute_next_time |= await s.process_frame(frame)

    if should_mute_next_time != self._user_is_muted:
        logger.debug(f"{self}: user is now {'muted' if should_mute_next_time else 'unmuted'}")
        self._user_is_muted = should_mute_next_time

        # Emit mute state change events
        if self._user_is_muted:
            await self._call_event_handler("on_user_mute_started")
            await self.broadcast_frame(UserMuteStartedFrame)
        else:
            await self._call_event_handler("on_user_mute_stopped")
            await self.broadcast_frame(UserMuteStoppedFrame)

    return should_mute_frame

async def _handle_llm_run(self, frame: LLMRunFrame):
    """Push a context frame in response to ``LLMRunFrame``."""
    await self.push_context_frame()

async def _handle_llm_messages_append(self, frame: LLMMessagesAppendFrame):
    """Append the frame's messages; push a context frame if ``run_llm`` is set."""
    self.add_messages(frame.messages)
    if frame.run_llm:
        await self.push_context_frame()

async def _handle_llm_messages_update(self, frame: LLMMessagesUpdateFrame):
    """Replace the messages with the frame's; push a context frame if ``run_llm`` is set."""
    self.set_messages(frame.messages)
    if frame.run_llm:
        await self.push_context_frame()

async def _handle_llm_messages_transform(self, frame: LLMMessagesTransformFrame):
    """Apply the frame's transform; push a context frame if ``run_llm`` is set."""
    self.transform_messages(frame.transform)
    if frame.run_llm:
        await self.push_context_frame()

async def _handle_transcription(self, frame: TranscriptionFrame):
    """Add a non-blank transcription to the current aggregation."""
    text = frame.text

    # Make sure we really have some text.
    if not text.strip():
        return

    # In realtime mode with a service that doesn't emit user-turn
    # frames (Gemini Live, AWS Nova Sonic, Ultravox),
    # ``_on_user_turn_started`` never fires, so seed
    # ``_user_turn_start_timestamp`` here from the first transcript
    # so the eventual ``on_user_turn_message_added`` event carries one.
    # Harmless in cascade mode — turn frames set it before this
    # runs (the timestamp check is a no-op).
    if not self._user_turn_start_timestamp:
        self._user_turn_start_timestamp = time_now_iso8601()

    # Transcriptions never include inter-part spaces (so far).
    self._aggregation.append(
        TextPartForConcatenation(
            text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
        )
    )

async def _queued_broadcast_frame(self, frame_cls: type[Frame], **kwargs):
    """Broadcasts a frame upstream and queues it for internal processing.

    Queues the frame so it flows through `process_frame` and is handled
    internally (e.g. by the `UserTurnController`). The upstream frame is
    pushed directly.

    Args:
        frame_cls: The class of the frame to be broadcasted.
        **kwargs: Keyword arguments to be passed to the frame's constructor.
    """
    # Two separate instances are built: one is queued, one is pushed upstream.
    await self.queue_frame(frame_cls(**kwargs))
    await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)

async def _on_push_frame(
    self, controller, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
    """Controller callback: queue ``frame`` on this processor in ``direction``."""
    await self.queue_frame(frame, direction)

async def _on_broadcast_frame(self, controller, frame_cls: type[Frame], **kwargs):
    """Controller callback: forward to ``_queued_broadcast_frame``."""
    await self._queued_broadcast_frame(frame_cls, **kwargs)

async def _on_vad_speech_started(self, controller):
    """Broadcast ``VADUserStartedSpeakingFrame`` with the analyzer's ``start_secs``."""
    await self._queued_broadcast_frame(
        VADUserStartedSpeakingFrame,
        start_secs=controller._vad_analyzer.params.start_secs,
    )

async def _on_vad_speech_stopped(self, controller):
    """Broadcast ``VADUserStoppedSpeakingFrame`` with the analyzer's ``stop_secs``."""
    await self._queued_broadcast_frame(
        VADUserStoppedSpeakingFrame,
        stop_secs=controller._vad_analyzer.params.stop_secs,
    )

async def _on_vad_speech_activity(self, controller):
    """Broadcast ``UserSpeakingFrame``."""
    await self._queued_broadcast_frame(UserSpeakingFrame)

async def _on_user_turn_started(
    self,
    controller: UserTurnController,
    strategy: BaseUserTurnStartStrategy,
    params: UserTurnStartedParams,
):
    """Record the turn start and fire ``on_user_turn_started``.

    Stamps the turn start time, clears the accumulated full-turn text, and
    optionally broadcasts ``UserStartedSpeakingFrame`` and an interruption
    depending on ``params``.
    """
    logger.debug(f"{self}: User started speaking (strategy: {strategy})")

    self._user_turn_start_timestamp = time_now_iso8601()
    self._full_user_turn_aggregation = None

    # NOTE(review): indentation was lost in the extracted source; the idle
    # controller notification is assumed to belong to this `if` — confirm.
    if params.enable_user_speaking_frames:
        await self.broadcast_frame(UserStartedSpeakingFrame)
        await self._user_idle_controller.process_frame(UserStartedSpeakingFrame())

    if params.enable_interruptions:
        await self.broadcast_interruption()

    await self._call_event_handler("on_user_turn_started", strategy)

async def _on_user_turn_inference_triggered(
    self,
    controller: UserTurnController,
    strategy: BaseUserTurnStopStrategy,
):
    """Push the pending segment (cascade mode) and fire the inference event."""
    if self._realtime_service_mode:
        # Realtime mode: the assistant response start, not turn
        # frames, is the signal that the user turn has ended for
        # context-writing purposes. Fire the event without pushing
        # aggregation here.
        logger.debug(
            f"{self}: User turn inference triggered (strategy: {strategy}) "
            "[realtime mode: event-only, no context push]"
        )
        await self._call_event_handler("on_user_turn_inference_triggered", strategy)
        return

    logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")

    # Push aggregation now: this writes the user message segment to
    # the context and emits LLMContextFrame, which kicks LLM
    # inference. Concatenate the segment into
    # `_full_user_turn_aggregation` so multiple inferences in the
    # same turn don't lose earlier segments from the eventual
    # `on_user_turn_stopped` event.
    segment = await self.push_aggregation()
    if segment:
        if self._full_user_turn_aggregation:
            self._full_user_turn_aggregation = (
                f"{self._full_user_turn_aggregation} {segment}".strip()
            )
        else:
            self._full_user_turn_aggregation = segment

    await self._call_event_handler("on_user_turn_inference_triggered", strategy)

async def _on_user_turn_stopped(
    self,
    controller: UserTurnController,
    strategy: BaseUserTurnStopStrategy,
    params: UserTurnStoppedParams,
):
    """Handle end of user turn: optional speaking frame, then the stop event."""
    logger.debug(f"{self}: User stopped speaking (strategy: {strategy})")

    # NOTE(review): indentation was lost in the extracted source; the idle
    # controller notification is assumed to belong to this `if` — confirm.
    if params.enable_user_speaking_frames:
        await self.broadcast_frame(UserStoppedSpeakingFrame)
        await self._user_idle_controller.process_frame(UserStoppedSpeakingFrame())

    if self._realtime_service_mode:
        # Realtime mode: the user message isn't finalized at
        # turn-stop time — the assistant response start is the
        # effective end-of-turn signal, and the user message is
        # written then. Content is None here; subscribers wanting
        # the finalized text use on_user_turn_message_added instead.
        message = UserTurnStoppedMessage(
            content=None, timestamp=self._user_turn_start_timestamp
        )
        await self._call_event_handler("on_user_turn_stopped", strategy, message)
        return

    await self._maybe_emit_user_turn_stopped(strategy)

async def _on_reset_aggregation(
    self, controller: UserTurnController, strategy: BaseUserTurnStartStrategy
):
    """Controller callback: reset the aggregation state."""
    logger.debug(f"{self}: Resetting aggregation (strategy: {strategy})")
    await self.reset()

async def _on_user_turn_stop_timeout(self, controller):
    """Controller callback: fire ``on_user_turn_stop_timeout``."""
    await self._call_event_handler("on_user_turn_stop_timeout")

async def _on_user_turn_idle(self, controller):
    """Controller callback: fire ``on_user_turn_idle``."""
    await self._call_event_handler("on_user_turn_idle")

async def _maybe_emit_user_turn_stopped(
    self,
    strategy: BaseUserTurnStopStrategy | None = None,
    on_session_end: bool = False,
):
    """Maybe emit user turn stopped event.

    Earlier inference triggers in the same turn have already pushed their
    segments to the context and accumulated them into
    ``self._full_user_turn_aggregation``. Any aggregation that arrived after
    the last inference trigger is flushed here so end-of-turn content is
    never lost from the public event.

    Args:
        strategy: The strategy that triggered the turn stop.
        on_session_end: If True, only emit if there's unemitted content
            (avoids duplicate events when session ends).
    """
    segment = await self.push_aggregation()

    # Take ownership of the accumulated text and clear it for the next turn.
    full_aggregation = self._full_user_turn_aggregation
    self._full_user_turn_aggregation = None

    if segment and full_aggregation:
        content = f"{full_aggregation} {segment}".strip()
    else:
        content = full_aggregation or segment

    # NOTE(review): indentation was lost in the extracted source; the
    # timestamp reset is assumed to sit inside this `if` — confirm upstream.
    if not on_session_end or content:
        message = UserTurnStoppedMessage(
            content=content, timestamp=self._user_turn_start_timestamp
        )
        await self._call_event_handler("on_user_turn_stopped", strategy, message)
        self._user_turn_start_timestamp = ""
[docs] class LLMAssistantAggregator(LLMContextAggregator): """Assistant LLM aggregator that processes bot responses and function calls. This aggregator handles the complex logic of processing assistant responses including: - Text frame aggregation between response start/end markers - Function call lifecycle management - Context updates with timestamps - Tool execution and result handling - Interruption handling during responses The aggregator manages function calls in progress and coordinates between text generation and tool execution phases of LLM responses. Event handlers available: - on_assistant_turn_started: Called when the assistant turn starts - on_assistant_turn_stopped: Called when the assistant turn ends - on_assistant_thought: Called when an assistant thought is available - on_summary_applied: Called when a context summarization is applied Example:: @aggregator.event_handler("on_assistant_turn_started") async def on_assistant_turn_started(aggregator): ... @aggregator.event_handler("on_assistant_turn_stopped") async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage): ... @aggregator.event_handler("on_assistant_thought") async def on_assistant_thought(aggregator, message: AssistantThoughtMessage): ... @aggregator.event_handler("on_summary_applied") async def on_summary_applied(aggregator, summarizer, event: SummaryAppliedEvent): ... """
def __init__(
    self,
    context: LLMContext,
    *,
    params: LLMAssistantAggregatorParams | None = None,
    _realtime_service_mode: bool = False,
    _paired_user_aggregator: "LLMUserAggregator | None" = None,
    **kwargs,
):
    """Initialize the assistant context aggregator.

    Args:
        context: The ``LLMContext`` used for conversation storage.
        params: Configuration parameters for aggregation behavior. A default
            ``LLMAssistantAggregatorParams`` is used when omitted.
        _realtime_service_mode: Pair-internal. Realtime-mode flag propagated
            from ``LLMContextAggregatorPair``. Not intended for direct use —
            construct the aggregators via the pair.
        _paired_user_aggregator: Pair-internal. Back-reference to the paired
            ``LLMUserAggregator``. The assistant flushes it on
            ``LLMFullResponseStartFrame`` so the user message lands in
            context before the assistant turn starts.
        **kwargs: Additional arguments, forwarded to the parent class.
    """
    params = params or LLMAssistantAggregatorParams()
    super().__init__(
        context=context,
        role="assistant",
        add_tool_change_messages=params.add_tool_change_messages,
        **kwargs,
    )
    self._params = params

    # Realtime-mode wiring. Default (False) preserves cascade behavior.
    self._realtime_service_mode = _realtime_service_mode
    self._paired_user_aggregator = _paired_user_aggregator

    # Keyed by tool_call_id.
    self._function_calls_in_progress: dict[str, FunctionCallInProgressFrame | None] = {}
    self._function_calls_image_results: dict[str, UserImageRawFrame] = {}
    self._context_updated_tasks: set[asyncio.Task] = set()

    self._user_speaking: bool = False
    self._bot_speaking: bool = False

    # When a function call result arrives while the bot is speaking, we defer the LLM
    # re-invocation until the bot stops speaking. This flag is set to True in that case
    # so that `BotStoppedSpeakingFrame` knows to push a context frame. Multiple results
    # arriving in the same speaking window are bundled into a single deferred push.
    self._push_context_on_bot_stopped_speaking: bool = False

    self._assistant_turn_start_timestamp = ""

    self._thought_append_to_context = False
    self._thought_llm: str = ""
    self._thought_aggregation: list[TextPartForConcatenation] = []
    self._thought_start_time: str = ""

    # Context summarization — always create the summarizer so that manually
    # pushed LLMSummarizeContextFrame frames are always handled.
    # Auto-triggering based on thresholds is only enabled when
    # enable_auto_context_summarization is True.
    self._summarizer: LLMContextSummarizer | None = LLMContextSummarizer(
        context=self._context,
        config=self._params.auto_context_summarization_config,
        auto_trigger=self._params.enable_auto_context_summarization,
    )
    self._summarizer.add_event_handler(
        "on_request_summarization", self._on_request_summarization
    )
    self._summarizer.add_event_handler("on_summary_applied", self._on_summary_applied)

    self._register_event_handler("on_assistant_turn_started")
    self._register_event_handler("on_assistant_turn_stopped")
    self._register_event_handler("on_assistant_thought")
    self._register_event_handler("on_summary_applied")
@property
def has_function_calls_in_progress(self) -> bool:
    """Report whether any function call is currently tracked as in progress.

    Returns:
        True if at least one function call is in progress, False otherwise.
    """
    pending_calls = self._function_calls_in_progress
    return len(pending_calls) > 0
async def reset(self):
    """Reset the aggregation state.

    Runs the parent reset, clears the thought aggregation, and drops any
    pending deferred context push.
    """
    await super().reset()
    await self._reset_thought_aggregation()
    # Just to be safe
    self._push_context_on_bot_stopped_speaking = False
async def _reset_thought_aggregation(self): """Reset the thought aggregation state.""" self._thought_append_to_context = False self._thought_llm = "" self._thought_aggregation = []
async def process_frame(self, frame: Frame, direction: FrameDirection):
    """Process frames for assistant response aggregation and function call management.

    After the base-class handling, the frame is dispatched by type. Frames
    with a dedicated handler are consumed unless the branch pushes them
    explicitly; unrecognized frames are pushed through unchanged. Every
    frame is finally offered to the summarizer when one exists.

    Args:
        frame: The frame to process.
        direction: The direction of frame flow in the pipeline.
    """
    await super().process_frame(frame, direction)

    if isinstance(frame, StartFrame):
        # Push StartFrame before start(), because we want StartFrame to be
        # processed by every processor before any other frame is processed.
        await self.push_frame(frame, direction)
        await self._start(frame)
    elif isinstance(frame, InterruptionFrame):
        await self._handle_interruptions(frame)
        await self.push_frame(frame, direction)
    elif isinstance(frame, (EndFrame, CancelFrame)):
        await self._handle_end_or_cancel(frame)
        await self.push_frame(frame, direction)
    elif isinstance(frame, LLMAssistantPushAggregationFrame):
        await self._handle_push_aggregation()
    elif isinstance(frame, LLMFullResponseStartFrame):
        await self._handle_llm_start(frame)
    elif isinstance(frame, LLMFullResponseEndFrame):
        await self._handle_llm_end(frame)
    elif isinstance(frame, TextFrame):
        await self._handle_text(frame)
    elif isinstance(frame, LLMMarkerFrame):
        await self._handle_marker_frame(frame)
    elif isinstance(frame, LLMThoughtStartFrame):
        await self._handle_thought_start(frame)
    elif isinstance(frame, LLMThoughtTextFrame):
        await self._handle_thought_text(frame)
    elif isinstance(frame, LLMThoughtEndFrame):
        await self._handle_thought_end(frame)
    elif isinstance(frame, LLMRunFrame):
        await self._handle_llm_run(frame)
    elif isinstance(frame, LLMMessagesAppendFrame):
        await self._handle_llm_messages_append(frame)
    elif isinstance(frame, LLMMessagesUpdateFrame):
        await self._handle_llm_messages_update(frame)
    elif isinstance(frame, LLMMessagesTransformFrame):
        await self._handle_llm_messages_transform(frame)
    elif isinstance(frame, LLMSetToolsFrame):
        self._maybe_add_tool_change_messages(frame.tools)
        self.set_tools(frame.tools)
    elif isinstance(frame, LLMSetToolChoiceFrame):
        self.set_tool_choice(frame.tool_choice)
    elif isinstance(frame, FunctionCallsStartedFrame):
        await self._handle_function_calls_started(frame)
    elif isinstance(frame, FunctionCallInProgressFrame):
        await self._handle_function_call_in_progress(frame)
    elif isinstance(frame, FunctionCallResultFrame):
        await self._handle_function_call_result(frame)
    elif isinstance(frame, FunctionCallCancelFrame):
        await self._handle_function_call_cancel(frame)
    elif isinstance(frame, UserImageRawFrame):
        await self._handle_user_image_frame(frame)
    elif isinstance(frame, AssistantImageRawFrame):
        await self._handle_assistant_image_frame(frame)
    elif isinstance(frame, UserStartedSpeakingFrame):
        self._user_speaking = True
        await self.push_frame(frame, direction)
    elif isinstance(frame, UserStoppedSpeakingFrame):
        self._user_speaking = False
        await self.push_frame(frame, direction)
    elif isinstance(frame, BotStartedSpeakingFrame):
        self._bot_speaking = True
        await self.push_frame(frame, direction)
    elif isinstance(frame, BotStoppedSpeakingFrame):
        self._bot_speaking = False
        await self.push_frame(frame, direction)
        # A context push deferred while the bot was speaking is released
        # now, unless the user is speaking.
        if self._push_context_on_bot_stopped_speaking and not self._user_speaking:
            logger.debug(f"{self}: Bot stopped speaking — pushing deferred context frame!")
            await self.push_context_frame(FrameDirection.UPSTREAM)
    elif isinstance(frame, RealtimeServiceMetadataFrame):
        # The user half logs the realtime-mode recommendation; the
        # assistant half just passes the frame through.
        await self.push_frame(frame, direction)
    else:
        await self.push_frame(frame, direction)

    # Pass frames to summarizer for monitoring
    if self._summarizer:
        await self._summarizer.process_frame(frame)
async def _start(self, frame: StartFrame): self._validate_realtime_pairing() if self._summarizer: await self._summarizer.setup(self.task_manager) def _validate_realtime_pairing(self): """Validate the realtime-mode wiring set by ``LLMContextAggregatorPair``. Realtime mode needs the assistant half to hold a back-reference to the user half so it can flush the user message on ``LLMFullResponseStartFrame``. The pair sets this up; direct construction of the assistant with the private realtime kwargs bypasses that and is not supported. """ if self._realtime_service_mode and self._paired_user_aggregator is None: raise RuntimeError( f"{self}: realtime_service_mode is enabled but this assistant " "aggregator has no paired user aggregator. Construct the pair " "via LLMContextAggregatorPair(context, realtime_service_mode=True)." ) if ( self._paired_user_aggregator is not None and self._realtime_service_mode != self._paired_user_aggregator._realtime_service_mode ): raise RuntimeError( f"{self}: realtime_service_mode mismatch between user and " "assistant halves. Use LLMContextAggregatorPair to construct " "the pair so both halves share the same configuration." )
async def push_aggregation(self) -> str:
    """Push the current assistant aggregation with timestamp.

    Appends the aggregated text to the context as an assistant message,
    pushes a context frame, then pushes an
    ``LLMContextAssistantTimestampFrame`` stamped with the current time.

    Returns:
        The aggregated text that was pushed, or an empty string when there
        was nothing to push.
    """
    if not self._aggregation:
        return ""

    # Build the string before reset() is called on the aggregation state.
    aggregation = self.aggregation_string()
    await self.reset()

    self._context.add_message({"role": "assistant", "content": aggregation})

    # Push context frame
    await self.push_context_frame()

    # Push timestamp frame with current time
    timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
    await self.push_frame(timestamp_frame)

    return aggregation
async def push_context_frame(self, direction: FrameDirection = FrameDirection.DOWNSTREAM):
    """Push a context frame in the specified direction.

    Args:
        direction: The direction to push the frame (upstream or downstream).
    """
    await super().push_context_frame(direction)
    # A context frame just went out, so a push deferred until the bot stops
    # speaking is no longer pending.
    self._push_context_on_bot_stopped_speaking = False
async def _handle_llm_run(self, frame: LLMRunFrame):
    """Push the current context upstream in response to an ``LLMRunFrame``."""
    await self.push_context_frame(FrameDirection.UPSTREAM)

async def _handle_llm_messages_append(self, frame: LLMMessagesAppendFrame):
    """Append the frame's messages to the context; push upstream if ``run_llm`` is set."""
    self.add_messages(frame.messages)
    if frame.run_llm:
        await self.push_context_frame(FrameDirection.UPSTREAM)

async def _handle_llm_messages_update(self, frame: LLMMessagesUpdateFrame):
    """Set the context messages from the frame; push upstream if ``run_llm`` is set."""
    self.set_messages(frame.messages)
    if frame.run_llm:
        await self.push_context_frame(FrameDirection.UPSTREAM)

async def _handle_llm_messages_transform(self, frame: LLMMessagesTransformFrame):
    """Apply the frame's transform to the messages; push upstream if ``run_llm`` is set."""
    self.transform_messages(frame.transform)
    if frame.run_llm:
        await self.push_context_frame(FrameDirection.UPSTREAM)

async def _handle_interruptions(self, frame: InterruptionFrame):
    """Close the assistant turn as interrupted, then reset the aggregator."""
    await self._trigger_assistant_turn_stopped(interrupted=True)
    await self.reset()

async def _handle_end_or_cancel(self, frame: Frame):
    """Close the assistant turn and clean up the summarizer, if there is one.

    The turn is reported as interrupted only when ``frame`` is a ``CancelFrame``.
    """
    await self._trigger_assistant_turn_stopped(interrupted=isinstance(frame, CancelFrame))
    if self._summarizer:
        await self._summarizer.cleanup()

async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame):
    """Register every announced function call as started.

    Each call is stored with a ``None`` placeholder until its
    ``FunctionCallInProgressFrame`` arrives.
    """
    function_names = [f"{f.function_name}:{f.tool_call_id}" for f in frame.function_calls]
    logger.debug(f"{self} FunctionCallsStartedFrame: {function_names}")
    for function_call in frame.function_calls:
        self._function_calls_in_progress[function_call.tool_call_id] = None

async def _handle_function_call_in_progress(self, frame: FunctionCallInProgressFrame):
    """Record an in-progress function call in the context and the in-progress map.

    Adds the assistant ``tool_calls`` message, followed by either an async
    "started" message (when the call is not cancelled on interruption) or an
    ``IN_PROGRESS`` tool message.
    """
    logger.debug(
        f"{self} FunctionCallInProgressFrame: [{frame.function_name}:{frame.tool_call_id}]"
    )

    # Update context with the in-progress function call
    self._context.add_message(
        {
            "role": "assistant",
            "tool_calls": [
                {
                    "id": frame.tool_call_id,
                    "function": {
                        "name": frame.function_name,
                        "arguments": json.dumps(frame.arguments, ensure_ascii=False),
                    },
                    "type": "function",
                }
            ],
        }
    )

    is_async = not frame.cancel_on_interruption

    if is_async:
        self._context.add_message(async_tool_messages.build_started_message(frame.tool_call_id))
    else:
        self._context.add_message(
            {
                "role": "tool",
                "content": "IN_PROGRESS",
                "tool_call_id": frame.tool_call_id,
            }
        )

    self._function_calls_in_progress[frame.tool_call_id] = frame

async def _handle_function_call_result(self, frame: FunctionCallResultFrame):
    """Process a function call result and decide whether to run the LLM.

    Final results complete the call; non-final results are recorded as
    intermediate updates. Any image stored for the call is appended to the
    context. Results for unknown ``tool_call_id`` values are ignored with a
    warning.
    """
    logger.debug(
        f"{self} FunctionCallResultFrame: [{frame.function_name}:{frame.tool_call_id}]"
    )
    if frame.tool_call_id not in self._function_calls_in_progress:
        logger.warning(
            f"FunctionCallResultFrame tool_call_id [{frame.tool_call_id}] is not running"
        )
        return

    # May be None if only FunctionCallsStartedFrame has been seen for this call.
    in_progress_frame = self._function_calls_in_progress[frame.tool_call_id]
    group_id = in_progress_frame.group_id if in_progress_frame else None

    properties = frame.properties
    is_final = frame.properties.is_final if frame.properties else True

    if is_final:
        await self._handle_function_call_finished(frame, in_progress_frame)
    else:
        await self._handle_function_call_intermediate_result(frame, in_progress_frame)

    run_llm = False

    # Append any images that were generated by function calls.
    if frame.tool_call_id in self._function_calls_image_results:
        image_frame = self._function_calls_image_results[frame.tool_call_id]
        del self._function_calls_image_results[frame.tool_call_id]
        # If an image frame has been added to the context, let's run inference.
        run_llm = await self._maybe_append_image_to_context(image_frame)

    # Run inference if the function call result requires it.
    if frame.result:
        if properties and properties.run_llm is not None:
            # If the tool call result has a run_llm property, use it.
            run_llm = properties.run_llm
        elif frame.run_llm is not None:
            # If the frame is indicating we should run the LLM, do it.
            run_llm = frame.run_llm
        else:
            # Run the LLM when this is the last function call in the group
            # to complete. If group_id is set, only consider sibling calls;
            # otherwise always execute as soon as we receive the result.
            if group_id:
                run_llm = not any(
                    f is not None
                    and f.group_id == group_id
                    # We are now able to receive "updates", so the current
                    # frame can still be in the in progress list, and we need to
                    # ignore it.
                    and f.tool_call_id != frame.tool_call_id
                    for f in self._function_calls_in_progress.values()
                )
            else:
                run_llm = True

    if run_llm and not self._user_speaking:
        await self._maybe_push_context_after_function_result()

    # Call the `on_context_updated` callback once the function call result
    # is added to the context. Also, run this in a separate task to make
    # sure we don't block the pipeline.
    if properties and properties.on_context_updated:
        worker_name = f"{frame.function_name}:{frame.tool_call_id}:on_context_updated"
        task = self.create_task(properties.on_context_updated(), worker_name)
        self._context_updated_tasks.add(task)
        task.add_done_callback(self._context_updated_task_finished)

async def _maybe_push_context_after_function_result(self) -> None:
    """Decide whether to push a context frame after a function-call result.

    Push an ``LLMContextFrame`` upstream (with care to avoid duplicate
    pushes while results are queued or the bot is still speaking). Cascade
    LLMs use the context frame to re-run inference with the new tool
    result in scope. Realtime LLMs read the new tool result out of the
    context the same way — they don't get function results from
    ``FunctionCallResultFrame`` directly — so the same push is
    load-bearing for both modes.
    """
    if self.has_queued_frame(FunctionCallResultFrame):
        # Another FunctionCallResultFrame is already queued. Defer the context push
        # to bundle all results into a single LLM call instead of triggering one
        # inference pass per result. The context will be pushed once the last
        # function call in the queue is processed.
        logger.debug(
            f"{self}: More FunctionCallResultFrames queued — deferring context frame push."
        )
    elif self._bot_speaking:
        # Defer the context frame push until the bot finishes speaking. If multiple
        # function call results arrive while the bot is speaking, they all accumulate
        # in the context and a single push is performed once speaking stops, preventing
        # the LLM from running multiple times and producing duplicated responses.
        # This should be an edge case, since it would require a FunctionCallResultFrame
        # being queued between an LLM response start and end frame.
        logger.debug(f"{self}: Bot is speaking — deferring context frame push.")
        self._push_context_on_bot_stopped_speaking = True
    else:
        logger.debug(f"{self}: Pushing context frame!")
        await self.push_context_frame(FrameDirection.UPSTREAM)

async def _handle_function_call_intermediate_result(
    self, frame: FunctionCallResultFrame, in_progress_frame: FunctionCallInProgressFrame | None
):
    """Handle an intermediate result for an async function call.

    Injects an intermediate developer message into the context without
    removing the call from the in-progress map.

    Args:
        frame: The non-final result frame.
        in_progress_frame: The matching in-progress frame, or ``None`` if
            none has been recorded. Not used by this method.
    """
    if not frame.result:
        logger.warning(f"{self} result_callback called with is_final=False but no result!")
        return

    result = json.dumps(frame.result, ensure_ascii=False)
    self._context.add_message(
        async_tool_messages.build_intermediate_result_message(frame.tool_call_id, result)
    )

async def _handle_function_call_finished(
    self, frame: FunctionCallResultFrame, in_progress_frame: FunctionCallInProgressFrame | None
):
    """Handle the final result of a function call.

    Removes the call from the in-progress map and writes the result into
    the context. An empty result is recorded as ``"COMPLETED"``.

    Args:
        frame: The final result frame.
        in_progress_frame: The matching in-progress frame, or ``None`` when
            only ``FunctionCallsStartedFrame`` was seen for this call.
    """
    # The started handler registers calls with a ``None`` placeholder, so the
    # in-progress frame may be missing here. Treat that case as a regular
    # (non-async) call instead of raising AttributeError on ``None``.
    is_async = in_progress_frame is not None and not in_progress_frame.cancel_on_interruption

    del self._function_calls_in_progress[frame.tool_call_id]

    result = json.dumps(frame.result, ensure_ascii=False) if frame.result else "COMPLETED"

    if is_async:
        # For async function calls inject a developer message so the LLM is
        # notified of the completed result instead of updating the IN_PROGRESS
        # tool message.
        self._context.add_message(
            async_tool_messages.build_final_result_message(frame.tool_call_id, result)
        )
    else:
        self._update_function_call_result(frame.function_name, frame.tool_call_id, result)

async def _handle_function_call_cancel(self, frame: FunctionCallCancelFrame):
    """Mark a cancellable in-progress function call as ``CANCELLED``.

    Calls that are unknown, still only "started", or not cancelled on
    interruption are left untouched.
    """
    logger.debug(
        f"{self} FunctionCallCancelFrame: [{frame.function_name}:{frame.tool_call_id}]"
    )
    function_call = self._function_calls_in_progress.get(frame.tool_call_id)
    if function_call and function_call.cancel_on_interruption:
        # Update context with the function call cancellation
        self._update_function_call_result(frame.function_name, frame.tool_call_id, "CANCELLED")
        del self._function_calls_in_progress[frame.tool_call_id]

async def _handle_user_image_frame(self, frame: UserImageRawFrame):
    """Handle a user image, either as a function call result or as context input.

    Images requested by an in-progress function call are stored until the
    call's result arrives; other images may be appended to the context
    right away, in which case the context is pushed upstream.
    """
    image_appended = False

    # Check if this image is a result of a function call.
    if (
        frame.request
        and frame.request.tool_call_id
        and frame.request.tool_call_id in self._function_calls_in_progress
    ):
        self._function_calls_image_results[frame.request.tool_call_id] = frame

        # Call the result_callback if provided. This signals that the image
        # has been retrieved and the function call can now complete.
        if frame.request.result_callback:
            await frame.request.result_callback(None)
    else:
        image_appended = await self._maybe_append_image_to_context(frame)

    if image_appended:
        await self.push_context_frame(FrameDirection.UPSTREAM)

async def _handle_assistant_image_frame(self, frame: AssistantImageRawFrame):
    """Append an assistant image to the context.

    The original encoded data is used when both it and its MIME type are
    available; otherwise the raw image and its format are used.
    """
    logger.debug(f"{self} Appending AssistantImageRawFrame to LLM context (size: {frame.size})")

    if frame.original_data and frame.original_mime_type:
        await self._context.add_image_frame_message(
            format=frame.original_mime_type,
            size=frame.size,  # Technically doesn't matter, since already encoded
            image=frame.original_data,
            role="assistant",
        )
    else:
        await self._context.add_image_frame_message(
            format=frame.format,
            size=frame.size,
            image=frame.image,
            role="assistant",
        )

async def _handle_llm_start(self, _: LLMFullResponseStartFrame):
    """Start the assistant turn when an LLM response begins."""
    # Realtime mode treats LLMFullResponseStartFrame as the user
    # turn's end signal for context-writing purposes — see
    # _realtime_handle_llm_start.
    if self._realtime_service_mode:
        await self._realtime_handle_llm_start()
        return
    await self._trigger_assistant_turn_started()

async def _realtime_handle_llm_start(self):
    """Realtime mode: commit the paired user message, then start the assistant turn.

    Realtime mode treats ``LLMFullResponseStartFrame`` as the user turn's
    end signal for context-writing purposes (in place of
    ``UserStoppedSpeakingFrame``). The paired user aggregator may defer the
    actual write for one ``ttfs_p99_latency`` window to allow a
    late-arriving transcript to land.
    """
    if self._paired_user_aggregator is not None:
        await self._paired_user_aggregator._realtime_handoff_flush()
    await self._trigger_assistant_turn_started()

async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
    """Close the assistant turn when an LLM response ends."""
    # Realtime mode failsafe: by the time the assistant response
    # fully completes, even slow transcripts should have landed.
    # Cancel any still-pending deferred handoff flush and commit
    # the user message now so it precedes the assistant message in
    # context.
    if self._realtime_service_mode and self._paired_user_aggregator is not None:
        await self._paired_user_aggregator._realtime_handoff_flush_immediate()
    await self._trigger_assistant_turn_stopped()

async def _handle_push_aggregation(self):
    """Commit the aggregation for an utterance that had no LLM response cycle."""
    # LLMAssistantPushAggregationFrame is emitted by TTSService at the end
    # of a TTSSpeakFrame-driven utterance (no surrounding LLM response
    # cycle), so no LLMFullResponseStartFrame ever set the turn-start
    # timestamp. Open a turn now so on_assistant_turn_stopped fires for the
    # greeting text the same way it did before LLMAssistantPushAggregationFrame
    # was introduced.
    if not self._assistant_turn_start_timestamp:
        await self._trigger_assistant_turn_started()
    await self._trigger_assistant_turn_stopped()

async def _handle_text(self, frame: TextFrame):
    """Add a text frame's content to the running assistant aggregation.

    Transcription, translation and interim transcription frames are
    skipped, as are frames not marked ``append_to_context`` and frames with
    empty text.
    """
    # Skip TextFrame types not intended to build the assistant context
    if isinstance(frame, (TranscriptionFrame, TranslationFrame, InterimTranscriptionFrame)):
        return

    if not frame.append_to_context:
        return

    # Make sure we really have text (spaces count, too!)
    if len(frame.text) == 0:
        return

    # Prefer the raw text of an aggregated frame when it has one.
    text = (
        frame.raw_text if isinstance(frame, AggregatedTextFrame) and frame.raw_text else frame.text
    )

    self._aggregation.append(
        TextPartForConcatenation(
            text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
        )
    )

async def _handle_marker_frame(self, frame: LLMMarkerFrame):
    """Write a marker to the context immediately or add it to the aggregation."""
    if frame.append_to_context_immediately:
        # Stand-alone marker: write it to the context now as its
        # own assistant message. Used when the marker is the entire
        # assistant turn — e.g. the ○ / ◐ incomplete-turn signals,
        # where the spoken response is suppressed and the marker
        # is the only artifact.
        self._context.add_message({"role": "assistant", "content": frame.marker})
        await self.push_context_frame()
        timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
        await self.push_frame(timestamp_frame)
        return

    # Marker is part of an in-progress assistant response. Append
    # it to the running aggregation so `push_aggregation` writes
    # marker + text as a single context message — e.g. the ✓
    # complete-turn signal that prefixes the spoken response,
    # producing "✓ <response>" in context. Markers are stripped
    # from the transcript via
    # `_maybe_strip_turn_completion_markers` so consumers see
    # clean text.
    self._aggregation.append(
        TextPartForConcatenation(frame.marker, includes_inter_part_spaces=False)
    )

async def _handle_thought_start(self, frame: LLMThoughtStartFrame):
    """Reset thought state and record the new thought's settings and start time."""
    await self._reset_thought_aggregation()
    self._thought_append_to_context = frame.append_to_context
    self._thought_llm = frame.llm
    self._thought_start_time = time_now_iso8601()

async def _handle_thought_text(self, frame: LLMThoughtTextFrame):
    """Add non-empty thought text to the running thought aggregation."""
    # Make sure we really have text (spaces count, too!)
    if len(frame.text) == 0:
        return

    self._thought_aggregation.append(
        TextPartForConcatenation(
            frame.text, includes_inter_part_spaces=frame.includes_inter_frame_spaces
        )
    )

async def _handle_thought_end(self, frame: LLMThoughtEndFrame):
    """Finish the current thought and emit ``on_assistant_thought``.

    When the thought was flagged for the context, it is added as an
    ``LLMSpecificMessage`` that carries the frame's signature.
    """
    thought = concatenate_aggregated_text(self._thought_aggregation)

    if self._thought_append_to_context:
        llm = self._thought_llm
        self._context.add_message(
            LLMSpecificMessage(
                llm=llm,
                message={
                    "type": "thought",
                    "text": thought,
                    "signature": frame.signature,
                },
            )
        )

    message = AssistantThoughtMessage(content=thought, timestamp=self._thought_start_time)
    await self._reset_thought_aggregation()
    await self._call_event_handler("on_assistant_thought", message)

async def _maybe_append_image_to_context(self, frame: UserImageRawFrame) -> bool:
    """Append a user image to the context when the frame allows it.

    Returns:
        ``True`` if the image was appended, ``False`` if the frame is not
        marked ``append_to_context``.
    """
    if not frame.append_to_context:
        return False

    logger.debug(f"{self} Appending UserImageRawFrame to LLM context (size: {frame.size})")

    await self._context.add_image_frame_message(
        format=frame.format,
        size=frame.size,
        image=frame.image,
        text=frame.text,
    )

    return True

def _update_function_call_result(self, function_name: str, tool_call_id: str, result: Any):
    """Overwrite the content of the tool message(s) matching ``tool_call_id``.

    ``LLMSpecificMessage`` entries are skipped. ``function_name`` is not
    used for matching.
    """
    for message in self._context.get_messages():
        if (
            not isinstance(message, LLMSpecificMessage)
            and message["role"] == "tool"
            and message["tool_call_id"]
            and message["tool_call_id"] == tool_call_id
        ):
            message["content"] = result

def _context_updated_task_finished(self, task: asyncio.Task):
    """Drop a finished ``on_context_updated`` task from the tracking set."""
    self._context_updated_tasks.discard(task)

async def _trigger_assistant_turn_started(self):
    """Record the turn start time and emit ``on_assistant_turn_started``."""
    self._assistant_turn_start_timestamp = time_now_iso8601()
    await self._call_event_handler("on_assistant_turn_started")

async def _trigger_assistant_turn_stopped(self, *, interrupted: bool = False):
    """Close the open assistant turn, if any, and emit ``on_assistant_turn_stopped``.

    Does nothing when no turn is open (no start timestamp recorded).

    Args:
        interrupted: Whether the turn ended because of an interruption.
    """
    if not self._assistant_turn_start_timestamp:
        return

    aggregation = await self.push_aggregation()
    if aggregation:
        # Strip turn completion markers from the transcript
        aggregation = self._maybe_strip_turn_completion_markers(aggregation)

    message = AssistantTurnStoppedMessage(
        content=aggregation,
        interrupted=interrupted,
        timestamp=self._assistant_turn_start_timestamp,
    )
    await self._call_event_handler("on_assistant_turn_stopped", message)

    # Clearing the timestamp marks the turn as closed.
    self._assistant_turn_start_timestamp = ""

def _maybe_strip_turn_completion_markers(self, text: str) -> str:
    """Strip turn completion markers from assistant transcript.

    These markers (✓, ○, ◐) are used internally for turn completion
    detection and shouldn't appear in the final transcript.

    Args:
        text: The assistant transcript text.

    Returns:
        The text with markers removed and surrounding whitespace stripped,
        or the unchanged text when it contains no marker.
    """
    from pipecat.turns.user_turn_completion_mixin import (
        USER_TURN_COMPLETE_MARKER,
        USER_TURN_INCOMPLETE_LONG_MARKER,
        USER_TURN_INCOMPLETE_SHORT_MARKER,
    )

    marker_found = False
    for marker in (
        USER_TURN_COMPLETE_MARKER,
        USER_TURN_INCOMPLETE_SHORT_MARKER,
        USER_TURN_INCOMPLETE_LONG_MARKER,
    ):
        if marker in text:
            text = text.replace(marker, "")
            marker_found = True

    # Only strip whitespace if we removed a marker
    return text.strip() if marker_found else text

async def _on_request_summarization(
    self, summarizer: LLMContextSummarizer, frame: LLMContextSummaryRequestFrame
):
    """Handle summarization request from the summarizer.

    Push the request frame UPSTREAM to the LLM service for processing.

    Args:
        summarizer: The summarizer that generated the request.
        frame: The summarization request frame to broadcast.
    """
    await self.push_frame(frame, FrameDirection.UPSTREAM)

async def _on_summary_applied(
    self, summarizer: LLMContextSummarizer, event: SummaryAppliedEvent
):
    """Handle summary applied event from the summarizer.

    Forwards the event to any registered `on_summary_applied` handlers.

    Args:
        summarizer: The summarizer that applied the summary.
        event: The summary applied event.
    """
    await self._call_event_handler("on_summary_applied", summarizer, event)
class LLMContextAggregatorPair:
    """A user aggregator and an assistant aggregator that share one LLM context."""

    def __init__(
        self,
        context: LLMContext,
        *,
        user_params: LLMUserAggregatorParams | None = None,
        assistant_params: LLMAssistantAggregatorParams | None = None,
        add_tool_change_messages: bool | None = None,
        realtime_service_mode: bool = False,
    ):
        """Build both aggregators around a shared context.

        Args:
            context: The context to be managed by the aggregators.
            user_params: Parameters for the user context aggregator.
            assistant_params: Parameters for the assistant context aggregator.
            add_tool_change_messages: When provided, sets the field of the
                same name on both ``user_params`` and ``assistant_params``,
                overriding any value already set on either. This is the
                preferred way to enable tool-change announcements: it
                ensures both aggregators participate, which makes the
                feature robust regardless of which aggregator handles a
                given ``LLMSetToolsFrame``. The shared context guarantees
                the announcement is added exactly once (the second
                aggregator's diff is empty by the time it sees the frame).
                Leave as ``None`` to respect per-params settings.
            realtime_service_mode: When ``True``, configures the pair for
                use with a realtime (speech-to-speech) LLM service. Context
                writes become trailing — driven by the content stream
                itself (transcripts, ``LLMFullResponseStartFrame``) rather
                than turn frames — and turn-end strategies stop waiting for
                transcripts. Both halves share this setting via a private
                channel; mismatched halves are rejected at ``StartFrame``.
                Defaults to ``False`` (cascade behavior).
        """
        user_params = user_params if user_params else LLMUserAggregatorParams()
        assistant_params = assistant_params if assistant_params else LLMAssistantAggregatorParams()

        if add_tool_change_messages is not None:
            # The pair-level setting wins over whatever either params object held.
            for params in (user_params, assistant_params):
                params.add_tool_change_messages = add_tool_change_messages

        self._user = LLMUserAggregator(
            context,
            params=user_params,
            _realtime_service_mode=realtime_service_mode,
        )
        self._assistant = LLMAssistantAggregator(
            context,
            params=assistant_params,
            _realtime_service_mode=realtime_service_mode,
        )

        # Cross-half wiring is one-way and only needed in realtime mode.
        # Realtime mode treats the assistant response start as the user
        # turn's end signal: the assistant half triggers a (possibly
        # deferred) flush of the user half so the user message lands in
        # context before the assistant turn starts. The user side has
        # nothing to flush back — the assistant writes its own message
        # when its response ends, just like cascade mode does.
        if realtime_service_mode:
            self._assistant._paired_user_aggregator = self._user

    def user(self) -> LLMUserAggregator:
        """Get the user context aggregator.

        Returns:
            The user half of the pair.
        """
        return self._user

    def assistant(self) -> LLMAssistantAggregator:
        """Get the assistant context aggregator.

        Returns:
            The assistant half of the pair.
        """
        return self._assistant

    def __iter__(self):
        """Allow tuple unpacking of the aggregator pair.

        Both usage patterns work::

            pair = LLMContextAggregatorPair(context)  # Keeps the instance
            user, assistant = LLMContextAggregatorPair(context)  # Unpacks into tuple

        Returns:
            An iterator over the user aggregator, then the assistant aggregator.
        """
        halves = (self._user, self._assistant)
        return iter(halves)