aggregated_frame_sequencer

Ordered sequencer for AggregatedTextFrame slots through TTS processing.

class pipecat.utils.context.aggregated_frame_sequencer.AggregatedFrameSequencer(name: str = 'AggregatedFrameSequencer')

Bases: object

Sequences AggregatedTextFrame slots to preserve TTS context ordering.

Manages an ordered queue of spoken and skipped TTS slots. Spoken slots are tracked via a WordCompletionTracker. Skipped slots (e.g. code blocks excluded from TTS synthesis) wait in place until all preceding spoken slots are complete, then are flushed downstream with append_to_context=True.

All methods are synchronous and return lists of frames the caller should push downstream, making the sequencer fully testable without any async machinery.

Example:

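# Runs inside an async method of the owning TTS service; frame, ctx_id and
# tracker are supplied by the caller.
sequencer = AggregatedFrameSequencer()
sequencer.register_spoken(frame, ctx_id, tracker, append_to_context=True)
for f in sequencer.process_word("hello", pts=1000, context_id=ctx_id):
    await self.push_frame(f)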

__init__(name: str = 'AggregatedFrameSequencer')

Initialize the sequencer.

Parameters:

name – Label used in log messages (typically the owning TTS service name).

register_spoken(frame: AggregatedTextFrame, context_id: str, tracker: WordCompletionTracker | None, append_to_context: bool, includes_inter_frame_spaces: bool = False) → None

Register a spoken AggregatedTextFrame slot.

Called from _push_tts_frames for frames sent to the TTS service. The slot is marked complete either via process_word() (word-timestamp services) or complete_spoken_slot() (push_text_frames=True services).

Parameters:
  • frame – The AggregatedTextFrame being spoken.

  • context_id – The TTS context ID assigned to this frame.

  • tracker – WordCompletionTracker for word-timestamp services; None for push_text_frames=True services (they complete via complete_spoken_slot).

  • append_to_context – Whether word frames built for this context should carry append_to_context=True.

  • includes_inter_frame_spaces – When True, every TTSTextFrame emitted for this slot carries includes_inter_frame_spaces=True so downstream consumers do not inject extra spaces between consecutive frames.

register_skipped(frame: AggregatedTextFrame, context_id: str, transport_destination: str | None) → list[Frame]

Register a skipped AggregatedTextFrame and attempt an immediate flush.

The frame is appended as a skipped slot. If no incomplete spoken slot precedes it, the frame is returned right away; otherwise it waits until a later flush() unblocks it.

Parameters:
  • frame – The skipped AggregatedTextFrame (e.g. a code block).

  • context_id – The context ID assigned in _push_tts_frames.

  • transport_destination – Transport routing value to attach at flush time.

Returns:

Frames to push downstream (empty when blocked by a preceding spoken slot).
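
The immediate-versus-deferred behaviour described above can be pictured with a toy model. This is a minimal sketch, not pipecat's implementation: ToySequencer and its private _release helper are hypothetical, slots are plain dicts, and frames are represented by their text only.

```python
class ToySequencer:
    """Hypothetical, simplified model; not pipecat's AggregatedFrameSequencer."""

    def __init__(self):
        # Each entry is a dict with keys "text", "spoken" and "complete".
        self._slots = []

    def register_spoken(self, text):
        self._slots.append({"text": text, "spoken": True, "complete": False})

    def register_skipped(self, text):
        # A skipped slot needs no TTS work, so it is always "complete";
        # it can only be held back by an unfinished spoken slot ahead of it.
        self._slots.append({"text": text, "spoken": False, "complete": True})
        return self._release()

    def complete_spoken_slot(self):
        # Mark the first pending spoken slot done, then release what it blocked.
        for slot in self._slots:
            if slot["spoken"] and not slot["complete"]:
                slot["complete"] = True
                break
        return self._release()

    def _release(self):
        # Pop finished slots from the head; only skipped ones are returned.
        out = []
        while self._slots and self._slots[0]["complete"]:
            slot = self._slots.pop(0)
            if not slot["spoken"]:
                out.append(slot["text"])
        return out


seq = ToySequencer()
first = seq.register_skipped("code A")   # nothing ahead of it: returned at once
seq.register_spoken("Hello there.")
second = seq.register_skipped("code B")  # blocked by the unfinished spoken slot
third = seq.complete_spoken_slot()       # the blocker finishes, "code B" is released
```

In this model the caller pushes whatever list comes back, so an empty list simply means there is nothing to push yet.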

process_word(word: str, pts: int, context_id: str | None, includes_inter_frame_spaces: bool = False) → list[Frame]

Process one word-timestamp event and return frames to push downstream.

Locates the active (first incomplete spoken) slot with a tracker, advances it by the incoming word, and builds a TTSTextFrame. Handles:

  • Normal words that fit entirely within the active slot.

  • Overflow words that straddle the boundary between two slots.

  • Force-completion of the active slot when the TTS service has dropped an event and the incoming word belongs to the next slot.

  • Passthrough for words not recognised by any slot.

  • Flushing of any skipped slots unblocked by slot completion.

Parameters:
  • word – A word token from the TTS service word-timestamp stream.

  • pts – Presentation timestamp (nanoseconds) to assign to the frame.

  • context_id – TTS context ID from the word-timestamp event.

  • includes_inter_frame_spaces – Stamped onto the emitted TTSTextFrame so downstream consumers know not to inject extra spaces between frames.

Returns:

Ordered list of frames (TTSTextFrame and/or AggregatedTextFrame) to push.
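
The normal, overflow and passthrough cases can be pictured with a toy character-counting model. This is an illustrative sketch only: assign_words is a hypothetical helper, and it assumes a slot is complete once as many non-space characters have arrived as its text contains. It does not model force-completion on dropped events, and it is not how WordCompletionTracker is known to work.

```python
def assign_words(slot_texts, words):
    """Return (slot_index, text_piece) pairs in arrival order.

    A slot_index of None marks a passthrough piece that no slot claimed.
    """
    # Assumption: each slot expects exactly its count of non-space characters.
    remaining = [len(text.replace(" ", "")) for text in slot_texts]
    out = []
    active = 0  # index of the first slot that still expects characters
    for word in words:
        piece = word
        while piece and active < len(remaining):
            if remaining[active] == 0:
                active += 1  # slot already satisfied, move on
                continue
            take = min(len(piece), remaining[active])
            out.append((active, piece[:take]))
            remaining[active] -= take
            piece = piece[take:]  # non-empty here means the word overflowed
            if remaining[active] == 0:
                active += 1
        if piece:
            out.append((None, piece))  # passthrough
    return out


result = assign_words(["ab", "cd ef"], ["abcd", "ef", "zz"])
# "abcd" straddles both slots, "ef" fits in the second, "zz" is passthrough.
```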

complete_spoken_slot() → list[Frame]

Mark the first pending spoken slot complete and flush unblocked skipped frames.

Used by push_text_frames=True services: after the TTSTextFrame has been appended to the audio context, this marks the spoken slot done and releases any skipped frames waiting behind it.

Returns:

AggregatedTextFrame(s) that are now unblocked and should be pushed.

flush(last_word_pts: int | None = None) → list[Frame]

Walk the slot queue and return all skipped frames that are now unblocked.

Removes complete spoken slots from the head of the queue, then emits (and removes) skipped slots whose preceding spoken slots are all done. Stops at the first incomplete spoken slot.

Parameters:

last_word_pts – When provided, skipped frames receive this PTS so they appear immediately after the last spoken word in the timeline.

Returns:

AggregatedTextFrame(s) ready to be pushed downstream.
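
The queue walk described for flush() can be sketched as follows. This is a simplified model under stated assumptions: Slot is a hypothetical stand-in for a queue entry, the function is free-standing rather than a method, and the real sequencer also attaches context metadata that is omitted here.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class Slot:
    """Hypothetical stand-in for one queue entry; not pipecat's internal type."""
    text: str
    spoken: bool            # True for spoken slots, False for skipped ones
    complete: bool = False  # only meaningful for spoken slots
    pts: Optional[int] = None


def flush(queue: deque, last_word_pts: Optional[int] = None) -> list:
    """Drop finished spoken slots and return the skipped slots now unblocked."""
    released = []
    while queue:
        head = queue[0]
        if head.spoken:
            if not head.complete:
                break        # first incomplete spoken slot blocks the rest
            queue.popleft()  # finished spoken slot: removed, nothing emitted
        else:
            queue.popleft()
            if last_word_pts is not None:
                head.pts = last_word_pts  # place it right after the last word
            released.append(head)
    return released


queue = deque([
    Slot("Here is the code:", spoken=True, complete=True),
    Slot("print('hi')", spoken=False),
    Slot("That was it.", spoken=True),
    Slot("x = 1", spoken=False),
])
released = flush(queue, last_word_pts=2_000)
# Only the first skipped slot is released; the second waits behind "That was it."
```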

force_complete(last_word_pts: int) → list[Frame]

Force-complete all incomplete spoken slots and flush skipped frames.

Called at the end of an audio context to handle TTS providers that silently drop word-timestamp events. Emits a TTSTextFrame for any remaining unspoken text in each incomplete slot, marks it complete, then flushes all now-unblocked skipped frames.

Parameters:

last_word_pts – PTS of the last received word frame, used as the PTS for force-completed frames and forwarded to flush().

Returns:

Combined list of TTSTextFrames (for incomplete spoken slots) and AggregatedTextFrames (skipped slots now unblocked), in emission order.
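
A rough sketch of the two phases described for force_complete() follows. It rests on several assumptions: SpokenSlot and SkippedSlot are hypothetical types, progress is modelled as a character offset, frames are represented as plain tuples, and all remainder frames are emitted before the skipped ones, which is one literal reading of the description rather than confirmed behaviour.

```python
from dataclasses import dataclass


@dataclass
class SpokenSlot:
    """Hypothetical spoken slot; spoken_chars counts text already confirmed."""
    text: str
    spoken_chars: int = 0


@dataclass
class SkippedSlot:
    """Hypothetical skipped slot (e.g. a code block excluded from synthesis)."""
    text: str


def force_complete(slots: list, last_word_pts: int) -> list:
    emitted = []
    # Phase 1: emit whatever text each incomplete spoken slot never received.
    for slot in slots:
        if isinstance(slot, SpokenSlot) and slot.spoken_chars < len(slot.text):
            remainder = slot.text[slot.spoken_chars:].strip()
            if remainder:
                emitted.append(("tts_text", remainder, last_word_pts))
            slot.spoken_chars = len(slot.text)  # mark the slot complete
    # Phase 2: every spoken slot is complete now, so nothing blocks the
    # skipped slots; release them all with the same PTS.
    for slot in slots:
        if isinstance(slot, SkippedSlot):
            emitted.append(("aggregated", slot.text, last_word_pts))
    slots.clear()
    return emitted


slots = [
    SpokenSlot("Run this command", spoken_chars=8),  # "Run this" was confirmed
    SkippedSlot("ls -la"),
    SpokenSlot("then check the output"),             # no word events arrived
]
frames = force_complete(slots, last_word_pts=5_000)
```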

clear() → None

Clear all slots and context metadata (called on interruption/reset).