Source code for pipecat.adapters.services.aws_nova_sonic_adapter

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""AWS Nova Sonic LLM adapter for Pipecat."""

import copy
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, TypedDict, cast

from loguru import logger

from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage


[docs] class Role(Enum): """Roles supported in AWS Nova Sonic conversations. Parameters: SYSTEM: System-level messages (not used in conversation history). USER: Messages sent by the user. ASSISTANT: Messages sent by the assistant. TOOL: Messages sent by tools (not used in conversation history). """ SYSTEM = "SYSTEM" USER = "USER" ASSISTANT = "ASSISTANT" TOOL = "TOOL"
[docs] @dataclass class AWSNovaSonicConversationHistoryMessage: """A single message in AWS Nova Sonic conversation history. Parameters: role: The role of the message sender (USER or ASSISTANT only). text: The text content of the message. """ role: Role # only USER and ASSISTANT text: str
[docs] class AWSNovaSonicLLMInvocationParams(TypedDict): """Context-based parameters for invoking AWS Nova Sonic LLM API. This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic. """ system_instruction: str | None messages: list[AWSNovaSonicConversationHistoryMessage] tools: list[dict[str, Any]]
[docs] class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]): """Adapter for AWS Nova Sonic language models. Converts Pipecat's standard function schemas into AWS Nova Sonic's specific function-calling format, enabling tool use with Nova Sonic models. """ @property def id_for_llm_specific_messages(self) -> str: """Get the identifier used in LLMSpecificMessage instances for AWS Nova Sonic.""" return "aws-nova-sonic"
[docs] def get_llm_invocation_params( self, context: LLMContext, *, system_instruction: str | None = None ) -> AWSNovaSonicLLMInvocationParams: """Get AWS Nova Sonic-specific LLM invocation parameters from a universal LLM context. Args: context: The LLM context containing messages, tools, etc. system_instruction: Optional system instruction from service settings. Returns: Dictionary of parameters for invoking AWS Nova Sonic's LLM API. """ messages = self._from_universal_context_messages(self.get_messages(context)) effective_system = self._resolve_system_instruction( messages.system_instruction, system_instruction, discard_context_system=True, ) return { "system_instruction": effective_system, "messages": messages.messages, # NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN) "tools": self.from_standard_tools(context.tools) or [], }
[docs] def get_messages_for_logging(self, context) -> list[dict[str, Any]]: """Get messages from a universal LLM context in a format ready for logging about AWS Nova Sonic. Removes or truncates sensitive data like image content for safe logging. This is a placeholder until support for universal LLMContext machinery is added for AWS Nova Sonic. Args: context: The LLM context containing messages. Returns: List of messages in a format ready for logging about AWS Nova Sonic. """ return [ asdict(m) for m in self._from_universal_context_messages(self.get_messages(context)).messages ]
[docs] @dataclass class ConvertedMessages: """Container for Google-formatted messages converted from universal context.""" messages: list[AWSNovaSonicConversationHistoryMessage] system_instruction: str | None = None
def _from_universal_context_messages( self, universal_context_messages: list[LLMContextMessage] ) -> ConvertedMessages: system_instruction = None messages: list[AWSNovaSonicConversationHistoryMessage] = [] # Bail if there are no messages if not universal_context_messages: return self.ConvertedMessages(messages=[]) # NOTE: This adapter does not yet handle ``LLMSpecificMessage`` — # those are filtered out below (the role-extraction and conversion # logic only applies to standard message dicts). If/when this # adapter grows a per-provider passthrough like the Anthropic # adapter has, LLMSpecific items can flow through. ucm: list[dict[str, Any]] = [ cast(dict[str, Any], m) for m in copy.deepcopy(universal_context_messages) if isinstance(m, dict) ] # If we have a "system" message as our first message, # pull that out into "instruction" if ucm and ucm[0].get("role") == "system": system = ucm.pop(0) content = system.get("content") if isinstance(content, str): system_instruction = content elif isinstance(content, list): system_instruction = content[0].get("text") if system_instruction: self._system_instruction = system_instruction # Convert any remaining "system"/"developer" messages to "user", # as Nova Sonic only supports "user" and "assistant" in history. for msg in ucm: if msg.get("role") in ("system", "developer"): msg["role"] = "user" # Process remaining messages to fill out conversation history. for universal_context_message in ucm: message = self._from_universal_context_message(universal_context_message) if message: messages.append(message) return self.ConvertedMessages(messages=messages, system_instruction=system_instruction) def _from_universal_context_message( self, message: dict[str, Any] ) -> AWSNovaSonicConversationHistoryMessage | None: """Convert standard message format to Nova Sonic format. Args: message: Standard message dictionary to convert. Returns: Nova Sonic conversation history message, or None if not convertible. """ role = message.get("role") if role == "user" or role == "assistant": content = message.get("content") if isinstance(content, list): text_parts = [] for c in content: if c.get("type") == "text": text_parts.append(c.get("text")) else: logger.error( f"Unhandled content type in context message: {c.get('type')} - {message}" ) content = " ".join(t for t in text_parts if t) # There won't be content if this is an assistant tool call entry. # We're ignoring those since they can't be loaded into AWS Nova Sonic conversation # history if content: return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content) # NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova # Sonic conversation history @staticmethod def _to_aws_nova_sonic_function_format(function: FunctionSchema) -> dict[str, Any]: """Convert a function schema to AWS Nova Sonic format. Args: function: The function schema to convert. Returns: Dictionary in AWS Nova Sonic function format with toolSpec structure. """ return { "toolSpec": { "name": function.name, "description": function.description, "inputSchema": { "json": json.dumps( { "type": "object", "properties": function.properties, "required": function.required, } ) }, } }
[docs] def to_provider_tools_format(self, tools_schema: ToolsSchema) -> list[dict[str, Any]]: """Convert tools schema to AWS Nova Sonic function-calling format. Args: tools_schema: The tools schema containing function definitions to convert. Returns: List of dictionaries in AWS Nova Sonic function format. """ functions_schema = tools_schema.standard_tools return [self._to_aws_nova_sonic_function_format(func) for func in functions_schema]