Source code for pipecat.bus.serializers.json

#
# Copyright (c) 2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""JSON-based bus message serializer with pluggable type adapters."""

import base64
import dataclasses
import importlib
import json
from enum import Enum
from functools import cache
from typing import Any

from loguru import logger
from pydantic import BaseModel

from pipecat.bus.adapters.base import TypeAdapter
from pipecat.bus.messages import BusMessage
from pipecat.bus.serializers.base import MessageSerializer

# JSON-native scalar types. Values of these types map directly onto JSON
# scalars, so the serializer returns them as-is and they don't need an adapter.
_JSON_NATIVE = (str, int, float, bool, type(None))


class JSONMessageSerializer(MessageSerializer):
    """Serialize bus messages as JSON with pluggable type adapters.

    Handles JSON-native types, enums, bytes, pydantic models, dataclasses,
    and any type with a registered ``TypeAdapter`` (e.g. ``LLMContext``,
    ``ToolsSchema``).

    Non-native values are written as a tagged object of the form
    ``{"__type__": "<module>.<ClassName>", "__data__": ...}`` so that the
    receiving side can rebuild the original type.

    Adapters for common Pipecat types are registered by default.
    Additional type adapters can be registered via ``register_adapter()``.

    Example::

        serializer = JSONMessageSerializer()
        data = serializer.serialize(message)
        restored = serializer.deserialize(data)
    """

    def __init__(self):
        """Create a serializer with default adapters for `LLMContext` and `ToolsSchema`."""
        # Imported at call time rather than at module import time.
        from pipecat.adapters.schemas.tools_schema import ToolsSchema
        from pipecat.bus.adapters import LLMContextAdapter, ToolsSchemaAdapter
        from pipecat.processors.aggregators.llm_context import LLMContext

        self._adapters: dict[type, TypeAdapter] = {
            LLMContext: LLMContextAdapter(),
            ToolsSchema: ToolsSchemaAdapter(),
        }

    def register_adapter(self, type_: type, adapter: TypeAdapter) -> None:
        """Register a type adapter.

        An adapter registered for a type is also used for its subclasses
        (lookup walks the MRO). Registering the same type again replaces
        the previous adapter.

        Args:
            type_: The type to handle.
            adapter: The adapter that serializes/deserializes instances of this type.
        """
        self._adapters[type_] = adapter

    def serialize(self, message: BusMessage) -> bytes:
        """Convert a bus message to JSON bytes.

        Args:
            message: The bus message to serialize.

        Returns:
            UTF-8 encoded JSON bytes (compact separators, no extra whitespace).
        """
        data = self._serialize_value(message)
        return json.dumps(data, separators=(",", ":")).encode("utf-8")

    def deserialize(self, data: bytes) -> BusMessage | None:
        """Reconstruct a bus message from JSON bytes.

        Args:
            data: The JSON bytes produced by `serialize()`.

        Returns:
            The reconstructed `BusMessage`, or None if the bytes are not
            valid JSON or the tagged type cannot be resolved. Errors raised
            while rebuilding values (e.g. by an adapter or a dataclass
            constructor) are not caught here.
        """
        try:
            payload = json.loads(data)
        except ValueError as e:
            # JSONDecodeError and UnicodeDecodeError are both ValueError
            # subclasses; honor the documented "None on failure" contract
            # instead of letting a malformed payload raise.
            logger.warning(f"JSONMessageSerializer: could not decode JSON payload: {e}")
            return None
        return self._deserialize_value(payload)

    @staticmethod
    def _type_tag(value: Any) -> str:
        """Return the ``<module>.<ClassName>`` tag used to identify a value's type."""
        cls = type(value)
        return f"{cls.__module__}.{cls.__name__}"

    def _serialize_value(self, value: Any) -> Any:
        """Recursively serialize a value to a JSON-compatible representation.

        Dispatch order: enums, JSON-native scalars, dicts, lists, bytes,
        pydantic models, callables (dropped), registered adapters,
        dataclasses. Anything else is logged and serialized as None.

        Args:
            value: The value to serialize.

        Returns:
            A structure made only of JSON-compatible values. Non-native
            types are wrapped as ``{"__type__": ..., "__data__": ...}``.
        """
        # Enums must be checked before the JSON-native test: enums that mix in
        # str or int (StrEnum, IntEnum, ``class X(str, Enum)``) are instances
        # of those native types too and would otherwise be emitted as bare
        # scalars, losing their enum type on the way back.
        if isinstance(value, Enum):
            return {"__type__": self._type_tag(value), "__data__": value.name}

        if isinstance(value, _JSON_NATIVE):
            return value

        if isinstance(value, dict):
            return {k: self._serialize_value(v) for k, v in value.items()}

        if isinstance(value, list):
            return [self._serialize_value(v) for v in value]

        if isinstance(value, bytes):
            return {"__type__": "bytes", "__data__": base64.b64encode(value).decode("ascii")}

        if isinstance(value, BaseModel):
            # None-valued attributes are omitted from the payload.
            return {
                "__type__": self._type_tag(value),
                "__data__": {
                    k: self._serialize_value(v) for k, v in value.__dict__.items() if v is not None
                },
            }

        # Callables (functions, bound methods, ...) are silently dropped.
        if callable(value):
            return None

        adapter = self._find_adapter(type(value))
        if adapter is not None:
            return {
                "__type__": self._type_tag(value),
                "__data__": adapter.serialize(value, self._serialize_value),
            }

        if dataclasses.is_dataclass(value) and not isinstance(value, type):
            fields = {}
            for f in dataclasses.fields(value):
                v = getattr(value, f.name)
                if v is None:
                    continue
                serialized = self._serialize_value(v)
                # Fields that could not be serialized are left out entirely.
                if serialized is not None:
                    fields[f.name] = serialized
            return {"__type__": self._type_tag(value), "__data__": fields}

        logger.warning(
            f"JSONMessageSerializer: skipping field with unserializable type {type(value).__name__}"
        )
        return None

    def _deserialize_value(self, value: Any) -> Any:
        """Recursively deserialize a value from its JSON representation.

        Dicts carrying both ``__type__`` and ``__data__`` keys are treated
        as tagged values and rebuilt via `_deserialize_typed`; other dicts
        and lists are deserialized element by element.

        Args:
            value: A value obtained from ``json.loads``.

        Returns:
            The reconstructed Python value.
        """
        if isinstance(value, _JSON_NATIVE):
            return value

        if isinstance(value, list):
            return [self._deserialize_value(v) for v in value]

        if isinstance(value, dict):
            if "__type__" in value and "__data__" in value:
                return self._deserialize_typed(value["__type__"], value["__data__"])
            return {k: self._deserialize_value(v) for k, v in value.items()}

        return value

    def _deserialize_typed(self, type_name: str, data: Any) -> Any:
        """Deserialize a tagged value using its fully qualified type name.

        NOTE(security): ``type_name`` comes from the payload and is resolved
        by importing the named module, and the resolved class may then be
        instantiated. Only deserialize data from trusted peers.

        Args:
            type_name: The ``__type__`` tag, either ``"bytes"`` or a dotted
                ``<module>.<ClassName>`` path.
            data: The ``__data__`` payload associated with the tag.

        Returns:
            The reconstructed value, or None (with a warning logged) if the
            type cannot be resolved or no strategy applies to it.
        """
        if type_name == "bytes":
            return base64.b64decode(data)

        cls = _resolve_type(type_name)
        # Reject anything that is not a class: the tag may name a function or
        # constant, and issubclass() below would raise TypeError on those.
        if not isinstance(cls, type):
            logger.warning(f"JSONMessageSerializer: could not resolve type {type_name}")
            return None

        if issubclass(cls, Enum):
            try:
                return cls[data]
            except (KeyError, TypeError):
                # Unknown member name (KeyError) or a non-hashable payload
                # (TypeError): treat like any other unresolvable value.
                logger.warning(
                    f"JSONMessageSerializer: unknown member {data!r} for enum {type_name}"
                )
                return None

        adapter = self._find_adapter(cls)
        if adapter is not None:
            return adapter.deserialize(data, self._deserialize_value, target_type=cls)

        if isinstance(data, dict) and issubclass(cls, BaseModel):
            return cls.model_validate({k: self._deserialize_value(v) for k, v in data.items()})

        if dataclasses.is_dataclass(cls) and isinstance(data, dict):
            init_fields = {f.name: f for f in dataclasses.fields(cls) if f.init}
            init_kwargs = {}
            # Keys that are not constructor parameters (e.g. ``init=False``
            # fields) are assigned on the instance after construction.
            post_init = {}
            for key, value in data.items():
                deserialized = self._deserialize_value(value)
                if key in init_fields:
                    init_kwargs[key] = deserialized
                else:
                    post_init[key] = deserialized

            # The serializer omits None-valued fields, so a required field
            # missing from the payload is restored as None.
            for name, f in init_fields.items():
                if name in init_kwargs:
                    continue
                if f.default is dataclasses.MISSING and f.default_factory is dataclasses.MISSING:
                    init_kwargs[name] = None

            obj = cls(**init_kwargs)
            for key, value in post_init.items():
                setattr(obj, key, value)
            return obj

        logger.warning(f"JSONMessageSerializer: no adapter registered for type {type_name}")
        return None

    def _find_adapter(self, type_: type) -> TypeAdapter | None:
        """Find an adapter for a type, checking parent classes via MRO.

        Args:
            type_: The class to look up.

        Returns:
            The adapter registered for the first class in ``type_``'s MRO
            that has one, or None if no class in the MRO is registered.
        """
        for cls in type_.__mro__:
            if cls in self._adapters:
                return self._adapters[cls]
        return None
@cache def _resolve_type(qualified_name: str) -> type | None: """Resolve a fully qualified type name to its class. Args: qualified_name: Dotted path like ``"pipecat.frames.frames.TextFrame"``. Returns: The resolved class, or None if it cannot be found. """ module_path, _, class_name = qualified_name.rpartition(".") if not module_path: return None try: module = importlib.import_module(module_path) return getattr(module, class_name, None) except ImportError: return None