worker

Pipeline worker implementation for managing frame processing pipelines.

This module provides the main PipelineWorker class that orchestrates pipeline execution, frame routing, lifecycle management, and monitoring capabilities including heartbeats, idle detection, and observer integration.

class pipecat.pipeline.worker.IdleFrameObserver(*, idle_event: Event, idle_timeout_frames: tuple[type[Frame], ...])[source]

Bases: BaseObserver

Idle timeout observer.

This observer waits for specific frames being generated in the pipeline. If the frames are generated the given asyncio event is set. If the event is not set it means the pipeline is probably idle.

__init__(*, idle_event: Event, idle_timeout_frames: tuple[type[Frame], ...])[source]

Initialize the observer.

Parameters:
  • idle_event – The event to set if the idle timeout frames are being pushed.

  • idle_timeout_frames – A tuple with the frames that should set the event when received

async on_push_frame(data: FramePushed)[source]

Callback executed when a frame is pushed in the pipeline.

Parameters:

data – The frame push event data.

class pipecat.pipeline.worker.PipelineParams(*, audio_in_sample_rate: int = 16000, audio_out_sample_rate: int = 24000, enable_heartbeats: bool = False, enable_metrics: bool = False, enable_usage_metrics: bool = False, heartbeats_period_secs: float = 1.0, heartbeats_monitor_secs: float = 10.0, report_only_initial_ttfb: bool = False, send_initial_empty_metrics: bool = True, start_metadata: dict[str, ~typing.Any]=<factory>)[source]

Bases: BaseModel

Configuration parameters for pipeline execution.

These parameters are usually passed to all frame processors through StartFrame. For other generic pipeline worker parameters use PipelineWorker constructor arguments instead.

Parameters:
  • audio_in_sample_rate – Input audio sample rate in Hz.

  • audio_out_sample_rate – Output audio sample rate in Hz.

  • enable_heartbeats – Whether to enable heartbeat monitoring.

  • enable_metrics – Whether to enable metrics collection.

  • enable_usage_metrics – Whether to enable usage metrics.

  • heartbeats_period_secs – Period between heartbeats in seconds.

  • heartbeats_monitor_secs – Timeout (in seconds) before warning about missed heartbeats. Defaults to 10 seconds.

  • report_only_initial_ttfb – Whether to report only initial time to first byte.

  • send_initial_empty_metrics – Whether to send initial empty metrics.

  • start_metadata – Additional metadata for pipeline start.

audio_in_sample_rate: int
audio_out_sample_rate: int
enable_heartbeats: bool
enable_metrics: bool
enable_usage_metrics: bool
heartbeats_period_secs: float
heartbeats_monitor_secs: float
report_only_initial_ttfb: bool
send_initial_empty_metrics: bool
start_metadata: dict[str, Any]
class pipecat.pipeline.worker.PipelineWorker(pipeline: ~pipecat.pipeline.base_pipeline.BasePipeline, *, active: bool = True, additional_span_attributes: dict | None = None, app_resources: ~typing.Any = None, bridged: tuple[str, ...] | None = None, cancel_on_idle_timeout: bool = True, cancel_runner_on_idle_timeout: bool = True, cancel_timeout_secs: float = 20.0, check_dangling_tasks: bool = True, clock: ~pipecat.clocks.base_clock.BaseClock | None = None, conversation_id: str | None = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, enable_rtvi: bool = True, exclude_frames: tuple[type[~pipecat.frames.frames.Frame], ...] | None = None, idle_timeout_frames: tuple[type[~pipecat.frames.frames.Frame], ...] = (<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.UserSpeakingFrame'>), idle_timeout_secs: float | None = 300, name: str | None = None, observers: list[~pipecat.observers.base_observer.BaseObserver] | None = None, params: ~pipecat.pipeline.worker.PipelineParams | None = None, rtvi_processor: ~pipecat.processors.frameworks.rtvi.processor.RTVIProcessor | None = None, rtvi_observer_params: ~pipecat.processors.frameworks.rtvi.observer.RTVIObserverParams | None = None, task_manager: ~pipecat.utils.asyncio.task_manager.BaseTaskManager | None = None, tool_resources: ~typing.Any = None)[source]

Bases: BaseWorker

Manages the execution of a pipeline, handling frame processing and worker lifecycle.

This class orchestrates pipeline execution with comprehensive monitoring, event handling, and lifecycle management. It provides event handlers for various pipeline states and frame types, idle detection, heartbeat monitoring, and observer integration.

Event handlers available:

  • on_frame_reached_upstream: Called when upstream frames reach the source

  • on_frame_reached_downstream: Called when downstream frames reach the sink

  • on_idle_timeout: Called when pipeline is idle beyond timeout threshold

  • on_pipeline_started: Called when pipeline starts with StartFrame

  • on_pipeline_finished: Called after the pipeline has reached any terminal state.

    This includes:

    • StopFrame: pipeline was stopped (processors keep connections open)

    • EndFrame: pipeline ended normally

    • CancelFrame: pipeline was cancelled

    Use this event for cleanup, logging, or post-processing tasks. Users can inspect the frame if they need to handle specific cases.

  • on_pipeline_error: Called when an error occurs with ErrorFrame

Example:

@worker.event_handler("on_frame_reached_upstream")
async def on_frame_reached_upstream(worker, frame):
    ...

@worker.event_handler("on_idle_timeout")
async def on_pipeline_idle_timeout(worker):
    ...

@worker.event_handler("on_pipeline_started")
async def on_pipeline_started(worker, frame):
    ...

@worker.event_handler("on_pipeline_finished")
async def on_pipeline_finished(worker, frame):
    ...

@worker.event_handler("on_pipeline_error")
async def on_pipeline_error(worker, frame):
    ...
__init__(pipeline: ~pipecat.pipeline.base_pipeline.BasePipeline, *, active: bool = True, additional_span_attributes: dict | None = None, app_resources: ~typing.Any = None, bridged: tuple[str, ...] | None = None, cancel_on_idle_timeout: bool = True, cancel_runner_on_idle_timeout: bool = True, cancel_timeout_secs: float = 20.0, check_dangling_tasks: bool = True, clock: ~pipecat.clocks.base_clock.BaseClock | None = None, conversation_id: str | None = None, enable_tracing: bool = False, enable_turn_tracking: bool = True, enable_rtvi: bool = True, exclude_frames: tuple[type[~pipecat.frames.frames.Frame], ...] | None = None, idle_timeout_frames: tuple[type[~pipecat.frames.frames.Frame], ...] = (<class 'pipecat.frames.frames.BotSpeakingFrame'>, <class 'pipecat.frames.frames.UserSpeakingFrame'>), idle_timeout_secs: float | None = 300, name: str | None = None, observers: list[~pipecat.observers.base_observer.BaseObserver] | None = None, params: ~pipecat.pipeline.worker.PipelineParams | None = None, rtvi_processor: ~pipecat.processors.frameworks.rtvi.processor.RTVIProcessor | None = None, rtvi_observer_params: ~pipecat.processors.frameworks.rtvi.observer.RTVIObserverParams | None = None, task_manager: ~pipecat.utils.asyncio.task_manager.BaseTaskManager | None = None, tool_resources: ~typing.Any = None)[source]

Initialize the PipelineWorker.

Parameters:
  • pipeline – The pipeline to execute.

  • active – Whether the worker starts active. Forwarded to BaseWorker.

  • additional_span_attributes – Optional dictionary of attributes to propagate as OpenTelemetry conversation span attributes.

  • app_resources – Optional application-defined bag of anything your application code may want to share across this session (DB handles, HTTP clients, etc.), passed by reference. Pipecat passes it through untouched and exposes it on the worker itself as worker.app_resources and passes it to tool handlers as FunctionCallParams.app_resources. The framework never copies or clears this object; the caller retains their handle and can read any mutations after the worker finishes.

  • bridged – Bridge configuration. None means the pipeline is not bridged. An empty tuple () wraps the pipeline with bus edge processors that accept frames from all bridges. A tuple of names like ("voice",) accepts only frames from those bridges. The bus comes from attach() (called by the runner).

  • cancel_on_idle_timeout – Whether reaching the idle timeout should cancel the pipeline worker. When False, the idle event still fires on_idle_timeout but the worker is left alone (and cancel_runner_on_idle_timeout is ignored too: opting out of local cancellation also opts out of the runner-wide cancel).

  • cancel_runner_on_idle_timeout – When cancel_on_idle_timeout is also True, whether reaching the idle timeout should also cancel the entire WorkerRunner. The worker is always cancelled first; when this is True the worker also emits a BusCancelMessage so the runner broadcasts cancellation to every other root worker. Defaults to True so a multi-worker bot’s helpers shut down with the main pipeline; set to False for a sidecar PipelineWorker that should self-cancel on idle without bringing down its peers.

  • cancel_timeout_secs – Timeout (in seconds) to wait for cancellation to happen cleanly.

  • check_dangling_tasks – Whether to check for processors’ tasks finishing properly.

  • clock – Clock implementation for timing operations.

  • conversation_id – Optional custom ID for the conversation.

  • enable_rtvi – Whether to automatically add RTVI support to the pipeline.

  • enable_tracing – Whether to enable tracing.

  • enable_turn_tracking – Whether to enable turn tracking.

  • exclude_frames – When bridged is set, extra frame types that should not cross the bus (lifecycle frames are always excluded).

  • idle_timeout_frames – A tuple with the frames that should trigger an idle timeout if not received within idle_timeout_seconds.

  • idle_timeout_secs – Timeout (in seconds) to consider pipeline idle or None. If a pipeline is idle the pipeline worker will be cancelled automatically.

  • name – Optional worker name (used for worker-style addressing on the bus).

  • observers – List of observers for monitoring pipeline execution.

  • params – Configuration parameters for the pipeline.

  • rtvi_observer_params – The RTVI observer parameter to use if RTVI is enabled.

  • rtvi_processor – The RTVI processor to add if RTVI is enabled.

  • task_manager – Optional worker manager for handling asyncio tasks.

  • tool_resources

    Deprecated alias for app_resources.

    Deprecated since version 1.2.0: Use app_resources instead. tool_resources will be removed in a future version.

property params: PipelineParams

Get the pipeline parameters for this worker.

Returns:

The pipeline parameters configuration.

property bridged: bool

Whether this pipeline is bridged onto the bus.

property app_resources: Any

Get the application-defined resources passed to this worker.

This is the same object passed to the constructor as app_resources. Tool handlers can also access it via FunctionCallParams.app_resources. The framework returns the original reference; mutations are visible to all callers.

Returns:

The application-defined resources, or None if none were passed.

property pipeline: BasePipeline

Get the full pipeline managed by this pipeline worker.

This will also include any internal processors added by the pipeline worker.

Returns:

The pipeline managed by the pipeline worker.

property turn_tracking_observer: TurnTrackingObserver | None

Get the turn tracking observer if enabled.

Returns:

The turn tracking observer instance or None if not enabled.

property turn_trace_observer: TurnTraceObserver | None

Get the turn trace observer if enabled.

Returns:

The turn trace observer instance or None if not enabled.

property rtvi: RTVIProcessor

Get the RTVI processor if RTVI is enabled.

Returns:

The RTVI processor added to the pipeline when RTVI is enabled.

property reached_upstream_types: tuple[type[Frame], ...]

Get the currently configured upstream frame type filters.

Returns:

Tuple of frame types that trigger the on_frame_reached_upstream event.

property reached_downstream_types: tuple[type[Frame], ...]

Get the currently configured downstream frame type filters.

Returns:

Tuple of frame types that trigger the on_frame_reached_downstream event.

add_observer(observer: BaseObserver)[source]

Add an observer to monitor pipeline execution.

Parameters:

observer – The observer to add to the pipeline monitoring.

async remove_observer(observer: BaseObserver)[source]

Remove an observer from pipeline monitoring.

Parameters:

observer – The observer to remove from pipeline monitoring.

set_reached_upstream_filter(types: tuple[type[Frame], ...])[source]

Set which frame types trigger the on_frame_reached_upstream event.

Parameters:

types – Tuple of frame types to monitor for upstream events.

set_reached_downstream_filter(types: tuple[type[Frame], ...])[source]

Set which frame types trigger the on_frame_reached_downstream event.

Parameters:

types – Tuple of frame types to monitor for downstream events.

add_reached_upstream_filter(types: tuple[type[Frame], ...])[source]

Add frame types to trigger the on_frame_reached_upstream event.

Parameters:

types – Tuple of frame types to add to upstream monitoring.

add_reached_downstream_filter(types: tuple[type[Frame], ...])[source]

Add frame types to trigger the on_frame_reached_downstream event.

Parameters:

types – Tuple of frame types to add to downstream monitoring.

has_finished() bool[source]

Check if the pipeline worker has finished execution.

This indicates whether the worker has finished, meaning all processors have stopped.

Returns:

True if all processors have stopped and the worker is complete.

async stop_when_done()[source]

Schedule the pipeline to stop after processing all queued frames.

Sends an EndFrame to gracefully terminate the pipeline once all current processing is complete.

async cancel(*, reason: str | None = None)[source]

Request the running pipeline to cancel.

Parameters:

reason – Optional reason to indicate why the pipeline is being cancelled.

async run(params: WorkerParams)[source]

Start and manage the pipeline execution until completion or cancellation.

Parameters:

params – Configuration parameters for pipeline execution.

async queue_frame(frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Queue a single frame to be pushed through the pipeline.

Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.

Parameters:
  • frame – The frame to be processed.

  • direction – The direction to push the frame. Defaults to downstream.

async queue_frames(frames: Iterable[Frame] | AsyncIterable[Frame], direction: FrameDirection = FrameDirection.DOWNSTREAM)[source]

Queue multiple frames to be pushed through the pipeline.

Downstream frames are pushed from the beginning of the pipeline. Upstream frames are pushed from the end of the pipeline.

Parameters:
  • frames – An iterable or async iterable of frames to be processed.

  • direction – The direction to push the frames. Defaults to downstream.

async on_bus_message(message: BusMessage) None[source]

Handle outbound bus messages: TTS playback and RTVI UI translation.

Runs the base lifecycle/job dispatch first. A BusTTSSpeakMessage targeted at this worker is queued as a TTSSpeakFrame (pipelines without a TTS service let it flow through). When this worker owns the RTVI processor, UI carriers produced by a UIWorker (BusUIDataMessage subclasses) are translated into RTVI frames by _handle_ui_bus_message; other workers skip the translation.

class pipecat.pipeline.worker.PipelineTask(*args, **kwargs)[source]

Bases: PipelineWorker

Deprecated alias for PipelineWorker.

Deprecated since version 1.3.0: Use PipelineWorker instead. PipelineTask will be removed in a future release.

__init__(*args, **kwargs)[source]

Initialize the pipeline worker (deprecated).

class pipecat.pipeline.worker.PipelineTaskParams(loop: AbstractEventLoop)[source]

Bases: WorkerParams

Deprecated alias for WorkerParams.

Deprecated since version 1.3.0: Use WorkerParams instead. PipelineTaskParams will be removed in a future release.