worker
Pipeline worker implementation for managing frame processing pipelines.
This module provides the main PipelineWorker class that orchestrates pipeline execution, frame routing, lifecycle management, and monitoring capabilities including heartbeats, idle detection, and observer integration.
- class pipecat.pipeline.worker.IdleFrameObserver(*, idle_event: Event, idle_timeout_frames: tuple[type[Frame], ...])[source]
Bases:
BaseObserverIdle timeout observer.
This observer waits for specific frames being generated in the pipeline. If the frames are generated the given asyncio event is set. If the event is not set it means the pipeline is probably idle.
- __init__(*, idle_event: Event, idle_timeout_frames: tuple[type[Frame], ...])[source]
Initialize the observer.
- Parameters:
idle_event – The event to set if the idle timeout frames are being pushed.
idle_timeout_frames – A tuple with the frames that should set the event when received
- async on_push_frame(data: FramePushed)[source]
Callback executed when a frame is pushed in the pipeline.
- Parameters:
data – The frame push event data.
- class pipecat.pipeline.worker.PipelineParams(*, audio_in_sample_rate: int = 16000, audio_out_sample_rate: int = 24000, enable_heartbeats: bool = False, enable_metrics: bool = False, enable_usage_metrics: bool = False, heartbeats_period_secs: float = 1.0, heartbeats_monitor_secs: float = 10.0, report_only_initial_ttfb: bool = False, send_initial_empty_metrics: bool = True, start_metadata: dict[str, ~typing.Any]=<factory>)[source]
Bases:
BaseModelConfiguration parameters for pipeline execution.
These parameters are usually passed to all frame processors through StartFrame. For other generic pipeline worker parameters use PipelineWorker constructor arguments instead.
- Parameters:
audio_in_sample_rate – Input audio sample rate in Hz.
audio_out_sample_rate – Output audio sample rate in Hz.
enable_heartbeats – Whether to enable heartbeat monitoring.
enable_metrics – Whether to enable metrics collection.
enable_usage_metrics – Whether to enable usage metrics.
heartbeats_period_secs – Period between heartbeats in seconds.
heartbeats_monitor_secs – Timeout (in seconds) before warning about missed heartbeats. Defaults to 10 seconds.
report_only_initial_ttfb – Whether to report only initial time to first byte.
send_initial_empty_metrics – Whether to send initial empty metrics.
start_metadata – Additional metadata for pipeline start.
- audio_in_sample_rate: int
- audio_out_sample_rate: int
- enable_heartbeats: bool
- enable_metrics: bool
- enable_usage_metrics: bool
- heartbeats_period_secs: float
- heartbeats_monitor_secs: float
- report_only_initial_ttfb: bool
- send_initial_empty_metrics: bool
- start_metadata: dict[str, Any]
- class pipecat.pipeline.worker.PipelineWorker(pipeline: ~pipecat.pipeline.base_pipeline.BasePipeline, *, active: bool = True, additional_span_attributes: dict | None = None, app_resources: ~typing.Any = None, bridged: tuple[str, ...] | None = None, cancel_on_idle_timeout: bool = True, cancel_runner_on_idle_timeout: bool = True, cancel_timeout_secs: float = 20.0, check_dangling_tasks: bool = True, clock: ~pipecat.clocks.base_clock.BaseClock | None = None, conversation_id: str | None = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, enable_rtvi: bool = True, exclude_frames: tuple[type[~pipecat.frames.frames.Frame], ...] | None = None, idle_timeout_frames: tuple[type[~pipecat.frames.frames.Frame], ...] = (<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.UserSpeakingFrame'>), idle_timeout_secs: float | None = 300, name: str | None = None, observers: list[~pipecat.observers.base_observer.BaseObserver] | None = None, params: ~pipecat.pipeline.worker.PipelineParams | None = None, rtvi_processor: ~pipecat.processors.frameworks.rtvi.processor.RTVIProcessor | None = None, rtvi_observer_params: ~pipecat.processors.frameworks.rtvi.observer.RTVIObserverParams | None = None, task_manager: ~pipecat.utils.asyncio.task_manager.BaseTaskManager | None = None, tool_resources: ~typing.Any = None)[source]
Bases:
BaseWorkerManages the execution of a pipeline, handling frame processing and worker lifecycle.
This class orchestrates pipeline execution with comprehensive monitoring, event handling, and lifecycle management. It provides event handlers for various pipeline states and frame types, idle detection, heartbeat monitoring, and observer integration.
Event handlers available:
on_frame_reached_upstream: Called when upstream frames reach the source
on_frame_reached_downstream: Called when downstream frames reach the sink
on_idle_timeout: Called when pipeline is idle beyond timeout threshold
on_pipeline_started: Called when pipeline starts with StartFrame
- on_pipeline_finished: Called after the pipeline has reached any terminal state.
This includes:
StopFrame: pipeline was stopped (processors keep connections open)
EndFrame: pipeline ended normally
CancelFrame: pipeline was cancelled
Use this event for cleanup, logging, or post-processing tasks. Users can inspect the frame if they need to handle specific cases.
on_pipeline_error: Called when an error occurs with ErrorFrame
Example:
@worker.event_handler("on_frame_reached_upstream") async def on_frame_reached_upstream(worker, frame): ... @worker.event_handler("on_idle_timeout") async def on_pipeline_idle_timeout(worker): ... @worker.event_handler("on_pipeline_started") async def on_pipeline_started(worker, frame): ... @worker.event_handler("on_pipeline_finished") async def on_pipeline_finished(worker, frame): ... @worker.event_handler("on_pipeline_error") async def on_pipeline_error(worker, frame): ...
- __init__(pipeline: ~pipecat.pipeline.base_pipeline.BasePipeline, *, active: bool = True, additional_span_attributes: dict | None = None, app_resources: ~typing.Any = None, bridged: tuple[str, ...] | None = None, cancel_on_idle_timeout: bool = True, cancel_runner_on_idle_timeout: bool = True, cancel_timeout_secs: float = 20.0, check_dangling_tasks: bool = True, clock: ~pipecat.clocks.base_clock.BaseClock | None = None, conversation_id: str | None = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, enable_rtvi: bool = True, exclude_frames: tuple[type[~pipecat.frames.frames.Frame], ...] | None = None, idle_timeout_frames: tuple[type[~pipecat.frames.frames.Frame], ...] = (<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.UserSpeakingFrame'>), idle_timeout_secs: float | None = 300, name: str | None = None, observers: list[~pipecat.observers.base_observer.BaseObserver] | None = None, params: ~pipecat.pipeline.worker.PipelineParams | None = None, rtvi_processor: ~pipecat.processors.frameworks.rtvi.processor.RTVIProcessor | None = None, rtvi_observer_params: ~pipecat.processors.frameworks.rtvi.observer.RTVIObserverParams | None = None, task_manager: ~pipecat.utils.asyncio.task_manager.BaseTaskManager | None = None, tool_resources: ~typing.Any = None)[source]
Initialize the PipelineWorker.
- Parameters:
pipeline – The pipeline to execute.
active – Whether the worker starts active. Forwarded to
BaseWorker.additional_span_attributes – Optional dictionary of attributes to propagate as OpenTelemetry conversation span attributes.
app_resources – Optional application-defined bag of anything your application code may want to share across this session (DB handles, HTTP clients, etc.), passed by reference. Pipecat passes it through untouched and exposes it on the worker itself as
worker.app_resourcesand passes it to tool handlers asFunctionCallParams.app_resources. The framework never copies or clears this object; the caller retains their handle and can read any mutations after the worker finishes.bridged – Bridge configuration.
Nonemeans the pipeline is not bridged. An empty tuple()wraps the pipeline with bus edge processors that accept frames from all bridges. A tuple of names like("voice",)accepts only frames from those bridges. The bus comes fromattach()(called by the runner).cancel_on_idle_timeout – Whether reaching the idle timeout should cancel the pipeline worker. When
False, the idle event still fireson_idle_timeoutbut the worker is left alone (andcancel_runner_on_idle_timeoutis ignored too: opting out of local cancellation also opts out of the runner-wide cancel).cancel_runner_on_idle_timeout – When
cancel_on_idle_timeoutis alsoTrue, whether reaching the idle timeout should also cancel the entireWorkerRunner. The worker is always cancelled first; when this isTruethe worker also emits aBusCancelMessageso the runner broadcasts cancellation to every other root worker. Defaults toTrueso a multi-worker bot’s helpers shut down with the main pipeline; set toFalsefor a sidecarPipelineWorkerthat should self-cancel on idle without bringing down its peers.cancel_timeout_secs – Timeout (in seconds) to wait for cancellation to happen cleanly.
check_dangling_tasks – Whether to check for processors’ tasks finishing properly.
clock – Clock implementation for timing operations.
conversation_id – Optional custom ID for the conversation.
enable_rtvi – Whether to automatically add RTVI support to the pipeline.
enable_tracing – Whether to enable tracing.
enable_turn_tracking – Whether to enable turn tracking.
exclude_frames – When
bridgedis set, extra frame types that should not cross the bus (lifecycle frames are always excluded).idle_timeout_frames – A tuple with the frames that should trigger an idle timeout if not received within idle_timeout_seconds.
idle_timeout_secs – Timeout (in seconds) to consider pipeline idle or None. If a pipeline is idle the pipeline worker will be cancelled automatically.
name – Optional worker name (used for worker-style addressing on the bus).
observers – List of observers for monitoring pipeline execution.
params – Configuration parameters for the pipeline.
rtvi_observer_params – The RTVI observer parameter to use if RTVI is enabled.
rtvi_processor – The RTVI processor to add if RTVI is enabled.
task_manager – Optional worker manager for handling asyncio tasks.
tool_resources –
Deprecated alias for
app_resources.Deprecated since version 1.2.0: Use
app_resourcesinstead.tool_resourceswill be removed in a future version.
- property params: PipelineParams
Get the pipeline parameters for this worker.
- Returns:
The pipeline parameters configuration.
- property bridged: bool
Whether this pipeline is bridged onto the bus.
- property app_resources: Any
Get the application-defined resources passed to this worker.
This is the same object passed to the constructor as
app_resources. Tool handlers can also access it viaFunctionCallParams.app_resources. The framework returns the original reference; mutations are visible to all callers.- Returns:
The application-defined resources, or
Noneif none were passed.
- property pipeline: BasePipeline
Get the full pipeline managed by this pipeline worker.
This will also include any internal processors added by the pipeline worker.
- Returns:
The pipeline managed by the pipeline worker.
- property turn_tracking_observer: TurnTrackingObserver | None
Get the turn tracking observer if enabled.
- Returns:
The turn tracking observer instance or None if not enabled.
- property turn_trace_observer: TurnTraceObserver | None
Get the turn trace observer if enabled.
- Returns:
The turn trace observer instance or None if not enabled.
- property rtvi: RTVIProcessor
Get the RTVI processor if RTVI is enabled.
- Returns:
The RTVI processor added to the pipeline when RTVI is enabled.
- property reached_upstream_types: tuple[type[Frame], ...]
Get the currently configured upstream frame type filters.
- Returns:
Tuple of frame types that trigger the on_frame_reached_upstream event.
- property reached_downstream_types: tuple[type[Frame], ...]
Get the currently configured downstream frame type filters.
- Returns:
Tuple of frame types that trigger the on_frame_reached_downstream event.
- add_observer(observer: BaseObserver)[source]
Add an observer to monitor pipeline execution.
- Parameters:
observer – The observer to add to the pipeline monitoring.
- async remove_observer(observer: BaseObserver)[source]
Remove an observer from pipeline monitoring.
- Parameters:
observer – The observer to remove from pipeline monitoring.
- set_reached_upstream_filter(types: tuple[type[Frame], ...])[source]
Set which frame types trigger the on_frame_reached_upstream event.
- Parameters:
types – Tuple of frame types to monitor for upstream events.
- set_reached_downstream_filter(types: tuple[type[Frame], ...])[source]
Set which frame types trigger the on_frame_reached_downstream event.
- Parameters:
types – Tuple of frame types to monitor for downstream events.
- add_reached_upstream_filter(types: tuple[type[Frame], ...])[source]
Add frame types to trigger the on_frame_reached_upstream event.
- Parameters:
types – Tuple of frame types to add to upstream monitoring.
- add_reached_downstream_filter(types: tuple[type[Frame], ...])[source]
Add frame types to trigger the on_frame_reached_downstream event.
- Parameters:
types – Tuple of frame types to add to downstream monitoring.
- has_finished() bool[source]
Check if the pipeline worker has finished execution.
This indicates whether the worker has finished, meaning all processors have stopped.
- Returns:
True if all processors have stopped and the worker is complete.
- async stop_when_done()[source]
Schedule the pipeline to stop after processing all queued frames.
Sends an EndFrame to gracefully terminate the pipeline once all current processing is complete.
- async cancel(*, reason: str | None = None)[source]
Request the running pipeline to cancel.
- Parameters:
reason – Optional reason to indicate why the pipeline is being cancelled.
- async run(params: WorkerParams)[source]
Start and manage the pipeline execution until completion or cancellation.
- Parameters:
params – Configuration parameters for pipeline execution.
- async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]
Queue a single frame to be pushed through the pipeline.
Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.
- Parameters:
frame – The frame to be processed.
direction – The direction to push the frame. Defaults to downstream.
- async queue_frames(frames: Iterable[Frame] | AsyncIterable[Frame], direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]
Queue multiple frames to be pushed through the pipeline.
Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.
- Parameters:
frames – An iterable or async iterable of frames to be processed.
direction – The direction to push the frames. Defaults to downstream.
- async on_bus_message(message: BusMessage) None[source]
Handle outbound bus messages: TTS playback and RTVI UI translation.
Runs the base lifecycle/job dispatch first. A
BusTTSSpeakMessagetargeted at this worker is queued as aTTSSpeakFrame(pipelines without a TTS service let it flow through). When this worker owns the RTVI processor, UI carriers produced by aUIWorker(BusUIDataMessagesubclasses) are translated into RTVI frames by_handle_ui_bus_message; other workers skip the translation.
- class pipecat.pipeline.worker.PipelineTask(*args, **kwargs)[source]
Bases:
PipelineWorkerDeprecated alias for
PipelineWorker.Deprecated since version 1.3.0: Use
PipelineWorkerinstead.PipelineTaskwill be removed in a future release.
- class pipecat.pipeline.worker.PipelineTaskParams(loop: AbstractEventLoop)[source]
Bases:
WorkerParamsDeprecated alias for
WorkerParams.Deprecated since version 1.3.0: Use
WorkerParamsinstead.PipelineTaskParamswill be removed in a future release.