#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""ElevenLabs text-to-speech service implementations.
This module provides WebSocket and HTTP-based TTS services using ElevenLabs API
with support for streaming audio, word timestamps, and voice customization.
"""
import asyncio
import base64
import json
from collections.abc import AsyncGenerator, Mapping
from dataclasses import dataclass, field
from typing import (
Any,
ClassVar,
Literal,
Union,
)
import aiohttp
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterruptionFrame,
LLMFullResponseEndFrame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven, assert_given
from pipecat.services.tts_service import (
TextAggregationMode,
TTSService,
WebsocketTTSService,
)
from pipecat.transcriptions.language import Language, resolve_language
from pipecat.utils.tracing.service_decorators import traced_tts
# See .env.example for ElevenLabs configuration needed
try:
import websockets
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use ElevenLabs, you need to `pip install pipecat-ai[elevenlabs]`.")
raise ImportError(f"Missing module: {e}") from e
# Models that support language codes. Membership in this set is what decides
# whether a configured language is appended to the WebSocket URL as
# ``language_code`` or skipped with a warning.
# The following models are excluded as they don't support language codes:
# - eleven_flash_v2
# - eleven_turbo_v2
# - eleven_multilingual_v2
ELEVENLABS_MULTILINGUAL_MODELS = {
    "eleven_flash_v2_5",
    "eleven_turbo_v2_5",
}
[docs]
def language_to_elevenlabs_language(language: Language) -> str:
    """Translate a ``Language`` enum member into an ElevenLabs language code.

    Args:
        language: The ``Language`` member to translate.

    Returns:
        The ElevenLabs code for ``language``. The lookup is delegated to
        ``resolve_language(..., use_base_code=True)``, which handles members
        that are missing from the verified table below (per the original
        documentation: it falls back to the base code, e.g. ``en`` for
        ``en-US``, and logs a warning).
    """
    # Verified Language -> ElevenLabs code table.
    verified_codes = {
        Language.AR: "ar",
        Language.BG: "bg",
        Language.CS: "cs",
        Language.DA: "da",
        Language.DE: "de",
        Language.EL: "el",
        Language.EN: "en",
        Language.ES: "es",
        Language.FI: "fi",
        Language.FIL: "fil",
        Language.FR: "fr",
        Language.HI: "hi",
        Language.HR: "hr",
        Language.HU: "hu",
        Language.ID: "id",
        Language.IT: "it",
        Language.JA: "ja",
        Language.KO: "ko",
        Language.MS: "ms",
        Language.NL: "nl",
        Language.NO: "no",
        Language.PL: "pl",
        Language.PT: "pt",
        Language.RO: "ro",
        Language.RU: "ru",
        Language.SK: "sk",
        Language.SV: "sv",
        Language.TA: "ta",
        Language.TR: "tr",
        Language.UK: "uk",
        Language.VI: "vi",
        Language.ZH: "zh",
    }
    service_code = resolve_language(language, verified_codes, use_base_code=True)
    return service_code
def _is_chinese_or_japanese_language(language: str) -> bool:
    """Return True when the base language of ``language`` is ``zh`` or ``ja``.

    The base language is everything before the first ``-`` and is compared
    case-insensitively, so ``zh-CN`` and ``JA`` both match.
    """
    primary_subtag, _, _ = language.partition("-")
    return primary_subtag.lower() in ("zh", "ja")
def _word_timestamps_include_inter_frame_spaces(language: str | None) -> bool:
"""Whether timestamp text should be treated as carrying its own spacing."""
return bool(language and _is_chinese_or_japanese_language(language))
[docs]
def build_elevenlabs_voice_settings(
    settings: Union[dict[str, Any], "TTSSettings"],
) -> dict[str, float | bool] | None:
    """Build the ``voice_settings`` payload for ElevenLabs from the given settings.

    Only the keys ElevenLabs understands as voice settings are copied
    (``stability``, ``similarity_boost``, ``style``, ``use_speaker_boost``,
    ``speed``). A key is skipped when its value is ``None`` or is still the
    ``NOT_GIVEN`` sentinel: the settings dataclasses in this module default
    every field to ``NOT_GIVEN``, and that sentinel is not JSON-serializable,
    so it must never reach the request payload.

    Args:
        settings: Either a plain dictionary or a ``TTSSettings`` instance
            carrying voice settings parameters.

    Returns:
        Dictionary of voice settings, or None if no usable value was found.
    """
    voice_setting_keys = ("stability", "similarity_boost", "style", "use_speaker_boost", "speed")
    # Settings objects are read by attribute, dictionaries by key.
    is_settings_object = isinstance(settings, TTSSettings)

    voice_settings: dict[str, float | bool] = {}
    for key in voice_setting_keys:
        val = getattr(settings, key, None) if is_settings_object else settings.get(key)
        if val is None or isinstance(val, _NotGiven):
            # Unset (None) or never provided (NOT_GIVEN): leave it out.
            continue
        voice_settings[key] = val
    return voice_settings or None
[docs]
class PronunciationDictionaryLocator(BaseModel):
    """Locator for a pronunciation dictionary.

    Instances are serialized with ``model_dump()`` and sent as the
    ``pronunciation_dictionary_locators`` entry of the context-init message
    in ``ElevenLabsTTSService.run_tts``.

    Parameters:
        pronunciation_dictionary_id: The ID of the pronunciation dictionary.
        version_id: The version ID of the pronunciation dictionary.
    """

    pronunciation_dictionary_id: str
    version_id: str
[docs]
@dataclass
class ElevenLabsTTSSettings(TTSSettings):
    """Settings for ElevenLabsTTSService.

    Fields that appear in the WebSocket URL (``voice``, ``model``,
    ``language``) require a full reconnect when changed. Fields that
    affect the voice character (``stability``, ``similarity_boost``,
    ``style``, ``use_speaker_boost``, ``speed``) can be applied by closing
    the current audio context so a new one is opened with updated settings.

    Every field declared here defaults to the ``NOT_GIVEN`` sentinel.

    Parameters:
        stability: Voice stability control (0.0 to 1.0).
        similarity_boost: Similarity boost control (0.0 to 1.0).
        style: Style control for voice expression (0.0 to 1.0).
        use_speaker_boost: Whether to use speaker boost enhancement.
        speed: Voice speed control (0.7 to 1.2).
        apply_text_normalization: Text normalization mode ("auto", "on", "off").
    """

    # A default_factory is used for each sentinel default rather than a plain
    # default value.
    stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )

    #: Fields in the WS URL — changing any of these requires a reconnect.
    URL_FIELDS: ClassVar[frozenset[str]] = frozenset({"voice", "model", "language"})

    #: Fields affecting voice character — changing these requires closing the
    #: current audio context so the next one picks up new settings.
    VOICE_SETTINGS_FIELDS: ClassVar[frozenset[str]] = frozenset(
        {"stability", "similarity_boost", "style", "use_speaker_boost", "speed"}
    )
[docs]
@dataclass
class ElevenLabsHttpTTSSettings(TTSSettings):
    """Settings for ElevenLabsHttpTTSService.

    Every field declared here defaults to the ``NOT_GIVEN`` sentinel.

    Parameters:
        optimize_streaming_latency: Latency optimization level (0-4).
        stability: Voice stability control (0.0 to 1.0).
        similarity_boost: Similarity boost control (0.0 to 1.0).
        style: Style control for voice expression (0.0 to 1.0).
        use_speaker_boost: Whether to use speaker boost enhancement.
        speed: Voice speed control (0.25 to 4.0).
        apply_text_normalization: Text normalization mode ("auto", "on", "off").
    """

    # NOTE(review): the documented ``speed`` range here (0.25 to 4.0) differs
    # from the WebSocket settings class (0.7 to 1.2) — confirm against the
    # ElevenLabs API reference.
    optimize_streaming_latency: int | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    stability: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    similarity_boost: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    style: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    use_speaker_boost: bool | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    speed: float | None | _NotGiven = field(default_factory=lambda: NOT_GIVEN)
    apply_text_normalization: Literal["auto", "on", "off"] | None | _NotGiven = field(
        default_factory=lambda: NOT_GIVEN
    )
def _select_alignment(
msg: Mapping[str, Any],
*,
normalized_key: str,
alignment_key: str,
prefer_normalized: bool,
) -> Mapping[str, Any] | None:
"""Pick the alignment field to use from a TTS message, with fallback.
ElevenLabs returns two alignment fields per chunk:
- ``normalized_key`` (``normalizedAlignment`` for WebSocket,
``normalized_alignment`` for HTTP): the post-normalized form of what was
spoken - pronunciation-dictionary substitutions, text normalization, or
romanization of non-Latin scripts (e.g., Chinese rendered as pinyin).
- ``alignment_key`` (``alignment``): the original input characters.
Prefer ``normalized`` only when a pronunciation dictionary is configured -
that's the case where ``alignment`` has overlapping restarts that produce
duplicated/garbled words (issue #4316). Otherwise prefer ``alignment`` so
the LLM context preserves the original input rather than the normalized
form. Fall back to the other field if the preferred one is missing or
null - the API schema marks both as nullable.
Args:
msg: TTS response message from ElevenLabs.
normalized_key: Key for the normalized-alignment field on this transport.
alignment_key: Key for the original-alignment field on this transport.
prefer_normalized: True iff the caller is using pronunciation dictionaries.
Returns:
The chosen alignment dict, or ``None`` if both fields are absent/null.
"""
if prefer_normalized:
return msg.get(normalized_key) or msg.get(alignment_key)
return msg.get(alignment_key) or msg.get(normalized_key)
def _strip_utterance_leading_spaces(
    alignment: Mapping[str, Any], keys: tuple[str, str, str], should_strip: bool
) -> Mapping[str, Any]:
    """Return alignment with utterance-leading space chars removed, if requested.

    ElevenLabs Flash normalized alignment chunks can begin with a leading space
    at the start of an utterance. Strip only utterance-leading spaces so bot
    turn text does not start with whitespace. On subsequent chunks, however, a
    leading space can be a real inter-word separator (Flash models commonly
    split sentences this way), so it must be preserved for
    ``calculate_word_times`` to flush any partial word carried over from the
    previous chunk.

    The input mapping is never mutated; when something is stripped a shallow
    copy with the three arrays trimmed by the same count is returned. The two
    timing arrays get the same null-guard as the character array, so an
    explicit ``null`` for either of them yields an empty list instead of a
    ``TypeError`` on slicing.

    Args:
        alignment: Alignment dict from the API.
        keys: Tuple of (chars_key, start_times_key, durations_or_end_times_key)
            naming the three parallel arrays - these differ between the
            WebSocket and HTTP response schemas.
        should_strip: Whether this is still utterance-leading alignment data.

    Returns:
        ``alignment`` itself when nothing needs stripping, otherwise a trimmed
        copy.
    """
    chars_key, starts_key, tail_key = keys
    chars = alignment.get(chars_key) or []
    if not should_strip or not chars or chars[0] != " ":
        return alignment

    # Index of the first non-space character (or len(chars) if all spaces).
    strip_count = next(
        (index for index, char in enumerate(chars) if char != " "),
        len(chars),
    )

    stripped = dict(alignment)
    stripped[chars_key] = chars[strip_count:]
    for parallel_key in (starts_key, tail_key):
        # ``or []`` covers both a missing key and an explicit null value.
        stripped[parallel_key] = (alignment.get(parallel_key) or [])[strip_count:]
    return stripped
[docs]
def calculate_word_times(
    alignment_info: Mapping[str, Any],
    cumulative_time: float,
    partial_word: str = "",
    partial_word_start_time: float = 0.0,
) -> tuple[list[tuple[str, float]], str, float]:
    """Turn per-character alignment data into per-word start times.

    Characters are accumulated into words; a single space character ends the
    word in progress. A word still open at the end of the chunk is not
    emitted but handed back so the caller can pass it in with the next chunk.

    Args:
        alignment_info: Character alignment data from ElevenLabs API. Must
            contain the parallel ``chars`` and ``charStartTimesMs`` arrays.
        cumulative_time: Base time offset (seconds) for this chunk.
        partial_word: Partial word carried over from previous chunk.
        partial_word_start_time: Start time of the partial word.

    Returns:
        Tuple of (word_times, new_partial_word, new_partial_word_start_time):

        - word_times: List of (word, timestamp) tuples for complete words
        - new_partial_word: Incomplete word at end of chunk (empty if chunk ends with space)
        - new_partial_word_start_time: Start time of the incomplete word
          (0.0 when there is none)

        If the two arrays differ in length an error is logged and the
        carried-over state is returned unchanged with no words.
    """
    characters = alignment_info["chars"]
    start_offsets_ms = alignment_info["charStartTimesMs"]

    if len(characters) != len(start_offsets_ms):
        logger.error(
            f"calculate_word_times: length mismatch - chars={len(characters)}, times={len(start_offsets_ms)}"
        )
        return ([], partial_word, partial_word_start_time)

    completed: list[tuple[str, float]] = []
    # Resume the word left open by the previous chunk, if there was one.
    pending = partial_word
    pending_start = partial_word_start_time if partial_word else None

    for character, offset_ms in zip(characters, start_offsets_ms):
        if character != " ":
            if pending_start is None:
                # First character of a new word: milliseconds -> seconds,
                # shifted by the chunk's base offset.
                pending_start = cumulative_time + (offset_ms / 1000.0)
            pending += character
            continue
        # A space closes the word in progress; runs of spaces emit nothing.
        if pending:
            completed.append((pending, pending_start))
            pending = ""
            pending_start = None

    leftover_word = pending if pending else ""
    leftover_start = 0.0 if pending_start is None else pending_start
    return (completed, leftover_word, leftover_start)
[docs]
class ElevenLabsTTSService(WebsocketTTSService):
"""ElevenLabs WebSocket-based TTS service with word timestamps.
Provides real-time text-to-speech using ElevenLabs' WebSocket streaming API.
Supports word-level timestamps, audio context management, and various voice
customization options including stability, similarity boost, and speed controls.
"""
Settings = ElevenLabsTTSSettings
_settings: Settings
[docs]
def __init__(
    self,
    *,
    api_key: str,
    voice_id: str | None = None,
    model: str | None = None,
    url: str = "wss://api.elevenlabs.io",
    sample_rate: int | None = None,
    auto_mode: bool | None = None,
    enable_ssml_parsing: bool | None = None,
    enable_logging: bool | None = None,
    pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None,
    params: InputParams | None = None,
    settings: Settings | None = None,
    text_aggregation_mode: TextAggregationMode | None = None,
    aggregate_sentences: bool | None = None,
    **kwargs,
):
    """Initialize the ElevenLabs TTS service.

    Settings are layered in this order, later layers overriding earlier
    ones: hardcoded defaults, the deprecated ``voice_id`` / ``model``
    arguments, the deprecated ``params`` object (skipped entirely when
    ``settings`` is provided), and finally ``settings``.

    Args:
        api_key: ElevenLabs API key for authentication.
        voice_id: ID of the voice to use for synthesis.

            .. deprecated:: 0.0.105
                Use ``settings=ElevenLabsTTSService.Settings(voice=...)`` instead.

        model: TTS model to use (e.g., "eleven_turbo_v2_5").

            .. deprecated:: 0.0.105
                Use ``settings=ElevenLabsTTSService.Settings(model=...)`` instead.

        url: WebSocket URL for ElevenLabs TTS API.
        sample_rate: Audio sample rate. If None, uses default.
        auto_mode: Whether to enable ElevenLabs' auto mode, which reduces
            latency by disabling server-side chunk scheduling and buffering.
            Recommended when sending complete sentences or phrases. When
            None (default), auto mode is enabled unless the text
            aggregation mode is ``TOKEN`` — because token streaming relies
            on the server-side chunk scheduler to accumulate enough text
            for natural-sounding synthesis.
        enable_ssml_parsing: Whether to parse SSML tags in text.
        enable_logging: Whether to enable ElevenLabs server-side logging.
        pronunciation_dictionary_locators: List of pronunciation dictionary
            locators to use.
        params: Additional input parameters for voice customization.

            .. deprecated:: 0.0.105
                Use ``settings=ElevenLabsTTSService.Settings(...)`` instead.

        settings: Runtime-updatable settings. When provided alongside deprecated
            parameters, ``settings`` values take precedence.
        text_aggregation_mode: How to aggregate incoming text before synthesis.
        aggregate_sentences: Whether to aggregate sentences within the TTSService.

            .. deprecated:: 0.0.104
                Use ``text_aggregation_mode`` instead.

        **kwargs: Additional arguments passed to the parent service.
    """
    # By default, we aggregate sentences before sending to TTS. This adds
    # ~200-300ms of latency per sentence (waiting for the sentence-ending
    # punctuation token from the LLM). Setting
    # text_aggregation_mode=TextAggregationMode.TOKEN streams tokens
    # directly. That mode needs auto_mode=False; when auto_mode is left as
    # None it is derived from the aggregation mode further down. This
    # eliminates aggregation time, but slows down ElevenLabs.
    #
    # We also don't want to automatically push LLM response text frames
    # (push_text_frames=False), because the context aggregators will add
    # them to the LLM context even if we're interrupted. ElevenLabs gives us
    # word-by-word timestamps. We can use those to generate text frames
    # ourselves aligned with the playout timing of the audio!
    #
    # NOTE(review): an earlier wording of this comment said the parent class
    # should send the stop frame after a short period without audio, but
    # push_stop_frames=False is passed below; _receive_messages appends a
    # TTSStoppedFrame itself when ElevenLabs reports isFinal for a context.

    # 1. Initialize default_settings with hardcoded defaults
    default_settings = self.Settings(
        model="eleven_turbo_v2_5",
        voice=None,
        language=None,
        stability=None,
        similarity_boost=None,
        style=None,
        use_speaker_boost=None,
        speed=None,
        apply_text_normalization=None,
    )

    # 2. Apply direct init arg overrides (deprecated)
    if voice_id is not None:
        self._warn_init_param_moved_to_settings("voice_id", "voice")
        default_settings.voice = voice_id
    if model is not None:
        self._warn_init_param_moved_to_settings("model", "model")
        default_settings.model = model

    # 3. Apply params overrides — only if settings not provided
    # The explicit pronunciation_dictionary_locators argument wins over the
    # one carried by params.
    _pronunciation_dictionary_locators = pronunciation_dictionary_locators
    if params is not None:
        self._warn_init_param_moved_to_settings("params")
        if not settings:
            if params.language is not None:
                default_settings.language = params.language
            if params.stability is not None:
                default_settings.stability = params.stability
            if params.similarity_boost is not None:
                default_settings.similarity_boost = params.similarity_boost
            if params.style is not None:
                default_settings.style = params.style
            if params.use_speaker_boost is not None:
                default_settings.use_speaker_boost = params.use_speaker_boost
            if params.speed is not None:
                default_settings.speed = params.speed
            if params.auto_mode is not None:
                auto_mode = params.auto_mode
            if params.enable_ssml_parsing is not None:
                enable_ssml_parsing = params.enable_ssml_parsing
            if params.enable_logging is not None:
                enable_logging = params.enable_logging
            if params.apply_text_normalization is not None:
                default_settings.apply_text_normalization = params.apply_text_normalization
            if _pronunciation_dictionary_locators is None:
                _pronunciation_dictionary_locators = params.pronunciation_dictionary_locators

    # 4. Apply settings delta (canonical API, always wins)
    if settings is not None:
        default_settings.apply_update(settings)

    super().__init__(
        text_aggregation_mode=text_aggregation_mode,
        aggregate_sentences=aggregate_sentences,
        push_text_frames=False,
        push_stop_frames=False,
        pause_frame_processing=True,
        sample_rate=sample_rate,
        settings=default_settings,
        **kwargs,
    )

    self._api_key = api_key
    self._url = url

    # Init-only WebSocket URL params (not runtime-updatable).
    #
    # ElevenLabs' auto mode reduces latency by disabling server-side chunk
    # scheduling and buffering — it's designed for inputs that are already
    # complete sentences or phrases. In TOKEN mode we stream individual LLM
    # tokens, so we need the server-side scheduler to accumulate enough
    # text for natural-sounding synthesis; enabling auto mode there would
    # hurt quality. When the caller hasn't set auto_mode explicitly, we
    # derive the right default from the text aggregation strategy.
    if auto_mode is None:
        auto_mode = self._text_aggregation_mode != TextAggregationMode.TOKEN
    self._auto_mode = auto_mode
    self._enable_ssml_parsing = enable_ssml_parsing
    self._enable_logging = enable_logging

    self._output_format = ""  # initialized in start()
    # Cached voice_settings payload, rebuilt whenever settings change.
    self._voice_settings = self._set_voice_settings()
    self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators

    # Running time offset (seconds) added to per-chunk character times.
    self._cumulative_time = 0
    # Track partial words that span across alignment chunks
    self._partial_word = ""
    self._partial_word_start_time = 0.0
    # Context IDs for which at least one alignment chunk has been handled;
    # used to strip leading spaces only from the first chunk of a context.
    self._alignment_started_context_ids: set[str | None] = set()
    # Context IDs whose context-init has been sent, so the keepalive knows
    # which contexts are safe to target.
    self._context_init_sent: set[str] = set()

    # Context management for v1 multi API
    self._receive_task = None
    self._keepalive_task = None
[docs]
def can_generate_metrics(self) -> bool:
    """Report that this service supports metrics generation.

    Returns:
        Always True for the ElevenLabs WebSocket service.
    """
    return True
[docs]
def language_to_service_language(self, language: Language) -> str | None:
    """Convert a Language enum to ElevenLabs language format.

    Delegates to the module-level ``language_to_elevenlabs_language``.

    Args:
        language: The language to convert.

    Returns:
        The ElevenLabs-specific language code. The annotation allows None,
        but the helper this delegates to is annotated as returning ``str``.
    """
    return language_to_elevenlabs_language(language)
def _set_voice_settings(self):
    """Build the voice-settings payload from the service's current settings.

    Despite the name this does not assign anything; callers store the
    returned value in ``self._voice_settings``.

    Returns:
        The result of ``build_elevenlabs_voice_settings(self._settings)``.
    """
    return build_elevenlabs_voice_settings(self._settings)
async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
    """Apply a settings delta and make the connection reflect it.

    The declarative ``URL_FIELDS`` and ``VOICE_SETTINGS_FIELDS`` sets on
    :class:`ElevenLabsTTSService.Settings` decide what happens:

    - any URL field changed: the WebSocket is disconnected and reconnected;
    - otherwise, any voice-settings field changed: every current audio
      context is closed and its alignment state reset, so the next context
      is opened with the new settings.

    Unless a reconnect happened, changed fields that belong to neither set
    are passed to ``_warn_unhandled_updated_settings``.

    Args:
        delta: A :class:`TTSSettings` (or ``ElevenLabsTTSService.Settings``) delta.

    Returns:
        Dict mapping changed field names to their previous values.
    """
    changed = await super()._update_settings(delta)
    if not changed:
        return changed

    # The cached payload must match the new settings for the next context.
    self._voice_settings = self._set_voice_settings()

    settings_cls = self.Settings
    changed_url_fields = changed.keys() & settings_cls.URL_FIELDS
    changed_voice_fields = changed.keys() & settings_cls.VOICE_SETTINGS_FIELDS

    if changed_url_fields:
        logger.debug(
            f"URL-level setting changed ({changed_url_fields}), "
            f"reconnecting WebSocket"
        )
        await self._disconnect()
        await self._connect()
        # A reconnect applies all settings, so there is nothing to warn about.
        return changed

    if changed_voice_fields:
        logger.debug(
            f"Voice settings changed ({changed_voice_fields}), "
            f"closing current context to apply changes"
        )
        open_contexts = self.get_audio_contexts()
        if open_contexts:
            for ctx_id in open_contexts:
                await self._close_context(ctx_id)
                self._reset_alignment_state(ctx_id)

    # Only warn about fields not handled by voice settings or URL changes.
    handled = settings_cls.URL_FIELDS | settings_cls.VOICE_SETTINGS_FIELDS
    self._warn_unhandled_updated_settings(changed.keys() - handled)
    return changed
[docs]
async def start(self, frame: StartFrame):
    """Start the ElevenLabs TTS service.

    Runs the parent start logic, derives the output format from the
    service's sample rate and then opens the WebSocket connection.

    Args:
        frame: The start frame containing initialization parameters.
    """
    await super().start(frame)
    # The format depends on self.sample_rate and is therefore set here,
    # after the parent start, rather than in __init__.
    self._output_format = output_format_from_sample_rate(self.sample_rate)
    await self._connect()
[docs]
async def stop(self, frame: EndFrame):
    """Stop the ElevenLabs TTS service.

    Runs the parent stop logic, then tears down the background tasks and
    the WebSocket connection.

    Args:
        frame: The end frame.
    """
    await super().stop(frame)
    await self._disconnect()
[docs]
async def cancel(self, frame: CancelFrame):
    """Cancel the ElevenLabs TTS service.

    Runs the parent cancel logic, then tears down the background tasks and
    the WebSocket connection.

    Args:
        frame: The cancel frame.
    """
    await super().cancel(frame)
    await self._disconnect()
[docs]
async def flush_audio(self, context_id: str | None = None):
    """Ask ElevenLabs to flush pending audio for a context.

    Sends a ``flush`` message for the chosen context. Nothing is sent when
    no context can be determined or the WebSocket is not connected.

    Args:
        context_id: The specific context to flush. If None, falls back to the
            currently active context.
    """
    target_id = context_id or self.get_active_audio_context_id()
    if target_id and self._websocket:
        logger.trace(f"{self}: flushing audio")
        payload = json.dumps({"context_id": target_id, "flush": True})
        await self._websocket.send(payload)
async def _connect(self):
    """Open the WebSocket and start the background tasks that depend on it.

    The receive and keepalive tasks are created only when the connection
    attempt left a websocket in place, and only if they are not already
    running.
    """
    await super()._connect()
    await self._connect_websocket()

    if not self._websocket:
        # Connection failed; _connect_websocket has already reported it.
        return

    if not self._receive_task:
        self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
    if not self._keepalive_task:
        self._keepalive_task = self.create_task(self._keepalive_task_handler())
async def _disconnect(self):
    """Stop the background tasks and close the WebSocket.

    The receive and keepalive tasks are cancelled before the socket is
    closed.
    """
    await super()._disconnect()
    if self._receive_task:
        await self.cancel_task(self._receive_task)
        self._receive_task = None
    if self._keepalive_task:
        await self.cancel_task(self._keepalive_task)
        self._keepalive_task = None
    await self._disconnect_websocket()
async def _connect_websocket(self):
    """Open the ElevenLabs multi-context WebSocket if it is not already open.

    The URL is assembled from the current settings plus the init-only
    options (auto mode, SSML parsing, logging). On success the
    ``on_connected`` event handler is called. Any failure clears
    ``self._websocket``, pushes an error and calls the
    ``on_connection_error`` event handler; nothing is raised to the caller.
    """
    try:
        if self._websocket and self._websocket.state is State.OPEN:
            return

        logger.debug("Connecting to ElevenLabs")

        voice_id = self._settings.voice
        model = self._settings.model
        output_format = self._output_format

        # Mandatory query parameters first, optional ones appended in order.
        query_parts = [
            f"model_id={model}",
            f"output_format={output_format}",
            f"auto_mode={str(self._auto_mode).lower()}",
        ]
        if self._enable_ssml_parsing is not None:
            query_parts.append(f"enable_ssml_parsing={str(self._enable_ssml_parsing).lower()}")
        if self._enable_logging is not None:
            query_parts.append(f"enable_logging={str(self._enable_logging).lower()}")
        text_normalization = self._settings.apply_text_normalization
        if text_normalization is not None:
            query_parts.append(f"apply_text_normalization={text_normalization}")

        # Language can only be used with the ELEVENLABS_MULTILINGUAL_MODELS
        language = self._settings.language
        if language is not None:
            if model in ELEVENLABS_MULTILINGUAL_MODELS:
                query_parts.append(f"language_code={language}")
                logger.debug(f"Using language code: {language}")
            else:
                logger.warning(
                    f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
                )

        endpoint = f"{self._url}/v1/text-to-speech/{voice_id}/multi-stream-input"
        url = endpoint + "?" + "&".join(query_parts)

        # Set max websocket message size to 16MB for large audio responses
        self._websocket = await websocket_connect(
            url, max_size=16 * 1024 * 1024, additional_headers={"xi-api-key": self._api_key}
        )
        await self._call_event_handler("on_connected")
    except Exception as e:
        self._websocket = None
        await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
        await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
    """Close the WebSocket and clear all per-connection state.

    If a websocket exists, a ``close_socket`` message is sent before the
    socket is closed. Errors during shutdown are reported through
    ``push_error`` instead of being raised. Whether or not shutdown
    succeeded, the active audio context is removed, the websocket
    reference and the context-init bookkeeping are cleared and the
    ``on_disconnected`` event handler is called.
    """
    try:
        await self.stop_all_metrics()

        if self._websocket:
            logger.debug("Disconnecting from ElevenLabs")
            await self._websocket.send(json.dumps({"close_socket": True}))
            await self._websocket.close()
            logger.debug("Disconnected from ElevenLabs")
    except Exception as e:
        await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
    finally:
        await self.remove_active_audio_context()
        self._websocket = None
        # Contexts do not survive the connection, so none of them counts as
        # initialised any more.
        self._context_init_sent.clear()
        await self._call_event_handler("on_disconnected")
def _get_websocket(self):
    """Return the current websocket, raising if there is none.

    Raises:
        Exception: If no websocket is connected.
    """
    websocket = self._websocket
    if not websocket:
        raise Exception("Websocket not connected")
    return websocket
async def _close_context(self, context_id: str):
    """Tell ElevenLabs to close a server-side context.

    ElevenLabs requires that Pipecat explicitly closes contexts to free
    server-side resources, both on interruption and on normal completion.
    Nothing is sent when ``context_id`` is empty or the WebSocket is not
    connected. A failed send is reported through ``push_error`` rather than
    raised.

    Args:
        context_id: ID of the context to close.
    """
    if not (context_id and self._websocket):
        return

    logger.trace(f"{self}: Closing context {context_id}")
    close_message = json.dumps({"context_id": context_id, "close_context": True})
    try:
        # ElevenLabs requires that Pipecat manages the contexts and closes
        # them when they're no longer in use. Since an InterruptionFrame is
        # pushed every time the user speaks, it is used as a trigger to
        # close the context and reset the state.
        # Note: remove_audio_context does not need to be called here, as the
        # context is automatically reset when super()._handle_interruption
        # is called.
        await self._websocket.send(close_message)
    except Exception as e:
        await self.push_error(error_msg=f"Unknown error occurred: {e}", exception=e)
def _reset_alignment_state(self, context_id: str):
    """Clear word-timing state and forget the bookkeeping for a context.

    The cumulative time offset and the carried-over partial word are reset,
    and ``context_id`` is dropped from both the "alignment started" and the
    "context-init sent" sets (a no-op for IDs that are not in them).

    Args:
        context_id: ID of the context whose bookkeeping should be discarded.
    """
    for tracked_ids in (self._alignment_started_context_ids, self._context_init_sent):
        tracked_ids.discard(context_id)

    self._partial_word_start_time = 0.0
    self._partial_word = ""
    self._cumulative_time = 0.0
[docs]
async def on_audio_context_interrupted(self, context_id: str):
    """Close the ElevenLabs context when the bot is interrupted.

    The server-side context is closed and the local alignment state reset
    before the parent handler runs.

    Args:
        context_id: ID of the interrupted audio context.
    """
    await self._close_context(context_id)
    self._reset_alignment_state(context_id)
    await super().on_audio_context_interrupted(context_id)
[docs]
async def on_audio_context_completed(self, context_id: str):
    """Reset alignment state after all audio for the context has played.

    Unlike the interruption path, no ``close_context`` message is sent from
    here.

    Args:
        context_id: ID of the completed audio context.
    """
    self._reset_alignment_state(context_id)
    await super().on_audio_context_completed(context_id)
[docs]
async def on_turn_context_completed(self):
    """Close the server-side context at end of turn.

    Sends close_context so isFinal arrives immediately after the last audio byte.
    """
    # Read the ID before delegating to the parent handler.
    # NOTE(review): presumably the parent clears self._turn_context_id — confirm.
    context_id = self._turn_context_id
    await super().on_turn_context_completed()
    if context_id:
        await self._close_context(context_id)
async def _receive_messages(self):
    """Handle incoming WebSocket messages from ElevenLabs.

    Each message is a JSON object tagged with a ``contextId``:

    - ``isFinal``: the context is finished. If its audio context still
      exists, a ``TTSStoppedFrame`` is appended and the context removed.
    - ``audio``: base64 audio, decoded and appended to the audio context
      as a ``TTSAudioRawFrame``.
    - alignment data: converted to word timestamps, after which the
      running time offset is advanced by the duration of the chunk.

    The time offset is advanced for every chunk that carries timing data,
    whether or not the chunk completed a word. Character times are added
    to the running offset when words are timed, so a chunk that only
    extends a partial word must still move the offset forward. The
    word-based fallback is used only when there is a word to fall back on.

    Raises:
        Exception: If the websocket is not connected (from
            ``_get_websocket``).
    """
    async for message in self._get_websocket():
        msg = json.loads(message)

        received_ctx_id = msg.get("contextId")

        # Handle final messages first, regardless of context availability
        if msg.get("isFinal") is True:
            logger.debug(f"Received final message for context {received_ctx_id}")
            # After an interruption the audio context is already gone and
            # there is nothing left to finish.
            if self.audio_context_available(received_ctx_id):
                await self.append_to_audio_context(
                    received_ctx_id, TTSStoppedFrame(context_id=received_ctx_id)
                )
                await self.remove_audio_context(received_ctx_id)
            continue

        if msg.get("audio"):
            audio = base64.b64decode(msg["audio"])
            frame = TTSAudioRawFrame(audio, self.sample_rate, 1, context_id=received_ctx_id)
            await self.append_to_audio_context(received_ctx_id, frame)

        raw_alignment = _select_alignment(
            msg,
            normalized_key="normalizedAlignment",
            alignment_key="alignment",
            prefer_normalized=bool(self._pronunciation_dictionary_locators),
        )
        if not raw_alignment:
            continue

        # Leading spaces are stripped only from the first alignment chunk of
        # a context; later ones may be real word separators.
        alignment = _strip_utterance_leading_spaces(
            raw_alignment,
            ("chars", "charStartTimesMs", "charDurationsMs"),
            received_ctx_id not in self._alignment_started_context_ids,
        )
        self._alignment_started_context_ids.add(received_ctx_id)

        word_times, self._partial_word, self._partial_word_start_time = calculate_word_times(
            alignment,
            self._cumulative_time,
            self._partial_word,
            self._partial_word_start_time,
        )

        if word_times:
            await self.add_word_timestamps(
                word_times,
                received_ctx_id,
                includes_inter_frame_spaces=(
                    True
                    if _word_timestamps_include_inter_frame_spaces(
                        assert_given(self._settings.language)
                    )
                    else None
                ),
            )

        # Calculate the actual end time of this audio chunk
        char_start_times_ms = alignment.get("charStartTimesMs") or []
        char_durations_ms = alignment.get("charDurationsMs") or []

        if char_start_times_ms and char_durations_ms:
            # End time = start time of last character + duration of last character
            chunk_end_time_ms = char_start_times_ms[-1] + char_durations_ms[-1]
            chunk_end_time_seconds = chunk_end_time_ms / 1000.0
            self._cumulative_time += chunk_end_time_seconds
        elif word_times:
            # Fallback: use the last word's start time. Guarded so that an
            # empty word list can never be indexed.
            self._cumulative_time = word_times[-1][1]
            logger.warning(
                "_receive_messages: using fallback timing method - consider investigating alignment data structure"
            )
async def _keepalive_task_handler(self):
    """Send a keepalive every 10 seconds until the connection is closed.

    The loop ends, with a warning, the first time sending a keepalive
    raises ``websockets.ConnectionClosed``.
    """
    interval_secs = 10
    while True:
        await asyncio.sleep(interval_secs)
        try:
            await self._send_keepalive()
        except websockets.ConnectionClosed as e:
            logger.warning(f"{self} keepalive error: {e}")
            return
async def _send_keepalive(self):
    """Send one keepalive message, if the WebSocket is open.

    The keepalive names a ``context_id`` only when that context's
    context-init (the message carrying ``voice_settings``) has already been
    sent. Otherwise the keepalive would be the context's first message, with
    no ``voice_settings``, and ElevenLabs would reject the later context-init
    with a 1008 policy violation. Until then a context-less keepalive is
    enough to keep the connection alive without opening the context
    prematurely.
    """
    websocket = self._websocket
    if not websocket or websocket.state is not State.OPEN:
        return

    keepalive_message = {"text": ""}
    active_id = self.get_active_audio_context_id()
    if active_id and active_id in self._context_init_sent:
        # Safe to target: this context has been initialised.
        keepalive_message["context_id"] = active_id

    await self._websocket.send(json.dumps(keepalive_message))
async def _send_text(self, text: str, context_id: str):
    """Send text for synthesis within the given context.

    Nothing is sent when the WebSocket is not connected or ``context_id``
    is empty.

    Args:
        text: Text to synthesize.
        context_id: ID of the ElevenLabs context the text belongs to.
    """
    if not (self._websocket and context_id):
        return
    payload = json.dumps({"text": text, "context_id": context_id})
    await self._websocket.send(payload)
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
    """Generate speech from text using ElevenLabs' streaming WebSocket API.

    This generator yields control frames only (``TTSStartedFrame``,
    ``ErrorFrame``, ``TTSStoppedFrame``, or ``None``). Audio frames are not
    yielded here; ``_receive_messages`` appends them to the audio context
    as they arrive on the WebSocket.

    Args:
        text: The text to synthesize into speech.
        context_id: The context ID for tracking audio frames.

    Yields:
        Frame: ``TTSStartedFrame`` when a new context is opened, ``None``
        after the text was sent successfully, or ``ErrorFrame`` (preceded by
        ``TTSStoppedFrame`` for send failures) when something went wrong.
    """
    logger.debug(f"{self}: Generating TTS [{text}]")

    try:
        # Reconnect if the socket is missing or closed.
        if not self._websocket or self._websocket.state is State.CLOSED:
            await self._connect()

        # _connect reports failures itself and leaves the websocket as None.
        if self._websocket is None:
            logger.warning(f"{self}: websocket unavailable after reconnect, skipping TTS")
            yield ErrorFrame(error="websocket unavailable")
            return

        try:
            # NOTE(review): the indentation of this section was lost in the
            # source; the nesting below assumes the context-init is sent only
            # when a new audio context is created — confirm against the
            # original file.
            if not self.audio_context_available(context_id):
                await self.create_audio_context(context_id)
                await self.start_ttfb_metrics()
                yield TTSStartedFrame(context_id=context_id)
                # Word timing starts from zero for a new context.
                self._cumulative_time = 0
                self._partial_word = ""
                self._partial_word_start_time = 0.0
                # Initialize context with voice settings and pronunciation dictionaries
                msg: dict[str, Any] = {"text": " ", "context_id": context_id}
                if self._voice_settings:
                    msg["voice_settings"] = self._voice_settings
                if self._pronunciation_dictionary_locators:
                    msg["pronunciation_dictionary_locators"] = [
                        locator.model_dump()
                        for locator in self._pronunciation_dictionary_locators
                    ]
                # Mark the context-init as sent so the keepalive may now
                # target this context_id.
                self._context_init_sent.add(context_id)
                await self._websocket.send(json.dumps(msg))
                logger.trace(f"Created new context {context_id}")

            await self._send_text(text, context_id)
            await self.start_tts_usage_metrics(text)
        except Exception as e:
            # Close the TTS turn for downstream consumers before reporting
            # the failure.
            yield TTSStoppedFrame(context_id=context_id)
            yield ErrorFrame(error=f"Unknown error occurred: {e}")
            return
        yield None
    except Exception as e:
        yield ErrorFrame(error=f"Unknown error occurred: {e}")
[docs]
class ElevenLabsHttpTTSService(TTSService):
    """ElevenLabs HTTP-based TTS service with word timestamps.

    Provides text-to-speech using ElevenLabs' HTTP streaming API for simpler,
    non-WebSocket integration. Suitable for use cases where a streaming
    WebSocket connection is not required or desired.
    """

    # Settings class for this service, reachable as ``ElevenLabsHttpTTSService.Settings``.
    Settings = ElevenLabsHttpTTSSettings
    # Declares the instance's settings as this service's own Settings type.
    _settings: Settings
[docs]
def __init__(
    self,
    *,
    api_key: str,
    voice_id: str | None = None,
    aiohttp_session: aiohttp.ClientSession,
    model: str | None = None,
    base_url: str = "https://api.elevenlabs.io",
    sample_rate: int | None = None,
    enable_logging: bool | None = None,
    pronunciation_dictionary_locators: list[PronunciationDictionaryLocator] | None = None,
    params: InputParams | None = None,
    settings: Settings | None = None,
    text_aggregation_mode: TextAggregationMode | None = None,
    aggregate_sentences: bool | None = None,
    **kwargs,
):
    """Initialize the ElevenLabs HTTP TTS service.

    Settings are resolved in layers: hardcoded defaults, then the deprecated
    ``voice_id``/``model`` arguments, then the deprecated ``params`` (only
    when ``settings`` is not given), and finally ``settings`` itself.

    Args:
        api_key: ElevenLabs API key for authentication.
        voice_id: ID of the voice to use for synthesis.

            .. deprecated:: 0.0.105
                Use ``settings=ElevenLabsHttpTTSService.Settings(voice=...)`` instead.

        aiohttp_session: aiohttp ClientSession for HTTP requests.
        model: TTS model to use (e.g., "eleven_turbo_v2_5").

            .. deprecated:: 0.0.105
                Use ``settings=ElevenLabsHttpTTSService.Settings(model=...)`` instead.

        base_url: Base URL for ElevenLabs HTTP API.
        sample_rate: Audio sample rate. If None, uses default.
        enable_logging: Whether to enable ElevenLabs server-side logging.
            Set to False for zero retention mode (enterprise only).
        pronunciation_dictionary_locators: List of pronunciation dictionary
            locators to use.
        params: Additional input parameters for voice customization.

            .. deprecated:: 0.0.105
                Use ``settings=ElevenLabsHttpTTSService.Settings(...)`` instead.

        settings: Runtime-updatable settings. When provided alongside deprecated
            parameters, ``settings`` values take precedence.
        text_aggregation_mode: How to aggregate incoming text before synthesis.
        aggregate_sentences: Whether to aggregate sentences within the TTSService.

            .. deprecated:: 0.0.104
                Use ``text_aggregation_mode`` instead.

        **kwargs: Additional arguments passed to the parent service.
    """
    # 1. Initialize default_settings with hardcoded defaults
    default_settings = self.Settings(
        model="eleven_turbo_v2_5",
        voice=None,
        language=None,
        optimize_streaming_latency=None,
        stability=None,
        similarity_boost=None,
        style=None,
        use_speaker_boost=None,
        speed=None,
        apply_text_normalization=None,
    )
    # 2. Apply direct init arg overrides (deprecated)
    if voice_id is not None:
        self._warn_init_param_moved_to_settings("voice_id", "voice")
        default_settings.voice = voice_id
    if model is not None:
        self._warn_init_param_moved_to_settings("model", "model")
        default_settings.model = model
    # 3. Apply params overrides — only if settings not provided
    _pronunciation_dictionary_locators = pronunciation_dictionary_locators
    if params is not None:
        self._warn_init_param_moved_to_settings("params")
        if not settings:
            # Copy only the fields the caller actually set on params.
            if params.language is not None:
                default_settings.language = params.language
            if params.optimize_streaming_latency is not None:
                default_settings.optimize_streaming_latency = params.optimize_streaming_latency
            if params.stability is not None:
                default_settings.stability = params.stability
            if params.similarity_boost is not None:
                default_settings.similarity_boost = params.similarity_boost
            if params.style is not None:
                default_settings.style = params.style
            if params.use_speaker_boost is not None:
                default_settings.use_speaker_boost = params.use_speaker_boost
            if params.speed is not None:
                default_settings.speed = params.speed
            if params.apply_text_normalization is not None:
                default_settings.apply_text_normalization = params.apply_text_normalization
        # The explicit init argument wins; params only supplies a fallback.
        # NOTE(review): assumed to apply even when ``settings`` is given, since
        # locators are stored outside the settings object — confirm nesting.
        if _pronunciation_dictionary_locators is None:
            _pronunciation_dictionary_locators = params.pronunciation_dictionary_locators
    # 4. Apply settings delta (canonical API, always wins)
    if settings is not None:
        default_settings.apply_update(settings)
    super().__init__(
        text_aggregation_mode=text_aggregation_mode,
        aggregate_sentences=aggregate_sentences,
        push_text_frames=False,
        push_stop_frames=True,
        push_start_frame=True,
        sample_rate=sample_rate,
        settings=default_settings,
        **kwargs,
    )
    self._api_key = api_key
    self._base_url = base_url
    self._session = aiohttp_session
    self._enable_logging = enable_logging
    self._output_format = ""  # initialized in start()
    self._voice_settings = self._set_voice_settings()
    self._pronunciation_dictionary_locators = _pronunciation_dictionary_locators
    # Track cumulative time to properly sequence word timestamps across utterances
    self._cumulative_time = 0
    # Store previous text for context within a turn
    self._previous_text = ""
    # Track partial words that span across alignment chunks
    self._partial_word = ""
    self._partial_word_start_time = 0.0
[docs]
def language_to_service_language(self, language: Language) -> str | None:
    """Map a pipecat ``Language`` to the language code used by ElevenLabs.

    Args:
        language: The language to convert.

    Returns:
        The ElevenLabs-specific language code, or None if not supported.
    """
    service_language = language_to_elevenlabs_language(language)
    return service_language
[docs]
def can_generate_metrics(self) -> bool:
    """Report whether this service can generate processing metrics.

    Returns:
        Always True: the ElevenLabs HTTP service supports metrics generation.
    """
    return True
def _set_voice_settings(self):
    """Build voice settings from the service's current settings.

    Returns:
        Whatever ``build_elevenlabs_voice_settings`` produces for
        ``self._settings``.
    """
    current_settings = self._settings
    return build_elevenlabs_voice_settings(current_settings)
async def _update_settings(self, delta: TTSSettings) -> dict[str, Any]:
    """Apply a settings delta, rebuilding voice settings when anything changed.

    Args:
        delta: A :class:`TTSSettings` (or ``ElevenLabsHttpTTSService.Settings``) delta.

    Returns:
        Dict mapping changed field names to their previous values.
    """
    changed = await super()._update_settings(delta)
    if not changed:
        # Nothing changed, so the cached voice settings are still valid.
        return changed
    self._voice_settings = self._set_voice_settings()
    return changed
def _reset_state(self):
    """Clear word-timing and previous-text state back to its initial values."""
    # Timing bookkeeping.
    self._cumulative_time = 0
    self._partial_word_start_time = 0.0
    # Text bookkeeping.
    self._previous_text = self._partial_word = ""
    logger.debug(f"{self}: Reset internal state")
[docs]
async def start(self, frame: StartFrame):
    """Start the ElevenLabs HTTP TTS service.

    Runs the parent start logic, derives the output format from the
    service's sample rate, and resets internal state.

    Args:
        frame: The start frame containing initialization parameters.
    """
    await super().start(frame)
    sample_rate = self.sample_rate
    self._output_format = output_format_from_sample_rate(sample_rate)
    self._reset_state()
[docs]
async def push_frame(self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM):
    """Push a frame, then update internal state based on its type.

    Args:
        frame: The frame to push.
        direction: The direction to push the frame.
    """
    await super().push_frame(frame, direction)

    if isinstance(frame, InterruptionFrame) or isinstance(frame, TTSStoppedFrame):
        # Interruption or stop: reset all internal state.
        self._reset_state()
        return

    if isinstance(frame, LLMFullResponseEndFrame):
        # End of turn: drop only the accumulated previous text.
        self._previous_text = ""
[docs]
def calculate_word_times(self, alignment_info: Mapping[str, Any]) -> list[tuple[str, float]]:
    """Turn character alignment data into (word, start time) pairs.

    Words are delimited by single space characters. A word still open when
    the chunk ends is not returned; it is kept in ``self._partial_word``
    (with its start time) and continued by the next call. Start times are
    offset by ``self._cumulative_time``.

    Args:
        alignment_info: Character timing data from ElevenLabs.

    Returns:
        List of (word, timestamp) pairs for complete words in this chunk.
        Empty if the alignment data is missing or its lengths disagree.

    Example input data::

        {
            "characters": [" ", "H", "e", "l", "l", "o", " ", "w", "o", "r", "l", "d"],
            "character_start_times_seconds": [0.0, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
            "character_end_times_seconds": [0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
        }

    Would produce word times (with cumulative_time=0)::

        [("Hello", 0.1)]

    with ``"world"`` (start time 0.5) held back as the partial word, because
    no space follows it in this chunk.
    """
    chars = alignment_info.get("characters", [])
    char_start_times = alignment_info.get("character_start_times_seconds", [])

    usable = chars and char_start_times and len(chars) == len(char_start_times)
    if not usable:
        logger.warning(
            f"Invalid alignment data: chars={len(chars)}, times={len(char_start_times)}"
        )
        return []

    word_times = []

    # Resume any word left unfinished by the previous chunk.
    pending_word = self._partial_word
    pending_start = self._partial_word_start_time if self._partial_word else None

    for char, start_time in zip(chars, char_start_times):
        if char != " ":
            if pending_start is None:
                # First character of a new word: anchor it on the running clock.
                pending_start = self._cumulative_time + start_time
            pending_word += char
            continue

        # A space closes the current word (runs of spaces produce nothing).
        if pending_word:
            word_times.append((pending_word, pending_start))
        pending_word = ""
        pending_start = None

    # Carry an unfinished trailing word over to the next chunk.
    self._partial_word = pending_word or ""
    self._partial_word_start_time = 0.0 if pending_start is None else pending_start

    return word_times
[docs]
@traced_tts
async def run_tts(self, text: str, context_id: str) -> AsyncGenerator[Frame | None, None]:
    """Generate speech from text using ElevenLabs streaming API with timestamps.

    Makes a request to the ElevenLabs API to generate audio and timing data.
    Tracks the duration of each utterance to ensure correct sequencing.
    Includes previous text as context for better prosody continuity.

    Args:
        text: Text to convert to speech.
        context_id: The context ID for tracking audio frames.

    Yields:
        Frame: ``TTSAudioRawFrame`` for each audio chunk in the response and
        ``ErrorFrame`` for a non-200 response or an exception. Word
        timestamps are reported through ``add_word_timestamps`` rather than
        yielded.
    """
    logger.debug(f"{self}: Generating TTS [{text}]")
    # Use the with-timestamps endpoint
    url = f"{self._base_url}/v1/text-to-speech/{self._settings.voice}/stream/with-timestamps"
    model_id = assert_given(self._settings.model)
    payload: dict[str, Any] = {
        "text": text,
        "model_id": model_id,
    }
    # Include previous text as context if available
    if self._previous_text:
        payload["previous_text"] = self._previous_text
    if self._voice_settings:
        payload["voice_settings"] = self._voice_settings
    if self._pronunciation_dictionary_locators:
        payload["pronunciation_dictionary_locators"] = [
            locator.model_dump() for locator in self._pronunciation_dictionary_locators
        ]
    apply_text_normalization = assert_given(self._settings.apply_text_normalization)
    if apply_text_normalization is not None:
        payload["apply_text_normalization"] = apply_text_normalization
    # A language code is only sent for models listed in ELEVENLABS_MULTILINGUAL_MODELS;
    # for any other model it is dropped with a warning.
    language = assert_given(self._settings.language)
    if model_id in ELEVENLABS_MULTILINGUAL_MODELS and language:
        payload["language_code"] = language
        logger.debug(f"Using language code: {language}")
    elif language:
        logger.warning(
            f"Language code [{language}] not applied. Language codes can only be used with multilingual models: {', '.join(sorted(ELEVENLABS_MULTILINGUAL_MODELS))}"
        )
    headers = {
        "xi-api-key": self._api_key,
        "Content-Type": "application/json",
    }
    # Build query parameters
    params = {
        "output_format": self._output_format,
    }
    optimize_streaming_latency = assert_given(self._settings.optimize_streaming_latency)
    if optimize_streaming_latency is not None:
        params["optimize_streaming_latency"] = str(optimize_streaming_latency)
    if self._enable_logging is not None:
        # Sent as the lowercase string "true"/"false".
        params["enable_logging"] = str(self._enable_logging).lower()
    try:
        async with self._session.post(
            url, json=payload, headers=headers, params=params
        ) as response:
            if response.status != 200:
                error_text = await response.text()
                yield ErrorFrame(error=f"ElevenLabs API error: {error_text}")
                return
            await self.start_tts_usage_metrics(text)
            # Track the duration of this utterance based on the last character's end time
            utterance_duration = 0
            # False until the first alignment chunk of this request has been handled.
            alignment_started = False
            # The response body is consumed line by line, one JSON object per non-empty line.
            async for line in response.content:
                line_str = line.decode("utf-8").strip()
                if not line_str:
                    continue
                try:
                    # Parse the JSON object
                    data = json.loads(line_str)
                    # Process audio if present
                    if data and "audio_base64" in data:
                        await self.stop_ttfb_metrics()
                        audio = base64.b64decode(data["audio_base64"])
                        yield TTSAudioRawFrame(
                            audio, self.sample_rate, 1, context_id=context_id
                        )
                    # ``data and ...`` short-circuits, so a falsy payload skips
                    # alignment selection entirely.
                    raw_alignment = data and _select_alignment(
                        data,
                        normalized_key="normalized_alignment",
                        alignment_key="alignment",
                        prefer_normalized=bool(self._pronunciation_dictionary_locators),
                    )
                    if raw_alignment:
                        # The last argument is True only for the first alignment
                        # chunk of this request.
                        alignment = _strip_utterance_leading_spaces(
                            raw_alignment,
                            (
                                "characters",
                                "character_start_times_seconds",
                                "character_end_times_seconds",
                            ),
                            not alignment_started,
                        )
                        alignment_started = True
                        # Get end time of the last character in this chunk
                        char_end_times = alignment.get("character_end_times_seconds", [])
                        if char_end_times:
                            chunk_end_time = char_end_times[-1]
                            # Update to the longest end time seen so far
                            utterance_duration = max(utterance_duration, chunk_end_time)
                        # Calculate word timestamps
                        word_times = self.calculate_word_times(alignment)
                        if word_times:
                            await self.add_word_timestamps(
                                word_times,
                                context_id,
                                # Passed as True or None, never False.
                                includes_inter_frame_spaces=(
                                    True
                                    if _word_timestamps_include_inter_frame_spaces(
                                        assert_given(self._settings.language)
                                    )
                                    else None
                                ),
                            )
                except json.JSONDecodeError as e:
                    # A malformed line is skipped; the rest of the stream is still processed.
                    logger.warning(f"Failed to parse JSON from stream: {e}")
                    continue
                except Exception as e:
                    # Report the failure for this line and keep reading the stream.
                    yield ErrorFrame(error=f"Unknown error occurred: {e}")
                    continue
            # After processing all chunks, emit any remaining partial word
            # since this is the end of the utterance
            if self._partial_word:
                final_word_time = [(self._partial_word, self._partial_word_start_time)]
                await self.add_word_timestamps(
                    final_word_time,
                    context_id,
                    # Passed as True or None, never False.
                    includes_inter_frame_spaces=(
                        True
                        if _word_timestamps_include_inter_frame_spaces(
                            assert_given(self._settings.language)
                        )
                        else None
                    ),
                )
                self._partial_word = ""
                self._partial_word_start_time = 0.0
            # After processing all chunks, add the total utterance duration
            # to the cumulative time to ensure next utterance starts after this one
            if utterance_duration > 0:
                self._cumulative_time += utterance_duration
            # Append the current text to previous_text for context continuity
            # Only add a space if there's already text
            if self._previous_text:
                self._previous_text += " " + text
            else:
                self._previous_text = text
    except Exception as e:
        yield ErrorFrame(error=f"Unknown error occurred: {e}")
    finally:
        # Runs on every exit path, including the early return on a non-200 response.
        await self.stop_ttfb_metrics()