utils

AWS utility functions for Pipecat services.

This module provides shared credential resolution and AWS Transcribe utilities for creating presigned URLs, building event messages, and handling AWS event stream protocol for real-time transcription services.

class pipecat.services.aws.utils.AWSCredentials(access_key: str | None, secret_key: str | None, session_token: str | None, region: str)

Bases: object

Resolved AWS credentials ready for use by any AWS service.

access_key: str | None
secret_key: str | None
session_token: str | None
region: str
to_boto_kwargs() → dict[str, str | None]

Return credentials as kwargs accepted by boto3/aioboto3 clients.
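
As a rough illustration of what "kwargs accepted by boto3/aioboto3 clients" could look like, the sketch below uses a hypothetical stand-in class, `CredentialsSketch`, rather than the library's `AWSCredentials`. The key names (`aws_access_key_id`, `aws_secret_access_key`, `aws_session_token`, `region_name`) are the keyword arguments that `boto3.client()` accepts; that the real method returns exactly these keys is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CredentialsSketch:
    """Hypothetical stand-in for AWSCredentials; not the library class."""

    access_key: Optional[str]
    secret_key: Optional[str]
    session_token: Optional[str]
    region: str

    def to_boto_kwargs(self) -> dict:
        # Assumed mapping onto the keyword names that boto3.client() accepts.
        return {
            "aws_access_key_id": self.access_key,
            "aws_secret_access_key": self.secret_key,
            "aws_session_token": self.session_token,
            "region_name": self.region,
        }


creds = CredentialsSketch("AKIAEXAMPLE", "example-secret", None, "us-east-1")
boto_kwargs = creds.to_boto_kwargs()
# The dict could then be unpacked into a client call, for example:
# boto3.client("transcribe", **boto_kwargs)
```
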

pipecat.services.aws.utils.resolve_credentials(*, aws_access_key_id: str | None = None, aws_secret_access_key: str | None = None, aws_session_token: str | None = None, region: str | None = None) → AWSCredentials

Resolve AWS credentials using the standard fallback chain.

Resolution order:

  1. Explicit parameters

  2. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, AWS_REGION)

  3. Default boto3/botocore credential chain (instance profiles, IRSA, ECS task roles, SSO, credential files, etc.)

The boto3 fallback (step 3) is only attempted when both access key and secret key are still unresolved after steps 1-2. This avoids replacing explicitly provided credentials with ambient ones.

Parameters:
  • aws_access_key_id – Explicit access key ID.

  • aws_secret_access_key – Explicit secret access key.

  • aws_session_token – Explicit session token.

  • region – Explicit AWS region.

Returns:

An AWSCredentials instance. access_key and secret_key may still be None if no credentials could be resolved (the caller should raise an appropriate error).
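
The fallback order described above can be approximated with the sketch below. It is not the library's implementation: `resolve_sketch` is a hypothetical name, it returns a plain tuple instead of an `AWSCredentials` instance, and it treats boto3 as an optional import. It leaves the region as None when nothing supplies one, which may differ from the real function.

```python
import os
from typing import Optional, Tuple


def resolve_sketch(
    access_key: Optional[str] = None,
    secret_key: Optional[str] = None,
    session_token: Optional[str] = None,
    region: Optional[str] = None,
) -> Tuple[Optional[str], Optional[str], Optional[str], Optional[str]]:
    # Steps 1-2: an explicit argument wins, otherwise use the environment.
    access_key = access_key or os.getenv("AWS_ACCESS_KEY_ID")
    secret_key = secret_key or os.getenv("AWS_SECRET_ACCESS_KEY")
    session_token = session_token or os.getenv("AWS_SESSION_TOKEN")
    region = region or os.getenv("AWS_REGION")

    # Step 3: consult boto3 only when BOTH keys are still unresolved, so
    # explicitly supplied credentials are never replaced by ambient ones.
    if not access_key and not secret_key:
        try:
            import boto3  # optional dependency in this sketch
        except ImportError:
            boto3 = None
        if boto3 is not None:
            session = boto3.Session()
            found = session.get_credentials()
            if found is not None:
                frozen = found.get_frozen_credentials()
                access_key = frozen.access_key
                secret_key = frozen.secret_key
                session_token = frozen.token
            region = region or session.region_name

    # Either key may still be None here; the caller decides how to fail.
    return access_key, secret_key, session_token, region
```

Gating step 3 on both keys being absent means a partially specified pair (for example, an access key without a secret) comes back incomplete rather than being mixed with ambient credentials.
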

pipecat.services.aws.utils.get_presigned_url(*, region: str, credentials: dict[str, str | None], language_code: str, media_encoding: str = 'pcm', sample_rate: int = 16000, number_of_channels: int = 1, enable_partial_results_stabilization: bool = True, partial_results_stability: str = 'high', vocabulary_name: str | None = None, vocabulary_filter_name: str | None = None, show_speaker_label: bool = False, enable_channel_identification: bool = False) → str

Create a presigned URL for AWS Transcribe streaming.

Parameters:
  • region – AWS region for the service.

  • credentials – Dictionary containing AWS credentials. Must include ‘access_key’ and ‘secret_key’, with optional ‘session_token’.

  • language_code – Language code for transcription (e.g., “en-US”).

  • media_encoding – Audio encoding format. Defaults to “pcm”.

  • sample_rate – Audio sample rate in Hz. Defaults to 16000.

  • number_of_channels – Number of audio channels. Defaults to 1.

  • enable_partial_results_stabilization – Whether to enable partial result stabilization. Defaults to True.

  • partial_results_stability – Stability level for partial results. Defaults to “high”.

  • vocabulary_name – Custom vocabulary name to use.

  • vocabulary_filter_name – Vocabulary filter name to apply.

  • show_speaker_label – Whether to include speaker labels.

  • enable_channel_identification – Whether to enable channel identification.

Returns:

Presigned WebSocket URL for AWS Transcribe streaming.

Raises:

ValueError – If required AWS credentials are missing.

class pipecat.services.aws.utils.AWSTranscribePresignedURL(access_key: str, secret_key: str, session_token: str | None, region: str = 'us-east-1')

Bases: object

Generator for AWS Transcribe presigned WebSocket URLs.

Handles AWS Signature Version 4 signing process to create authenticated WebSocket URLs for streaming transcription requests.

__init__(access_key: str, secret_key: str, session_token: str | None, region: str = 'us-east-1')

Initialize the presigned URL generator.

Parameters:
  • access_key – AWS access key ID.

  • secret_key – AWS secret access key.

  • session_token – AWS session token for temporary credentials (optional).

  • region – AWS region for the service. Defaults to “us-east-1”.

get_request_url(sample_rate: int, language_code: str = '', media_encoding: str = 'pcm', vocabulary_name: str | None = None, vocabulary_filter_name: str | None = None, show_speaker_label: bool = False, enable_channel_identification: bool = False, number_of_channels: int = 1, enable_partial_results_stabilization: bool = False, partial_results_stability: str = '') → str

Generate a presigned WebSocket URL for AWS Transcribe.

Parameters:
  • sample_rate – Audio sample rate in Hz.

  • language_code – Language code for transcription.

  • media_encoding – Audio encoding format.

  • vocabulary_name – Custom vocabulary name.

  • vocabulary_filter_name – Vocabulary filter name.

  • show_speaker_label – Whether to include speaker labels.

  • enable_channel_identification – Whether to enable channel identification.

  • number_of_channels – Number of audio channels.

  • enable_partial_results_stabilization – Whether to enable partial result stabilization.

  • partial_results_stability – Stability level for partial results.

Returns:

Presigned WebSocket URL with authentication parameters.
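
The Signature Version 4 process this class handles centres on a signing key derived through a chain of HMAC-SHA256 operations. The sketch below shows only that derivation and the final signature step, using the standard library. `derive_signing_key` and `sign` are hypothetical helper names, the service name "transcribe" is an assumption, and the string to sign is a placeholder rather than a real canonical request.

```python
import hashlib
import hmac


def _hmac_sha256(key: bytes, msg: str) -> bytes:
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def derive_signing_key(secret_key: str, date_stamp: str, region: str, service: str) -> bytes:
    # SigV4 key chain: secret -> date -> region -> service -> "aws4_request".
    k_date = _hmac_sha256(("AWS4" + secret_key).encode("utf-8"), date_stamp)
    k_region = _hmac_sha256(k_date, region)
    k_service = _hmac_sha256(k_region, service)
    return _hmac_sha256(k_service, "aws4_request")


def sign(string_to_sign: str, signing_key: bytes) -> str:
    # The signature is the hex digest of the string to sign under the derived key.
    return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()


# date_stamp uses the YYYYMMDD form; all values here are illustrative only.
key = derive_signing_key("EXAMPLE-SECRET", "20240101", "us-east-1", "transcribe")
signature = sign("placeholder string to sign", key)
```

Because the date and region feed into the key, a signature computed for one day or region does not validate for another.
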

pipecat.services.aws.utils.get_headers(header_name: str, header_value: str) → bytearray

Build a header following AWS event stream format.

Parameters:
  • header_name – Name of the header.

  • header_value – Value of the header.

Returns:

Encoded header as a bytearray following AWS event stream protocol.
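
For orientation, a string-valued header in the AWS event stream encoding is laid out as: one byte for the name length, the name bytes, one byte for the value type (7 for strings), a two-byte big-endian value length, then the value bytes. The sketch below encodes that layout. `encode_string_header` is a hypothetical name, and whether `get_headers` matches it byte for byte is an assumption.

```python
import struct


def encode_string_header(name: str, value: str) -> bytes:
    name_bytes = name.encode("utf-8")
    value_bytes = value.encode("utf-8")
    return (
        struct.pack("!B", len(name_bytes))      # 1-byte header name length
        + name_bytes                            # header name
        + struct.pack("!B", 7)                  # value type 7 = string
        + struct.pack("!H", len(value_bytes))   # 2-byte big-endian value length
        + value_bytes                           # header value
    )


encoded = encode_string_header(":message-type", "event")
```
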

pipecat.services.aws.utils.build_event_message(payload: bytes) → bytes

Build an event message for AWS Transcribe streaming.

Creates a properly formatted AWS event stream message containing audio data for real-time transcription. Follows the AWS event stream protocol with prelude, headers, payload, and CRC checksums.

Parameters:

payload – Raw audio bytes to include in the event message.

Returns:

Complete event message as bytes, ready to send via WebSocket.
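
The prelude, headers, payload, and CRC layout described above can be sketched with `struct` and `zlib`. This is an illustrative reconstruction, not the library's code: `build_audio_event` is a hypothetical name, and the three headers shown are the ones AWS Transcribe streaming documents for audio events, assumed here to match what `build_event_message` emits.

```python
import struct
import zlib


def _string_header(name: str, value: str) -> bytes:
    n, v = name.encode("utf-8"), value.encode("utf-8")
    # name length, name, value type 7 (string), value length, value
    return struct.pack("!B", len(n)) + n + struct.pack("!BH", 7, len(v)) + v


def build_audio_event(payload: bytes) -> bytes:
    headers = (
        _string_header(":content-type", "application/octet-stream")
        + _string_header(":event-type", "AudioEvent")
        + _string_header(":message-type", "event")
    )
    # Total = 8-byte prelude + 4-byte prelude CRC + headers + payload + 4-byte message CRC.
    total_length = 4 + 4 + 4 + len(headers) + len(payload) + 4
    prelude = struct.pack("!II", total_length, len(headers))
    prelude_crc = struct.pack("!I", zlib.crc32(prelude) & 0xFFFFFFFF)
    body = prelude + prelude_crc + headers + payload
    # The message CRC covers everything that precedes it.
    message_crc = struct.pack("!I", zlib.crc32(body) & 0xFFFFFFFF)
    return body + message_crc


message = build_audio_event(b"\x00\x01" * 4)
```
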

pipecat.services.aws.utils.decode_event(message)

Decode an AWS event stream message.

Parses an AWS event stream message to extract headers and payload, verifying CRC checksums for data integrity.

Parameters:

message – Raw event stream message bytes received from AWS.

Returns:

A tuple of (headers, payload) where:

  • headers: Dictionary of parsed headers

  • payload: Dictionary of parsed JSON payload

Raises:

AssertionError – If CRC checksum verification fails.
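
A decoder for the same wire format might look like the sketch below. It is an assumption-laden illustration, not the library's code: `decode_sketch` is a hypothetical name, it handles only string-typed headers (value type 7), and it raises ValueError on a checksum mismatch, whereas `decode_event` is documented to raise AssertionError.

```python
import json
import struct
import zlib


def decode_sketch(message: bytes):
    # Prelude: total length, headers length, then the CRC of those 8 bytes.
    total_length, headers_length, prelude_crc = struct.unpack("!III", message[:12])
    if total_length != len(message):
        raise ValueError("length field does not match message size")
    if zlib.crc32(message[:8]) != prelude_crc:
        raise ValueError("prelude CRC mismatch")
    (message_crc,) = struct.unpack("!I", message[-4:])
    if zlib.crc32(message[:-4]) != message_crc:
        raise ValueError("message CRC mismatch")

    headers = {}
    pos, end = 12, 12 + headers_length
    while pos < end:
        name_len = message[pos]
        name = message[pos + 1 : pos + 1 + name_len].decode("utf-8")
        pos += 1 + name_len
        if message[pos] != 7:
            raise ValueError("only string-typed headers are handled in this sketch")
        (value_len,) = struct.unpack("!H", message[pos + 1 : pos + 3])
        headers[name] = message[pos + 3 : pos + 3 + value_len].decode("utf-8")
        pos += 3 + value_len

    # Everything between the headers and the trailing CRC is the JSON payload.
    payload = json.loads(message[end:-4].decode("utf-8"))
    return headers, payload
```

Checking both CRCs before parsing means a truncated or corrupted frame is rejected before its contents are interpreted.
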