flows
Pipecat Flows - Structured conversation framework for Pipecat.
This package provides a framework for building structured conversations in Pipecat. The FlowManager handles conversation flows with support for state management, function calling, and cross-provider compatibility.
Pipecat Flows determines conversation structure at runtime, supporting function calling, action execution, and seamless transitions between conversation states.
- class pipecat.flows.FlowManager(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)
Bases: object
Manages conversation flows.
The FlowManager orchestrates conversation flows by managing state transitions, function registration, and message handling across different LLM providers, with comprehensive action handling and error management.
The manager coordinates all aspects of a conversation including LLM context management, function registration, state transitions, and action execution.
- __init__(*, llm: LLMService | LLMSwitcher, context_aggregator: Any, worker: PipelineWorker | None = None, task: PipelineWorker | None = None, context_strategy: ContextStrategyConfig | None = None, transport: BaseTransport | None = None, global_functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]] | None = None)
Initialize the flow manager.
- Parameters:
llm – LLM service or LLMSwitcher.
context_aggregator – Context aggregator for updating user context.
worker – PipelineWorker instance for queueing frames.
task – PipelineWorker instance for queueing frames.
Deprecated since version 1.5.0: Use worker instead. Will be removed in 2.0.0.
context_strategy – Context strategy configuration for managing conversation context during transitions.
transport – Transport instance for communication.
global_functions – Optional list of FlowsFunctionSchemas or FlowsDirectFunctions that will be available at every node. These functions are registered once during initialization and automatically included alongside node-specific functions.
- property state: dict[str, Any]
Access the shared state dictionary across nodes.
This property provides access to a persistent dictionary that maintains data across node transitions. It can be used to store and retrieve conversation state, user preferences, or any other data that needs to persist throughout the flow.
- Returns:
The shared state dictionary that can be used for reading and writing state data.
- Return type:
Dict[str, Any]
Examples
Setting state:
    flow_manager.state["user_name"] = "Alice"
    flow_manager.state["age"] = 25
Getting state:
    name = flow_manager.state.get("user_name", "Unknown")
    age = flow_manager.state["age"]
Checking for state:
    if "user_preferences" in flow_manager.state:
        preferences = flow_manager.state["user_preferences"]
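The shared dictionary is most useful when one handler writes a value and a later handler reads it. The sketch below illustrates that pattern without importing Pipecat: StubFlowManager is a hypothetical stand-in that exposes only a state dict, and the handler names and return values are assumptions made for illustration.

```python
import asyncio
from typing import Any


class StubFlowManager:
    """Hypothetical stand-in exposing only a `state` dict, like FlowManager.state."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {}


async def collect_name(args: dict[str, Any], flow_manager: StubFlowManager) -> dict[str, Any]:
    # Write to shared state so handlers in later nodes can read the value.
    flow_manager.state["user_name"] = args["name"]
    return {"status": "success"}


async def greet(args: dict[str, Any], flow_manager: StubFlowManager) -> dict[str, Any]:
    # Read with a default so a missing key does not raise KeyError.
    name = flow_manager.state.get("user_name", "Unknown")
    return {"greeting": f"Hello, {name}!"}


async def demo() -> dict[str, Any]:
    manager = StubFlowManager()
    await collect_name({"name": "Alice"}, manager)
    return await greet({}, manager)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Reading with .get() and a default keeps a handler safe when it runs before the node that populates the key.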
- property transport: BaseTransport | None
Access the transport instance used for communication.
This property provides access to the transport instance that handles communication with the client (e.g., DailyTransport for Daily rooms). The transport can be used to interact with participants, manage audio/video settings, or access platform-specific features.
- Returns:
The transport instance if provided during initialization, None otherwise.
- Return type:
Optional[BaseTransport]
Examples
Accessing transport in action handlers:
    async def mute_participant(action: dict, flow_manager: FlowManager):
        # Assumes the action dict carries the target participant's ID.
        participant_id = action["participant_id"]
        transport = flow_manager.transport
        if transport and hasattr(transport, 'update_participant'):
            await transport.update_participant(participant_id, {"canSnd": False})
Working with Daily transport features:
    async def get_room_info(action: dict, flow_manager: FlowManager):
        transport = flow_manager.transport
        if isinstance(transport, DailyTransport):
            participants = transport.participants()
            return {"participant_count": len(participants)}
- property current_node: str | None
Access the identifier of the currently active conversation node.
This property provides access to the current node name/identifier in the conversation flow. It can be used to make decisions based on the current state of the conversation, implement conditional logic, or for debugging and logging purposes.
- Returns:
The identifier of the current node if a node is active, None if no node has been set or before initialization.
- Return type:
Optional[str]
Examples
Conditional logic based on current node:
    async def participant_joined(action: dict, flow_manager: FlowManager):
        current = flow_manager.current_node
        if current == "transferring_to_human_agent":
            await start_human_agent_interaction(flow_manager)
        elif current == "collecting_payment":
            await setup_secure_session(flow_manager)
Logging and debugging:
    async def log_conversation_state(action: dict, flow_manager: FlowManager):
        node = flow_manager.current_node
        logger.info(f"Current conversation node: {node}")
        return {"current_node": node}
- property worker: PipelineWorker
Access the pipeline worker instance for frame queueing.
This property provides access to the PipelineWorker instance used by the FlowManager. The worker can be used to queue custom frames directly into the pipeline, enabling advanced flow control and custom frame injection.
- Returns:
The pipeline worker instance used for frame processing and queueing operations.
- Return type:
PipelineWorker
Examples
Queueing frames in handlers:
    async def send_custom_notification(action: dict, flow_manager: FlowManager):
        from pipecat.frames.frames import TTSUpdateSettingsFrame

        # Queue a TTS settings update frame
        await flow_manager.worker.queue_frame(
            TTSUpdateSettingsFrame(settings={"voice": "your-new-voice-id"})
        )
- property task: PipelineWorker
Access the pipeline worker instance for frame queueing.
Deprecated since version 1.5.0: Use worker instead. Will be removed in 2.0.0.
- Returns:
The pipeline worker instance used for frame processing and queueing operations.
- Return type:
PipelineWorker
- async initialize(initial_node: NodeConfig | None = None) → None
Initialize the flow manager.
- Parameters:
initial_node – Optional initial node configuration. If provided, the flow will start at this node immediately.
- Raises:
FlowInitializationError – If initialization fails.
Examples
Initialize with an initial node:
    flow_manager = FlowManager(
        ...  # Initialization parameters
    )
    await flow_manager.initialize(create_initial_node())
Initialize without an initial node (set later via set_node_from_config):
    flow_manager = FlowManager(
        ...  # Initialization parameters
    )
    await flow_manager.initialize()
- get_current_context() → list[dict]
Get the current conversation context.
- Returns:
List of messages in the current context, including system messages, user messages, and assistant responses.
- Raises:
FlowError – If context aggregator is not available.
- register_action(action_type: str, handler: Callable) → None
Register a handler for a specific action type.
- Parameters:
action_type – String identifier for the action (e.g., “tts_say”).
handler – Async or sync function that handles the action.
Example:
    async def custom_notification(action: dict):
        text = action.get("text", "")
        await notify_user(text)

    flow_manager.register_action("notify", custom_notification)
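Because handlers may be sync or async, and may take either the action alone or the action plus the flow manager, a dispatcher has to inspect each handler before calling it. The following is a minimal sketch of one way to do that with the standard library. ActionRegistrySketch and everything inside it is hypothetical and is not the library's implementation; it only illustrates the handler shapes this reference describes.

```python
import asyncio
import inspect
from typing import Any, Callable


class ActionRegistrySketch:
    """Hypothetical registry; not the library's implementation."""

    def __init__(self, flow_manager: Any = None) -> None:
        self._handlers: dict[str, Callable] = {}
        self._flow_manager = flow_manager

    def register_action(self, action_type: str, handler: Callable) -> None:
        self._handlers[action_type] = handler

    async def run(self, action: dict[str, Any]) -> None:
        handler = self._handlers.get(action["type"])
        if handler is None:
            raise KeyError(f"No handler registered for {action['type']!r}")
        # Pass the manager only to handlers that declare a second parameter.
        if len(inspect.signature(handler).parameters) >= 2:
            result = handler(action, self._flow_manager)
        else:
            result = handler(action)
        # Sync handlers return a plain value; async ones return an awaitable.
        if inspect.isawaitable(result):
            await result


sent: list[str] = []


async def custom_notification(action: dict[str, Any]) -> None:
    sent.append(action.get("text", ""))


def log_action(action: dict[str, Any], flow_manager: Any) -> None:
    sent.append(f"log:{action['type']}")


async def demo() -> list[str]:
    sent.clear()
    registry = ActionRegistrySketch()
    registry.register_action("notify", custom_notification)
    registry.register_action("log", log_action)
    await registry.run({"type": "notify", "text": "Order shipped"})
    await registry.run({"type": "log"})
    return list(sent)
```

Extra keys in the action dict, such as "text" here, reach the handler unchanged, which matches the note that additional ActionConfig fields are passed through.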
- async set_node_from_config(node_config: NodeConfig) → None
Set up a new conversation node and transition to it.
Used to manually transition between nodes in a flow.
- Parameters:
node_config – Configuration for the new node.
- Raises:
FlowTransitionError – If the manager is not initialized.
FlowError – If node setup fails.
- class pipecat.flows.ActionConfig
Bases: TypedDict
Configuration for an action.
- Parameters:
type – Action type identifier (e.g. “tts_say”, “notify_slack”).
handler – Callable to handle the action.
text – Text to speak for the “tts_say” action, or the optional goodbye message for the “end_conversation” action.
append_text_to_context – For the built-in TTS actions (“tts_say” and “end_conversation”), whether the spoken text is appended to the LLM context. Defaults to True.
Note
Additional fields are allowed and passed to the handler.
- type: Required[str]
- handler: Callable[[dict[str, Any]], Awaitable[None]] | Callable[[dict[str, Any], FlowManager], Awaitable[None]]
- text: str
- append_text_to_context: bool
- class pipecat.flows.ContextStrategy(*values)
Bases: Enum
Strategy for managing context during node transitions.
- Parameters:
APPEND – Append new messages to existing context (default).
RESET – Reset context with new messages only.
RESET_WITH_SUMMARY – Reset context but include an LLM-generated summary.
Deprecated since version 1.5.0: Use LLMSummarizeContextFrame instead: push it in a pre-action to trigger on-demand summarization during a node transition. See https://docs.pipecat.ai/guides/fundamentals/context-summarization. Will be removed in 2.0.0.
- APPEND = 'append'
- RESET = 'reset'
- RESET_WITH_SUMMARY = 'reset_with_summary'
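The difference between APPEND and RESET is easiest to see on a plain message list. The sketch below uses a local enum that mirrors the two documented values, and apply_strategy is a hypothetical helper written for illustration. How the library applies these strategies internally is not shown in this reference, so read it as a model of the documented behavior rather than the actual code path.

```python
from enum import Enum


class ContextStrategySketch(Enum):
    """Local mirror of the documented APPEND and RESET values."""

    APPEND = "append"
    RESET = "reset"


def apply_strategy(
    strategy: ContextStrategySketch,
    existing: list[dict[str, str]],
    new_messages: list[dict[str, str]],
) -> list[dict[str, str]]:
    """Return the context after a node transition (illustrative only)."""
    if strategy is ContextStrategySketch.APPEND:
        # Keep the conversation so far and add the new node's messages.
        return existing + new_messages
    # RESET: start over with only the new node's messages.
    return list(new_messages)


history = [{"role": "user", "content": "Hi"}]
incoming = [{"role": "developer", "content": "Ask for the user's name."}]

appended = apply_strategy(ContextStrategySketch.APPEND, history, incoming)
reset = apply_strategy(ContextStrategySketch.RESET, history, incoming)
```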
- class pipecat.flows.ContextStrategyConfig(strategy: ContextStrategy, summary_prompt: str | None = None)
Bases: object
Configuration for context management.
- Parameters:
strategy – Strategy to use for context management.
summary_prompt – Required prompt text when using RESET_WITH_SUMMARY.
Deprecated since version 1.5.0: Use LLMContextSummaryConfig.summarization_prompt instead. Deprecated together with RESET_WITH_SUMMARY. Will be removed in 2.0.0.
- strategy: ContextStrategy
- summary_prompt: str | None = None
- class pipecat.flows.FlowResult(**kwargs)
Bases: TypedDict
Optional convention TypedDict for status/error results.
Deprecated since version 1.5.0: No replacement. FlowResult is no longer required or referenced by any handler type, and Pipecat’s upstream function-call-result contract is Any; define your own TypedDict or return any JSON-serializable value. Will be removed in 2.0.0.
- Parameters:
status – Status of the function execution.
error – Optional error message if execution failed.
- status: str
- error: str
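Since the deprecation note suggests defining your own TypedDict, here is a small sketch of what that could look like. BookingResult, book_table, and the confirmation ID format are hypothetical names invented for this example. The only requirement taken from the text above is that the returned value be JSON-serializable.

```python
import json
from typing import TypedDict


class BookingResult(TypedDict, total=False):
    """Hypothetical result shape; any JSON-serializable value would also work."""

    status: str
    confirmation_id: str
    error: str


def book_table(party_size: int) -> BookingResult:
    # Return an error payload instead of raising, so the LLM can react to it.
    if party_size < 1:
        return {"status": "error", "error": "party_size must be at least 1"}
    return {"status": "success", "confirmation_id": f"TBL-{party_size:03d}"}


if __name__ == "__main__":
    # A TypedDict is a plain dict at runtime, so it serializes directly.
    print(json.dumps(book_table(4)))
```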
- class pipecat.flows.FlowsFunctionSchema(name: str, description: str, properties: dict[str, Any], required: list[str], handler: Callable[[], Awaitable[Any]] | Callable[[dict[str, Any]], Awaitable[Any]] | Callable[[dict[str, Any], FlowManager], Awaitable[Any]], cancel_on_interruption: bool = False, timeout_secs: float | None = None)
Bases: object
Function schema with Flows-specific properties.
This class extends a standard function schema with the Flows-specific handler that runs when the function is called, plus its call options.
- Parameters:
name – Name of the function.
description – Description of the function.
properties – Dictionary defining parameter types and descriptions.
required – List of required parameter names.
handler – Function handler to process the function call.
cancel_on_interruption – Whether to cancel this function call when an interruption occurs. Defaults to False.
timeout_secs – Optional per-tool timeout in seconds, overriding the global function_call_timeout_secs. Defaults to None (use global timeout).
- name: str
- description: str
- properties: dict[str, Any]
- required: list[str]
- handler: Callable[[], Awaitable[Any]] | Callable[[dict[str, Any]], Awaitable[Any]] | Callable[[dict[str, Any], FlowManager], Awaitable[Any]]
- cancel_on_interruption: bool = False
- timeout_secs: float | None = None
- to_function_schema() → FunctionSchema
Convert to a standard FunctionSchema for use with LLMs.
- Returns:
FunctionSchema without flow-specific fields.
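To make the conversion concrete, the sketch below models both schemas as local dataclasses. FunctionSchemaSketch and FlowsFunctionSchemaSketch are stand-ins defined here so the example runs without Pipecat installed. The field names follow the signature documented above, but the real classes may differ in detail.

```python
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Optional


@dataclass
class FunctionSchemaSketch:
    """Stand-in for a standard function schema (declaration only)."""

    name: str
    description: str
    properties: dict[str, Any]
    required: list[str]


@dataclass
class FlowsFunctionSchemaSketch:
    """Stand-in mirroring the documented FlowsFunctionSchema fields."""

    name: str
    description: str
    properties: dict[str, Any]
    required: list[str]
    handler: Callable[..., Awaitable[Any]]
    cancel_on_interruption: bool = False
    timeout_secs: Optional[float] = None

    def to_function_schema(self) -> FunctionSchemaSketch:
        # Drop the handler and call options; keep only the declaration.
        return FunctionSchemaSketch(
            self.name, self.description, self.properties, self.required
        )


async def record_name(args: dict[str, Any]) -> dict[str, Any]:
    return {"name": args["name"]}


schema = FlowsFunctionSchemaSketch(
    name="record_name",
    description="Record the user's name.",
    properties={"name": {"type": "string", "description": "The user's name"}},
    required=["name"],
    handler=record_name,
    timeout_secs=10.0,
)
plain = schema.to_function_schema()
```

Keeping the handler out of the converted schema means the object handed to the LLM layer carries only data that describes the function.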
- class pipecat.flows.NodeConfig
Bases: TypedDict
Configuration for a single node in the flow.
- Parameters:
task_messages – List of message dicts defining the current node’s objectives.
name – Name of the node, useful for debug logging when returning a next node from a “consolidated” function.
role_message – The bot’s role/personality as a plain string, sent as the LLM’s system instruction via LLMUpdateSettingsFrame. When provided, the system instruction persists across node transitions until a new node explicitly sets role_message again.
role_messages – Deprecated list-of-dicts format for the bot’s role/personality.
Deprecated since version 1.5.0: Use role_message (str) instead. Will be removed in 2.0.0.
functions – List of FlowsFunctionSchema definitions or direct functions whose definitions are automatically extracted from their signatures.
pre_actions – Actions to execute before LLM inference.
post_actions – Actions to execute after LLM inference.
context_strategy – Strategy for updating context during transitions.
respond_immediately – Whether to run LLM inference as soon as the node is set (default: True).
Example:
    {
        "role_message": "You are a helpful assistant...",
        "task_messages": [
            {
                "role": "developer",
                "content": "Ask the user for their name..."
            }
        ],
        "functions": [...],
        "pre_actions": [...],
        "post_actions": [...],
        "context_strategy": ContextStrategyConfig(strategy=ContextStrategy.APPEND),
        "respond_immediately": True,
    }
- task_messages: Required[list[dict]]
- name: str
- role_message: str
- role_messages: list[dict[str, Any]]
- functions: list[FlowsFunctionSchema | Callable[[...], Awaitable[tuple[Any, NodeConfig | None]]]]
- pre_actions: list[ActionConfig]
- post_actions: list[ActionConfig]
- context_strategy: ContextStrategyConfig
- respond_immediately: bool
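Because NodeConfig is a TypedDict, a node can be written as an ordinary dict, and a direct function can return the next node alongside its result. The sketch below shows a two-node flow built that way. It runs without Pipecat by using a hypothetical StubFlowManager and calling the function by hand. The node names, messages, and record_name function are assumptions for illustration, and in a real flow the FlowManager, not your code, would invoke the function and perform the transition.

```python
import asyncio
from typing import Any, Optional


def create_goodbye_node() -> dict[str, Any]:
    return {
        "name": "goodbye",
        "task_messages": [
            {"role": "developer", "content": "Thank the user and say goodbye."}
        ],
        "post_actions": [{"type": "end_conversation"}],
    }


async def record_name(
    flow_manager: Any, name: str
) -> tuple[dict[str, Any], Optional[dict[str, Any]]]:
    """Record the user's name and move to the goodbye node."""
    flow_manager.state["user_name"] = name
    # Direct functions return (result, next_node); next_node may be None.
    return {"status": "success"}, create_goodbye_node()


def create_initial_node() -> dict[str, Any]:
    return {
        "name": "greeting",
        "role_message": "You are a friendly assistant.",
        "task_messages": [
            {"role": "developer", "content": "Ask the user for their name."}
        ],
        "functions": [record_name],
        "respond_immediately": True,
    }


class StubFlowManager:
    """Hypothetical stand-in exposing only a `state` dict."""

    def __init__(self) -> None:
        self.state: dict[str, Any] = {}


async def demo() -> tuple[dict[str, Any], dict[str, Any], str]:
    manager = StubFlowManager()
    node = create_initial_node()
    # Call the node's function by hand to show the (result, next_node) shape.
    result, next_node = await node["functions"][0](manager, name="Alice")
    return manager.state, result, next_node["name"]
```

Building nodes in factory functions rather than module-level constants gives each transition a fresh dict, so one conversation cannot mutate another's configuration.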
- pipecat.flows.flows_tool_options(*, cancel_on_interruption: bool = False, timeout_secs: float | None = None) → Callable[[Callable], Callable]
Configure a Flows direct function’s call options.
This decorator is optional; use it to override the defaults for a Flows direct function (an async function whose first parameter is flow_manager).
- Parameters:
cancel_on_interruption – Whether to cancel the function call when the user interrupts. Defaults to False.
timeout_secs – Optional per-tool timeout in seconds, overriding the global function_call_timeout_secs. Defaults to None (use global timeout).
- Returns:
A decorator that attaches the metadata to the function.
Example:
    @flows_tool_options(cancel_on_interruption=False, timeout_secs=30)
    async def long_running_task(flow_manager: FlowManager, query: str):
        '''Perform a long-running task that should not be cancelled on interruption.'''
        # ... implementation
        return {"status": "complete"}, None
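For readers unfamiliar with decorators that only attach metadata, the sketch below shows the general pattern in plain Python. tool_options_sketch, the _sketch_tool_options attribute, and read_options are all hypothetical. This reference does not say where or how flows_tool_options stores its options, so this is not a description of the real implementation.

```python
from typing import Any, Callable, Optional


def tool_options_sketch(
    *, cancel_on_interruption: bool = False, timeout_secs: Optional[float] = None
) -> Callable[[Callable], Callable]:
    """Toy decorator factory; the attribute name below is hypothetical."""

    def decorator(func: Callable) -> Callable:
        # Store the options on the function object and return it unchanged.
        func._sketch_tool_options = {
            "cancel_on_interruption": cancel_on_interruption,
            "timeout_secs": timeout_secs,
        }
        return func

    return decorator


@tool_options_sketch(cancel_on_interruption=True, timeout_secs=30)
async def look_up_order(flow_manager: Any, order_id: str):
    """Look up an order by its identifier."""
    return {"order_id": order_id}, None


def read_options(func: Callable) -> dict[str, Any]:
    # Undecorated functions fall back to the documented defaults.
    defaults = {"cancel_on_interruption": False, "timeout_secs": None}
    return getattr(func, "_sketch_tool_options", defaults)
```

Because the decorator returns the original function rather than a wrapper, the signature and docstring stay intact, which matters when definitions are extracted from signatures.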
- pipecat.flows.flows_direct_function(*, cancel_on_interruption: bool = False, timeout_secs: float | None = None) → Callable[[Callable], Callable]
Configure a Flows direct function’s call options.
Deprecated since version 1.5.0: Renamed to flows_tool_options() to align with Pipecat’s @tool_options and make clearer that it configures call options. Will be removed in 2.0.0.
- Parameters:
cancel_on_interruption – Whether to cancel the function call when the user interrupts. Defaults to False.
timeout_secs – Optional per-tool timeout in seconds, overriding the global function_call_timeout_secs. Defaults to None (use global timeout).
- Returns:
A decorator that attaches the metadata to the function.
- exception pipecat.flows.FlowError
Bases: Exception
Base exception for all flow-related errors.
This is the parent class for all flow system exceptions. Use this for generic flow errors or when a more specific exception doesn’t apply.
- exception pipecat.flows.FlowInitializationError
Bases: FlowError
Raised when flow initialization fails.
This exception occurs during flow manager setup, typically due to invalid configuration, missing dependencies, or initialization errors.
- exception pipecat.flows.FlowTransitionError
Bases: FlowError
Raised when a state transition fails.
This exception occurs when transitioning between nodes fails due to invalid node configurations, missing target nodes, or transition errors.
- exception pipecat.flows.InvalidFunctionError
Bases: FlowError
Raised when an invalid or unavailable function is called.
This exception occurs when attempting to call functions that are not properly registered, have invalid signatures, or cannot be found.
- exception pipecat.flows.ActionError
Bases: FlowError
Raised when an action execution fails.
This exception occurs during action execution, including built-in actions like TTS or custom actions, due to invalid configuration or execution errors.
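Since every exception above derives from FlowError, callers can catch a specific subclass first and fall back to the base class. The sketch below demonstrates that ordering with locally defined stand-ins that copy the documented hierarchy, so it runs without Pipecat installed. The describe helper and its return strings are hypothetical.

```python
class FlowError(Exception):
    """Local stand-in for pipecat.flows.FlowError."""


class FlowTransitionError(FlowError):
    """Local stand-in for pipecat.flows.FlowTransitionError."""


class ActionError(FlowError):
    """Local stand-in for pipecat.flows.ActionError."""


def describe(exc: Exception) -> str:
    """Classify an exception, checking the most specific type first."""
    try:
        raise exc
    except FlowTransitionError:
        return "transition failed"
    except FlowError:
        # Catches ActionError and any other FlowError subclass.
        return "other flow error"
    except Exception:
        return "unrelated error"


if __name__ == "__main__":
    print(describe(FlowTransitionError("node not found")))
    print(describe(ActionError("tts failed")))
```

Placing the FlowError clause before a specific subclass would shadow it, so the order of the except clauses matters.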
Submodules
- actions
- adapters
- exceptions
- manager
- types
  - FlowResult
  - FlowArgs
  - LegacyActionHandler
  - FlowActionHandler
  - ActionConfig
  - ContextStrategy
  - ContextStrategyConfig
  - NodeConfig
  - ConsolidatedFunctionResult
  - ZeroArgFunctionHandler
  - LegacyFunctionHandler
  - FlowFunctionHandler
  - FunctionHandler
  - FlowsDirectFunction
  - FlowsFunctionSchema
  - flows_tool_options()
  - flows_direct_function()
  - FlowsDirectFunctionWrapper
  - get_or_generate_node_name()