Source code for pipecat.utils.context.aggregated_frame_sequencer

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Ordered sequencer for AggregatedTextFrame slots through TTS processing."""

from dataclasses import dataclass, field

from loguru import logger

from pipecat.frames.frames import (
    AggregatedTextFrame,
    AggregationType,
    Frame,
    TTSTextFrame,
)
from pipecat.utils.context.word_completion_tracker import WordCompletionTracker


@dataclass
class _AggregatedFrameSlot:
    """A single ordered position in the sequencer's queue.

    Each AggregatedTextFrame handled by _push_tts_frames gets its own slot.
    This applies whether the frame is sent to TTS ("spoken") or excluded from
    synthesis ("skipped"). A skipped slot stays at its position. It is released
    downstream only after every spoken slot ahead of it has completed, so the
    order in which frames reach the context matches the order they were
    produced.
    """

    # The aggregated frame this slot stands for.
    frame: AggregatedTextFrame
    # TTS context ID assigned to the frame when it was registered.
    context_id: str
    # True for frames sent to TTS; False for frames skipped from synthesis.
    spoken: bool
    # Word tracker for word-timestamp services; None for push_text_frames=True services.
    tracker: WordCompletionTracker | None = field(default=None)
    # Routing value copied onto a skipped frame when it is flushed.
    transport_destination: str | None = field(default=None)
    # Flipped to True once the slot is fully spoken (or flushed, if skipped).
    complete: bool = field(default=False)
    # Stamped onto every TTSTextFrame emitted for this slot.
    includes_inter_frame_spaces: bool = field(default=False)


class AggregatedFrameSequencer:
    """Sequences AggregatedTextFrame slots to preserve TTS context ordering.

    Manages an ordered queue of spoken and skipped TTS slots. Spoken slots are
    tracked via a :class:`WordCompletionTracker`. Skipped slots (e.g. code
    blocks excluded from TTS synthesis) wait in place until all preceding
    spoken slots are complete. They are then flushed downstream with
    ``append_to_context=True``.

    All methods are synchronous and return lists of frames the caller should
    push downstream. This makes the sequencer fully testable without any async
    machinery.

    Example::

        sequencer = AggregatedFrameSequencer()
        sequencer.register_spoken(frame, ctx_id, tracker, append_to_context=True)
        for f in sequencer.process_word("hello", pts=1000, context_id=ctx_id):
            await self.push_frame(f)
    """

    def __init__(self, name: str = "AggregatedFrameSequencer"):
        """Initialize the sequencer.

        Args:
            name: Label used in log messages (typically the owning TTS service name).
        """
        self._name = name
        # Ordered queue of spoken and skipped slots; index 0 is the oldest pending slot.
        self._slots: list[_AggregatedFrameSlot] = []
        # Per-context append_to_context flag recorded by register_spoken and read
        # by _build_word_frame (unknown contexts default to True there).
        self._context_append_to_context: dict[str, bool] = {}

    def register_spoken(
        self,
        frame: AggregatedTextFrame,
        context_id: str,
        tracker: WordCompletionTracker | None,
        append_to_context: bool,
        includes_inter_frame_spaces: bool = False,
    ) -> None:
        """Register a spoken AggregatedTextFrame slot.

        Called from _push_tts_frames for frames sent to the TTS service. The
        slot is marked complete in one of two ways:

        - via :meth:`process_word` (word-timestamp services), or
        - via :meth:`complete_spoken_slot` (push_text_frames=True services).

        Args:
            frame: The AggregatedTextFrame being spoken.
            context_id: The TTS context ID assigned to this frame.
            tracker: WordCompletionTracker for word-timestamp services; None for
                push_text_frames=True services (they complete via
                complete_spoken_slot).
            append_to_context: Whether word frames built for this context should
                carry append_to_context=True.
            includes_inter_frame_spaces: When True, every TTSTextFrame emitted
                for this slot carries ``includes_inter_frame_spaces=True``.
                Downstream consumers then do not inject extra spaces between
                consecutive frames.
        """
        self._context_append_to_context[context_id] = append_to_context
        self._slots.append(
            _AggregatedFrameSlot(
                frame=frame,
                context_id=context_id,
                spoken=True,
                tracker=tracker,
                includes_inter_frame_spaces=includes_inter_frame_spaces,
            )
        )

    def register_skipped(
        self,
        frame: AggregatedTextFrame,
        context_id: str,
        transport_destination: str | None,
    ) -> list[Frame]:
        """Register a skipped AggregatedTextFrame and attempt an immediate flush.

        The frame is appended as a skipped slot. If no incomplete spoken slot
        precedes it, the frame is returned right away. Otherwise it waits until
        a later :meth:`flush` unblocks it.

        Args:
            frame: The skipped AggregatedTextFrame (e.g. a code block).
            context_id: The context ID assigned in _push_tts_frames.
            transport_destination: Transport routing value to attach at flush time.

        Returns:
            Frames to push downstream (empty when blocked by a preceding spoken slot).
        """
        frame.context_id = context_id
        self._slots.append(
            _AggregatedFrameSlot(
                frame=frame,
                context_id=context_id,
                spoken=False,
                transport_destination=transport_destination,
            )
        )
        return self.flush()

    def process_word(
        self,
        word: str,
        pts: int,
        context_id: str | None,
        includes_inter_frame_spaces: bool = False,
    ) -> list[Frame]:
        """Process one word-timestamp event and return frames to push downstream.

        Locates the active slot (the first incomplete spoken slot with a
        tracker), advances it by the incoming word, and builds a
        :class:`TTSTextFrame`. Handles:

        - Normal words that fit entirely within the active slot.
        - Overflow words straddling two slot boundaries.
        - Force-complete when the TTS drops an event (word belongs to the next slot).
        - Passthrough for words not recognised by any slot.
        - Flushing any skipped slots unblocked by slot completion.

        Args:
            word: A word token from the TTS service word-timestamp stream.
            pts: Presentation timestamp (nanoseconds) to assign to the frame.
            context_id: TTS context ID from the word-timestamp event.
            includes_inter_frame_spaces: Stamped onto the emitted TTSTextFrame
                so downstream consumers know not to inject extra spaces between
                frames.

        Returns:
            Ordered list of frames (TTSTextFrame and/or AggregatedTextFrame) to push.
        """
        active = self._get_active_slot()
        is_complete = False
        raw_overflow_word = None

        if active and active.tracker:
            if not active.tracker.word_belongs_here(word):
                next_slot = self._get_next_active_slot(active)
                word_fits_next = (
                    next_slot is not None
                    and next_slot.tracker is not None
                    and next_slot.tracker.word_belongs_here(word)
                )
                if not word_fits_next:
                    logger.warning(
                        f"{self._name} Word '{word}' not recognised by any slot, "
                        "emitting as passthrough"
                    )
                    return [
                        self._build_word_frame(
                            word,
                            pts,
                            context_id,
                            includes_inter_frame_spaces=includes_inter_frame_spaces,
                        )
                    ]
                # Word belongs to the next slot: it is still fed to the active
                # tracker below. NOTE(review): presumably the tracker then
                # completes the slot and reports the word via get_overflow_word();
                # confirm in WordCompletionTracker.
            is_complete = active.tracker.add_word_and_check_complete(word)
            raw_overflow_word = active.tracker.get_overflow_word()

        # Give preference to the per-call flag; fall back to the slot's flag.
        # Also propagate the per-call flag onto the slot so force_complete inherits it.
        if active and includes_inter_frame_spaces:
            active.includes_inter_frame_spaces = True
        slot_ifs = includes_inter_frame_spaces or (
            active.includes_inter_frame_spaces if active else False
        )

        frame_text = (
            active.tracker.get_word_for_frame() if (active and active.tracker) else word
        ) or word
        raw_text = active.tracker.get_llm_consumed() if (active and active.tracker) else None
        emit_context_id = active.context_id if active else context_id

        frames: list[Frame] = [
            self._build_word_frame(
                frame_text,
                pts,
                emit_context_id,
                raw_text=raw_text,
                includes_inter_frame_spaces=slot_ifs,
            )
        ]

        if is_complete and active:
            active.complete = True
            frames.extend(self.flush(last_word_pts=pts))

        if raw_overflow_word:
            logger.debug(f"{self._name} Emitting overflow word '{raw_overflow_word}'")
            frames.extend(self._process_overflow(raw_overflow_word, pts))

        return frames

    def complete_spoken_slot(self) -> list[Frame]:
        """Mark the first pending spoken slot complete and flush unblocked skipped frames.

        Used by push_text_frames=True services. After the TTSTextFrame has been
        appended to the audio context, this marks the spoken slot done and
        releases any skipped frames waiting behind it.

        Returns:
            AggregatedTextFrame(s) that are now unblocked and should be pushed.
        """
        slot = next((s for s in self._slots if s.spoken and not s.complete), None)
        if slot:
            slot.complete = True
        return self.flush()

    def flush(self, last_word_pts: int | None = None) -> list[Frame]:
        """Walk the slot queue and return all skipped frames that are now unblocked.

        Removes complete spoken slots from the head of the queue. It then emits
        (and removes) skipped slots whose preceding spoken slots are all done.
        It stops at the first incomplete spoken slot.

        Args:
            last_word_pts: When not None (including ``0``), skipped frames
                receive this PTS so they appear immediately after the last
                spoken word in the timeline.

        Returns:
            AggregatedTextFrame(s) ready to be pushed downstream.
        """
        frames: list[Frame] = []
        while self._slots:
            slot = self._slots[0]
            if slot.spoken and slot.complete:
                self._slots.pop(0)
            elif not slot.spoken and not slot.complete:
                slot.frame.append_to_context = True
                slot.frame.transport_destination = slot.transport_destination
                # Explicit None check: a PTS of 0 is a valid timestamp and must
                # still be applied.
                if last_word_pts is not None:
                    slot.frame.pts = last_word_pts
                logger.debug(f"{self._name}: Flushing Aggregated Frame {slot.frame}")
                frames.append(slot.frame)
                slot.complete = True
                self._slots.pop(0)
            else:
                break  # spoken but not yet complete — wait
        return frames

    def force_complete(self, last_word_pts: int) -> list[Frame]:
        """Force-complete all incomplete spoken slots and flush skipped frames.

        Called at the end of an audio context to handle TTS providers that
        silently drop word-timestamp events. For each incomplete slot, it emits
        a TTSTextFrame for any remaining unspoken text and marks the slot
        complete. It then flushes all now-unblocked skipped frames.

        Args:
            last_word_pts: PTS of the last received word frame. Used as the PTS
                for force-completed frames and forwarded to :meth:`flush`.

        Returns:
            Combined list of TTSTextFrames (for incomplete spoken slots) and
            AggregatedTextFrames (skipped slots now unblocked), in emission order.
        """
        frames: list[Frame] = []
        for slot in self._slots:
            if slot.spoken and not slot.complete:
                if slot.tracker:
                    remaining_text = slot.tracker.get_remaining_tts_text()
                    raw_remaining = slot.tracker.get_remaining_llm_text()
                    # Only attach raw text if it is consistent with what is being emitted.
                    if raw_remaining and remaining_text and remaining_text not in raw_remaining:
                        logger.warning(
                            f"{self._name} force-complete: raw_remaining {repr(raw_remaining)} "
                            f"does not contain remaining_text {repr(remaining_text)}, discarding"
                        )
                        raw_remaining = None
                    if remaining_text:
                        logger.debug(
                            f"{self._name} force-completing slot with remaining text "
                            f"{repr(remaining_text)}"
                        )
                        frames.append(
                            self._build_word_frame(
                                remaining_text,
                                last_word_pts,
                                slot.context_id,
                                raw_text=raw_remaining,
                                includes_inter_frame_spaces=slot.includes_inter_frame_spaces,
                            )
                        )
                slot.complete = True
        frames.extend(self.flush(last_word_pts=last_word_pts))
        return frames

    def clear(self) -> None:
        """Clear all slots and context metadata (called on interruption/reset)."""
        self._slots.clear()
        self._context_append_to_context.clear()

    # -------------------------------------------------------------------------
    # Internal helpers
    # -------------------------------------------------------------------------

    def _get_active_slot(self) -> _AggregatedFrameSlot | None:
        """Return the first incomplete spoken slot that has a tracker."""
        return next(
            (s for s in self._slots if s.spoken and not s.complete and s.tracker is not None),
            None,
        )

    def _get_next_active_slot(self, current: _AggregatedFrameSlot) -> _AggregatedFrameSlot | None:
        """Return the first incomplete spoken slot with a tracker after *current*."""
        found = False
        for s in self._slots:
            if s is current:
                found = True
                continue
            if found and s.spoken and not s.complete and s.tracker is not None:
                return s
        return None

    def _build_word_frame(
        self,
        text: str,
        pts: int,
        context_id: str | None,
        raw_text: str | None = None,
        includes_inter_frame_spaces: bool = False,
    ) -> Frame:
        """Build a TTSTextFrame with all standard word-timestamp attributes set."""
        frame = TTSTextFrame(text, aggregated_by=AggregationType.WORD)
        frame.pts = pts
        frame.context_id = context_id
        # Contexts not registered via register_spoken (or a missing ID) default to True.
        frame.append_to_context = (
            self._context_append_to_context.get(context_id, True)
            if context_id is not None
            else True
        )
        frame.raw_text = raw_text
        frame.includes_inter_frame_spaces = includes_inter_frame_spaces
        return frame

    def _process_overflow(self, raw_overflow_word: str, pts: int) -> list[Frame]:
        """Feed an overflow suffix into the next active slot and return resulting frames."""
        frames: list[Frame] = []
        next_active = self._get_active_slot()
        if not next_active or not next_active.tracker:
            # Nothing left to absorb the overflow; make the drop visible in logs.
            logger.debug(
                f"{self._name} No active slot for overflow word '{raw_overflow_word}', dropping"
            )
            return frames
        overflow_complete = next_active.tracker.add_word_and_check_complete(raw_overflow_word)
        frames.append(
            self._build_word_frame(
                raw_overflow_word,
                pts,
                next_active.context_id,
                raw_text=next_active.tracker.get_llm_consumed(),
                includes_inter_frame_spaces=next_active.includes_inter_frame_spaces,
            )
        )
        if overflow_complete:
            next_active.complete = True
            frames.extend(self.flush(last_word_pts=pts))
        return frames