flows

Pipecat Flows - Structured conversation framework for Pipecat.

This package provides a framework for building structured conversations in Pipecat. The FlowManager handles conversation flows with support for state management, function calling, and cross-provider compatibility.

Pipecat Flows determines conversation structure at runtime, supporting function calling, action execution, and seamless transitions between conversation states.

class pipecat.flows.FlowManager(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)

Bases: object

Manages conversation flows.

The FlowManager coordinates every part of a conversation: LLM context management, function registration, state transitions between nodes, and action execution. It handles messages consistently across different LLM providers and reports failures through the flow exception types.

__init__(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)

Initialize the flow manager.

Parameters:
  • llm – LLM service or LLMSwitcher.

  • context_aggregator – Context aggregator for updating user context.

  • worker – PipelineWorker instance for queueing frames.

  • task

    PipelineWorker instance for queueing frames.

    Deprecated since version 1.5.0: Use worker instead. Will be removed in 2.0.0.

  • context_strategy – Context strategy configuration for managing conversation context during transitions.

  • transport – Transport instance for communication.

  • global_functions – Optional list of FlowsFunctionSchemas or FlowsDirectFunctions that will be available at every node. These functions are registered once during initialization and automatically included alongside node-specific functions.
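The `global_functions` type above implies that a direct function is an async callable returning a `(result, next_node)` tuple. The following is a minimal sketch of that shape, not the library's own example. It assumes a stand-in object in place of a real FlowManager so that it runs without Pipecat installed; `record_name`, `make_confirm_node`, and the node contents are hypothetical.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Optional


def make_confirm_node(name: str) -> dict[str, Any]:
    """Build a NodeConfig-shaped dict (hypothetical node for illustration)."""
    return {
        "name": "confirm_name",
        "task_messages": [
            {"role": "developer", "content": f"Confirm that the user's name is {name}."}
        ],
    }


async def record_name(
    flow_manager: Any, name: str
) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
    """Store the user's name in shared state and return the next node."""
    flow_manager.state["user_name"] = name
    return {"status": "ok"}, make_confirm_node(name)


# Stand-in exposing only a `state` dict; a real FlowManager would be passed in practice.
fake_manager = SimpleNamespace(state={})
result, next_node = asyncio.run(record_name(fake_manager, "Alice"))
```

With a real manager, a function like this would presumably be listed in `global_functions` (or in a node's `functions`) rather than called directly.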

property state: dict[str, Any]

Access the shared state dictionary across nodes.

This property provides access to a persistent dictionary that maintains data across node transitions. It can be used to store and retrieve conversation state, user preferences, or any other data that needs to persist throughout the flow.

Returns:

The shared state dictionary that can be used for reading and writing state data.

Return type:

Dict[str, Any]

Examples

Setting state:

flow_manager.state["user_name"] = "Alice"
flow_manager.state["age"] = 25

Getting state:

name = flow_manager.state.get("user_name", "Unknown")
age = flow_manager.state["age"]

Checking for state:

if "user_preferences" in flow_manager.state:
    preferences = flow_manager.state["user_preferences"]

property transport: BaseTransport | None

Access the transport instance used for communication.

This property provides access to the transport instance that handles communication with the client (e.g., DailyTransport for Daily rooms). The transport can be used to interact with participants, manage audio/video settings, or access platform-specific features.

Returns:

The transport instance if provided during initialization, None otherwise.

Return type:

Optional[BaseTransport]

Examples

Accessing transport in action handlers:

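async def mute_participant(action: dict, flow_manager: FlowManager):
    transport = flow_manager.transport
    # Assumes the action config carries the target's ID in a "participant_id" field.
    participant_id = action.get("participant_id")
    if participant_id and transport and hasattr(transport, 'update_participant'):
        await transport.update_participant(participant_id, {"canSnd": False})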

Working with Daily transport features:

async def get_room_info(action: dict, flow_manager: FlowManager):
    transport = flow_manager.transport
    if isinstance(transport, DailyTransport):
        participants = transport.participants()
        return {"participant_count": len(participants)}

property current_node: str | None

Access the identifier of the currently active conversation node.

This property provides access to the current node name/identifier in the conversation flow. It can be used to make decisions based on the current state of the conversation, implement conditional logic, or for debugging and logging purposes.

Returns:

The identifier of the current node if a node is active, None if no node has been set or before initialization.

Return type:

Optional[str]

Examples

Conditional logic based on current node:

async def participant_joined(action: dict, flow_manager: FlowManager):
    current = flow_manager.current_node
    if current == "transferring_to_human_agent":
        await start_human_agent_interaction(flow_manager)
    elif current == "collecting_payment":
        await setup_secure_session(flow_manager)

Logging and debugging:

async def log_conversation_state(action: dict, flow_manager: FlowManager):
    node = flow_manager.current_node
    logger.info(f"Current conversation node: {node}")
    return {"current_node": node}

property worker: PipelineWorker

Access the pipeline worker instance for frame queueing.

This property provides access to the PipelineWorker instance used by the FlowManager. The worker can be used to queue custom frames directly into the pipeline, enabling advanced flow control and custom frame injection.

Returns:

The pipeline worker instance used for frame processing and queueing operations.

Return type:

PipelineWorker

Examples

Queueing frames in handlers:

async def send_custom_notification(action: dict, flow_manager: FlowManager):
    from pipecat.frames.frames import TTSUpdateSettingsFrame

    # Queue a TTS settings update frame
    await flow_manager.worker.queue_frame(
        TTSUpdateSettingsFrame(settings={"voice": "your-new-voice-id"})
    )

property task: PipelineWorker

Access the pipeline worker instance for frame queueing.

Deprecated since version 1.5.0: Use worker instead. Will be removed in 2.0.0.

Returns:

The pipeline worker instance used for frame processing and queueing operations.

Return type:

PipelineWorker

async initialize(initial_node: NodeConfig | None = None) -> None

Initialize the flow manager.

Parameters:

initial_node – Optional initial node configuration. If provided, the flow will start at this node immediately.

Raises:

FlowInitializationError – If initialization fails.

Examples

Initialize with an initial node:

flow_manager = FlowManager(
    ... # Initialization parameters
)
await flow_manager.initialize(create_initial_node())

Initialize without an initial node (set later via set_node_from_config):

flow_manager = FlowManager(
    ... # Initialization parameters
)
await flow_manager.initialize()

get_current_context() -> list[dict]

Get the current conversation context.

Returns:

List of messages in the current context, including system messages, user messages, and assistant responses.

Raises:

FlowError – If context aggregator is not available.
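Because the return value is a plain list of message dicts, it can be inspected with ordinary Python. The sketch below counts messages per role. It assumes each message carries a `"role"` key, and it uses a hypothetical sample list in place of a real call to `get_current_context()`.

```python
from collections import Counter
from typing import Any


def count_roles(messages: list[dict[str, Any]]) -> Counter:
    """Count how many messages each role contributed to a context list."""
    return Counter(m.get("role", "unknown") for m in messages)


# Hypothetical context in the list-of-dicts shape described above.
sample_context = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "Hi, I'm Alice."},
    {"role": "assistant", "content": "Nice to meet you, Alice."},
    {"role": "user", "content": "What can you do?"},
]
role_counts = count_roles(sample_context)
```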

register_action(action_type: str, handler: Callable) -> None

Register a handler for a specific action type.

Parameters:
  • action_type – String identifier for the action (e.g., “tts_say”).

  • handler – Async or sync function that handles the action.

Example:

async def custom_notification(action: dict):
    text = action.get("text", "")
    await notify_user(text)

flow_manager.register_action("notify", custom_notification)

async set_node_from_config(node_config: NodeConfig) -> None

Set up a new conversation node and transition to it.

Used to manually transition between nodes in a flow.

Parameters:

node_config – Configuration for the new node.

Raises:

class pipecat.flows.ActionConfig

Bases: TypedDict

Configuration for an action.

Parameters:
  • type – Action type identifier (e.g. “tts_say”, “notify_slack”).

  • handler – Callable to handle the action.

  • text – Text to speak for the “tts_say” action, or the optional goodbye message for the “end_conversation” action.

  • append_text_to_context – For the built-in TTS actions (“tts_say” and “end_conversation”), whether the spoken text is appended to the LLM context. Defaults to True.

Note

Additional fields are allowed and passed to the handler.

type: Required[str]
handler: Callable[[dict[str, Any]], Awaitable[None]] | Callable[[dict[str, Any], FlowManager], Awaitable[None]]
text: str
append_text_to_context: bool
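
Since additional fields are passed through to the handler, a custom action can carry its own parameters. The sketch below shows an ActionConfig-shaped dict with one extra field. `notify_slack`, the `"channel"` field, and the `sent` list are hypothetical, and the handler is called directly only to show what it would receive.

```python
import asyncio
from typing import Any

sent: list[tuple[str, str]] = []  # records what the hypothetical handler "sent"


async def notify_slack(action: dict[str, Any]) -> None:
    """Hypothetical custom handler; reads extra fields from the action dict."""
    channel = action.get("channel", "#general")
    sent.append((channel, action.get("text", "")))


# ActionConfig-shaped dict: "type" is required; "channel" is an additional field.
escalation_action = {
    "type": "notify_slack",
    "handler": notify_slack,
    "text": "Customer asked for a human agent.",
    "channel": "#support",
}

# Direct call for illustration; in a flow the action would sit in pre_actions or post_actions.
asyncio.run(escalation_action["handler"](escalation_action))
```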

class pipecat.flows.ContextStrategy(*values)

Bases: Enum

Strategy for managing context during node transitions.

Parameters:
  • APPEND – Append new messages to existing context (default).

  • RESET – Reset context with new messages only.

  • RESET_WITH_SUMMARY

    Reset context but include an LLM-generated summary.

    Deprecated since version 1.5.0: Use LLMSummarizeContextFrame instead — push it in a pre-action to trigger on-demand summarization during a node transition. See https://docs.pipecat.ai/guides/fundamentals/context-summarization. Will be removed in 2.0.0.

APPEND = 'append'
RESET = 'reset'
RESET_WITH_SUMMARY = 'reset_with_summary'
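
The difference between APPEND and RESET can be illustrated with a small stand-alone model. This is a sketch of the behavior as described above, not the library's implementation; `Strategy` and `next_context` are local stand-ins, and the deprecated summary strategy is left out.

```python
from enum import Enum


class Strategy(Enum):
    """Local stand-in mirroring two of the documented values."""

    APPEND = "append"
    RESET = "reset"


def next_context(existing: list[dict], new: list[dict], strategy: Strategy) -> list[dict]:
    """Return the context after a transition under the given strategy."""
    if strategy is Strategy.APPEND:
        return existing + new  # keep history, add the new node's messages
    if strategy is Strategy.RESET:
        return list(new)  # drop history, keep only the new messages
    raise ValueError(f"Unsupported strategy: {strategy}")


history = [{"role": "user", "content": "I'd like to book a table."}]
node_messages = [{"role": "developer", "content": "Ask for the party size."}]

appended = next_context(history, node_messages, Strategy.APPEND)
reset = next_context(history, node_messages, Strategy.RESET)
```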

class pipecat.flows.ContextStrategyConfig(strategy: ContextStrategy, summary_prompt: str | None = None)

Bases: object

Configuration for context management.

Parameters:
  • strategy – Strategy to use for context management.

  • summary_prompt

    Required prompt text when using RESET_WITH_SUMMARY.

    Deprecated since version 1.5.0: Use LLMContextSummaryConfig.summarization_prompt instead. Deprecated together with RESET_WITH_SUMMARY. Will be removed in 2.0.0.

strategy: ContextStrategy
summary_prompt: str | None = None

class pipecat.flows.FlowResult(**kwargs)

Bases: TypedDict

Optional convention TypedDict for status/error results.

Deprecated since version 1.5.0: No replacement. FlowResult is no longer required or referenced by any handler type, and Pipecat’s upstream function-call-result contract is Any — define your own TypedDict or return any JSON-serializable value. Will be removed in 2.0.0.

Parameters:
  • status – Status of the function execution.

  • error – Optional error message if execution failed.

status: str
error: str
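
The deprecation note suggests defining an application-specific TypedDict instead. A minimal sketch of that approach follows; `BookingResult` and its fields are hypothetical, and the `json.dumps` call only demonstrates that the value is JSON-serializable.

```python
import json
from typing import TypedDict


class BookingResult(TypedDict, total=False):
    """Hypothetical result type defined by the application."""

    status: str
    confirmation_code: str
    error: str


def make_booking_result(code: str) -> BookingResult:
    """Build a success result for a confirmed booking."""
    return {"status": "success", "confirmation_code": code}


booking = make_booking_result("ABC123")
encoded = json.dumps(booking)  # works because the dict holds only JSON-friendly values
```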

class pipecat.flows.FlowsFunctionSchema(name: str, description: str, properties: dict[str, Any], required: list[str], handler: Callable[[], Awaitable[Any]] | Callable[[dict[str, Any]], Awaitable[Any]] | Callable[[dict[str, Any], FlowManager], Awaitable[Any]], cancel_on_interruption: bool = False, timeout_secs: float | None = None)

Bases: object

Function schema with Flows-specific properties.

This class extends a standard function schema with the Flows-specific handler that runs when the function is called, plus its call options.

Parameters:
  • name – Name of the function.

  • description – Description of the function.

  • properties – Dictionary defining parameter types and descriptions.

  • required – List of required parameter names.

  • handler – Function handler to process the function call.

  • cancel_on_interruption – Whether to cancel this function call when an interruption occurs. Defaults to False.

  • timeout_secs – Optional per-tool timeout in seconds, overriding the global function_call_timeout_secs. Defaults to None (use global timeout).

name: str
description: str
properties: dict[str, Any]
required: list[str]
handler: Callable[[], Awaitable[Any]] | Callable[[dict[str, Any]], Awaitable[Any]] | Callable[[dict[str, Any], FlowManager], Awaitable[Any]]
cancel_on_interruption: bool = False
timeout_secs: float | None = None

to_function_schema() -> FunctionSchema

Convert to a standard FunctionSchema for use with LLMs.

Returns:

FunctionSchema without flow-specific fields.
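The relationship between the Flows-specific fields and the plain schema can be sketched with a stand-in dataclass. `SchemaSketch` and `to_plain_schema` are hypothetical and only mirror the documented field names; the JSON-Schema-style `properties` value and the `choose_size` handler are likewise assumptions, and the real `to_function_schema()` returns a `FunctionSchema` object rather than a dict.

```python
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class SchemaSketch:
    """Stand-in with the same field names as FlowsFunctionSchema (not the real class)."""

    name: str
    description: str
    properties: dict[str, Any]
    required: list[str]
    handler: Callable[..., Awaitable[Any]]
    cancel_on_interruption: bool = False
    timeout_secs: Optional[float] = None

    def to_plain_schema(self) -> dict[str, Any]:
        """Keep only the fields an LLM needs; drop the handler and call options."""
        return {
            "name": self.name,
            "description": self.description,
            "properties": self.properties,
            "required": self.required,
        }


async def choose_size(args: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical handler using the single-argument signature."""
    return {"size": args["size"]}


size_schema = SchemaSketch(
    name="choose_size",
    description="Record the pizza size the user wants.",
    properties={"size": {"type": "string", "enum": ["small", "medium", "large"]}},
    required=["size"],
    handler=choose_size,
    timeout_secs=10.0,
)
plain = size_schema.to_plain_schema()
```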

class pipecat.flows.NodeConfig

Bases: TypedDict

Configuration for a single node in the flow.

Parameters:
  • task_messages – List of message dicts defining the current node’s objectives.

  • name – Name of the node, useful for debug logging when returning a next node from a “consolidated” function.

  • role_message – The bot’s role/personality as a plain string, sent as the LLM’s system instruction via LLMUpdateSettingsFrame. When provided, the system instruction persists across node transitions until a new node explicitly sets role_message again.

  • role_messages

    Deprecated list-of-dicts format for the bot’s role/personality.

    Deprecated since version 1.5.0: Use role_message (str) instead. Will be removed in 2.0.0.

  • functions – List of FlowsFunctionSchema definitions or direct functions whose definitions are automatically extracted from their signatures.

  • pre_actions – Actions to execute before LLM inference.

  • post_actions – Actions to execute after LLM inference.

  • context_strategy – Strategy for updating context during transitions.

  • respond_immediately – Whether to run LLM inference as soon as the node is set (default: True).

Example:

{
    "role_message": "You are a helpful assistant...",
    "task_messages": [
        {
            "role": "developer",
            "content": "Ask the user for their name..."
        }
    ],
    "functions": [...],
    "pre_actions": [...],
    "post_actions": [...],
    "context_strategy": ContextStrategyConfig(strategy=ContextStrategy.APPEND),
    "respond_immediately": True,
}

task_messages: Required[list[dict]]
name: str
role_message: str
role_messages: list[dict[str, Any]]
functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]]
pre_actions: list[ActionConfig]
post_actions: list[ActionConfig]
context_strategy: ContextStrategyConfig
respond_immediately: bool
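
Nodes are often produced by small builder functions so that their contents can depend on runtime data. The sketch below returns a NodeConfig-shaped plain dict; `create_greeting_node`, the node name, and the message text are hypothetical, and only field names documented above are used.

```python
from typing import Any


def create_greeting_node(bot_name: str) -> dict[str, Any]:
    """Build a NodeConfig-shaped dict for a hypothetical greeting node."""
    return {
        "name": "greeting",
        "role_message": f"You are {bot_name}, a friendly booking assistant.",
        "task_messages": [
            {"role": "developer", "content": "Greet the user and ask for their name."}
        ],
        "functions": [],  # node-specific functions would go here
        "pre_actions": [{"type": "tts_say", "text": "One moment, please."}],
        "respond_immediately": True,
    }


greeting_node = create_greeting_node("Robin")
```

A dict built this way could then be passed to `initialize()` or `set_node_from_config()`.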

pipecat.flows.flows_tool_options(*, cancel_on_interruption: bool = False, timeout_secs: float | None = None) -> Callable[[Callable], Callable]

Configure a Flows direct function’s call options.

This decorator is optional; use it to override the defaults for a Flows direct function (an async function whose first parameter is flow_manager).

Parameters:
  • cancel_on_interruption – Whether to cancel the function call when the user interrupts. Defaults to False.

  • timeout_secs – Optional per-tool timeout in seconds, overriding the global function_call_timeout_secs. Defaults to None (use global timeout).

Returns:

A decorator that attaches the metadata to the function.

Example:

@flows_tool_options(cancel_on_interruption=False, timeout_secs=30)
async def long_running_task(flow_manager: FlowManager, query: str):
    '''Perform a long-running task that should not be cancelled on interruption.'''
    # ... implementation
    return {"status": "complete"}, None
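
The phrase "attaches the metadata to the function" describes a common decorator pattern, sketched below. This is not the library's code: `tool_options_sketch`, the `_sketch_options` attribute name, and `lookup_order` are made up for illustration.

```python
from typing import Any, Callable, Optional


def tool_options_sketch(
    *, cancel_on_interruption: bool = False, timeout_secs: Optional[float] = None
) -> Callable[[Callable], Callable]:
    """Return a decorator that records call options on the function itself."""

    def decorator(func: Callable) -> Callable:
        # `_sketch_options` is a made-up attribute name for this illustration.
        func._sketch_options = {
            "cancel_on_interruption": cancel_on_interruption,
            "timeout_secs": timeout_secs,
        }
        return func  # the original function is returned unwrapped

    return decorator


@tool_options_sketch(timeout_secs=30)
async def lookup_order(flow_manager: Any, order_id: str):
    """Hypothetical direct function."""
    return {"order_id": order_id}, None


options = lookup_order._sketch_options
```

Because the function is returned unwrapped, it keeps its original signature and docstring, which matters when definitions are extracted from the signature.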

pipecat.flows.flows_direct_function(*, cancel_on_interruption: bool = False, timeout_secs: float | None = None) -> Callable[[Callable], Callable]

Configure a Flows direct function’s call options.

Deprecated since version 1.5.0: Renamed to flows_tool_options() to align with Pipecat’s @tool_options and make clearer that it configures call options. Will be removed in 2.0.0.

Parameters:
  • cancel_on_interruption – Whether to cancel the function call when the user interrupts. Defaults to False.

  • timeout_secs – Optional per-tool timeout in seconds, overriding the global function_call_timeout_secs. Defaults to None (use global timeout).

Returns:

A decorator that attaches the metadata to the function.

exception pipecat.flows.FlowError

Bases: Exception

Base exception for all flow-related errors.

This is the parent class for all flow system exceptions. Use this for generic flow errors or when a more specific exception doesn’t apply.

exception pipecat.flows.FlowInitializationError

Bases: FlowError

Raised when flow initialization fails.

This exception occurs during flow manager setup, typically due to invalid configuration, missing dependencies, or initialization errors.

exception pipecat.flows.FlowTransitionError

Bases: FlowError

Raised when a state transition fails.

This exception occurs when transitioning between nodes fails due to invalid node configurations, missing target nodes, or transition errors.

exception pipecat.flows.InvalidFunctionError

Bases: FlowError

Raised when an invalid or unavailable function is called.

This exception occurs when attempting to call functions that are not properly registered, have invalid signatures, or cannot be found.

exception pipecat.flows.ActionError

Bases: FlowError

Raised when an action execution fails.

This exception occurs during action execution, including built-in actions like TTS or custom actions, due to invalid configuration or execution errors.
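
Because every exception above derives from FlowError, callers can catch specific failures first and fall back to the base class. The sketch below uses local stand-in classes that mirror part of the documented hierarchy so that it runs without Pipecat installed; `fail_with` and `run_step` are hypothetical helpers.

```python
class FlowError(Exception):
    """Local stand-in for pipecat.flows.FlowError."""


class FlowInitializationError(FlowError):
    """Local stand-in for pipecat.flows.FlowInitializationError."""


class ActionError(FlowError):
    """Local stand-in for pipecat.flows.ActionError."""


def fail_with(exc: Exception):
    """Return a zero-argument callable that raises the given exception."""

    def _raise() -> None:
        raise exc

    return _raise


def run_step(step) -> str:
    """Run a step and label the outcome, most specific handler first."""
    try:
        step()
    except FlowInitializationError:
        return "initialization"
    except FlowError:
        return "flow"  # catches ActionError and any other subclass
    except Exception:
        return "other"
    return "ok"


outcomes = [
    run_step(fail_with(FlowInitializationError("bad config"))),
    run_step(fail_with(ActionError("tts failed"))),
    run_step(fail_with(ValueError("unrelated"))),
    run_step(lambda: None),
]
```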

Submodules