runner
Deprecated module.
The runner now lives in pipecat.workers.runner as
WorkerRunner. This module re-exports it
so existing from pipecat.pipeline.runner import WorkerRunner imports
keep working, and hosts the deprecated PipelineRunner alias.
New code should import WorkerRunner
from pipecat.workers.runner.
- class pipecat.pipeline.runner.PipelineRunner(*args, **kwargs)[source]
Bases: WorkerRunner
Deprecated alias for WorkerRunner.
Deprecated since version 1.3.0: Use WorkerRunner instead. The runner now runs workers (of which PipelineWorker is one kind), not just pipelines. PipelineRunner will be removed in a future release.
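As an illustration of how a deprecated alias of this kind is commonly built, the sketch below uses stand-in classes that only share names with the real ones. It assumes the alias emits a DeprecationWarning on construction, which this page does not confirm; it is not pipecat's source.

```python
import warnings


class WorkerRunner:
    """Hypothetical stand-in for the real runner class (not pipecat's code)."""

    def __init__(self, *, name=None):
        self.name = name


class PipelineRunner(WorkerRunner):
    """Deprecated alias: same behaviour as WorkerRunner, plus a warning."""

    def __init__(self, *args, **kwargs):
        # Assumption: the alias warns when instantiated. stacklevel=2 points
        # the warning at the caller rather than at this constructor.
        warnings.warn(
            "PipelineRunner is deprecated; use WorkerRunner instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__init__(*args, **kwargs)
```

Because the alias subclasses the new class, existing isinstance checks against WorkerRunner keep passing while callers migrate.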
- class pipecat.pipeline.runner.WorkerRunner(*, name: str | None = None, bus: WorkerBus | None = None, handle_sigint: bool = True, handle_sigterm: bool = False, force_gc: bool = False, loop: AbstractEventLoop | None = None)[source]
Bases: BaseObject, BusSubscriber
Manages worker execution.
Provides a high-level interface for running workers with automatic signal handling (SIGINT/SIGTERM), optional garbage collection, proper cleanup of resources, and a worker bus + registry for multi-worker orchestration.
Two entry points:
add_workers(*workers): register one or more workers on the runner’s bus and start them in the background. Workers run concurrently, and any remaining workers are cancelled when the runner ends.
run(): block until the runner ends. By default (auto_end=True) the runner ends once every root worker has finished; pass auto_end=False to keep the runner up until end()/cancel() is called.
Event handlers available:
on_ready: fired after the runner has finished its initialization and any added workers have been started.
on_error: fired when starting an added worker fails.
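The two entry points are typically used together: register workers, then block on run(). The helper below is a minimal sketch that relies only on the documented add_workers() and run() methods. RecordingRunner and the string "workers" are hypothetical stand-ins so the example runs without pipecat installed.

```python
import asyncio


async def run_with_runner(runner, *workers):
    """Register workers on the runner, then block until the runner ends."""
    # Called before run(): workers are queued and started during run setup.
    await runner.add_workers(*workers)
    # With the default auto_end=True this returns once every root worker
    # has finished.
    await runner.run()


class RecordingRunner:
    """Hypothetical stand-in exposing the two documented entry points."""

    def __init__(self):
        self.calls = []

    async def add_workers(self, *workers):
        self.calls.append(("add_workers", workers))

    async def run(self, *, auto_end=True):
        self.calls.append(("run", auto_end))


demo = RecordingRunner()
asyncio.run(run_with_runner(demo, "worker-a", "worker-b"))
```

With pipecat installed, the same helper would presumably be called from inside a coroutine with a WorkerRunner imported from pipecat.workers.runner and real worker objects in place of the strings.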
- __init__(*, name: str | None = None, bus: WorkerBus | None = None, handle_sigint: bool = True, handle_sigterm: bool = False, force_gc: bool = False, loop: AbstractEventLoop | None = None)[source]
Initialize the worker runner.
- Parameters:
name – Optional name for the runner instance. Defaults to a UUID-based name. Must be unique across runners in a distributed setup.
bus – Optional WorkerBus. Defaults to a new in-process AsyncQueueBus.
handle_sigint – Whether to automatically handle SIGINT signals.
handle_sigterm – Whether to automatically handle SIGTERM signals.
force_gc – Whether to force garbage collection after the main worker completes.
loop – Event loop to use. If None, uses the current running loop.
- property registry: WorkerRegistry
The worker registry this runner owns.
- async add_workers(*workers: BaseWorker) -> None[source]
Add one or more workers to the runner.
Adding a worker attaches it to the runner’s bus and registry, and starts it in the background. If the runner is not yet running (add_workers was called before run()), workers are queued and started during run setup; if the runner is already running, each worker starts immediately.
Added workers run alongside the main worker and are cancelled when the main worker finishes (or when end()/cancel() is called).
- Parameters:
*workers – One or more workers to add.
- async run(worker: BaseWorker | None = None, *, auto_end: bool = True) -> None[source]
Run all added workers until the runner is stopped.
By default (auto_end=True), the runner ends once every root worker has finished, so a single-pipeline bot naturally ends when its pipeline does. Multi-worker bots whose helpers run forever (e.g. waiting for bus messages) end by calling end()/cancel() from an event handler (typically on transport disconnect). For long-lived hosts that add and remove workers over many sessions (e.g. a FastAPI server), pass auto_end=False so the runner does not exit when no workers are left.
- Parameters:
worker – Optional worker to run.
Deprecated since version 1.3.0: Register the worker with add_workers() before calling run() instead. Passing worker here will be removed in a future release.
auto_end – When True (the default), the runner ends once every root worker has finished. When False, the runner blocks until end() or cancel() is called.
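The difference between the two auto_end modes can be modelled with plain asyncio. The sketch below is a toy model of the behaviour described above, not pipecat's implementation; ToyRunner, its add() method and short_job are hypothetical names introduced for illustration.

```python
import asyncio


class ToyRunner:
    """Toy model of the documented auto_end behaviour (not pipecat's code)."""

    def __init__(self):
        self._tasks = []
        self._ended = asyncio.Event()

    def add(self, coro):
        # Start the "worker" in the background; needs a running event loop.
        self._tasks.append(asyncio.create_task(coro))

    async def end(self):
        self._ended.set()

    async def _all_done(self):
        if self._tasks:
            await asyncio.wait(self._tasks)

    async def run(self, *, auto_end=True):
        # Always wake up on end(); with auto_end also wake up when every
        # worker has finished.
        waiters = [asyncio.create_task(self._ended.wait())]
        if auto_end:
            waiters.append(asyncio.create_task(self._all_done()))
        _, pending = await asyncio.wait(
            waiters, return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel whatever is still running once the runner ends.
        for task in [*pending, *self._tasks]:
            task.cancel()
        await asyncio.gather(*pending, *self._tasks, return_exceptions=True)


async def demo():
    async def short_job():
        await asyncio.sleep(0.01)

    # auto_end=True: run() returns by itself once the only worker finishes.
    first = ToyRunner()
    first.add(short_job())
    await asyncio.wait_for(first.run(), timeout=1.0)
    auto_ended = True

    # auto_end=False: run() keeps blocking after the worker is done.
    second = ToyRunner()
    second.add(short_job())
    run_task = asyncio.create_task(second.run(auto_end=False))
    await asyncio.sleep(0.05)
    still_running = not run_task.done()
    await second.end()
    await run_task
    return auto_ended, still_running


result = asyncio.run(demo())
```

In the second half of the demo the worker has long finished, yet run() only returns after end() is called. That is the behaviour a long-lived host such as a server process would rely on.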
- async stop_when_done() -> None[source]
Schedule all root pipeline workers to stop when their current processing is complete.
- async end(reason: str | None = None) -> None[source]
Gracefully end all running workers.
Idempotent; subsequent calls are ignored.
- Parameters:
reason – Optional human-readable reason for ending.
- async cancel(reason: str | None = None) -> None[source]
Immediately cancel all running workers.
Idempotent; subsequent calls are ignored.
- Parameters:
reason – Optional human-readable reason for cancelling.
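Both end() and cancel() are documented as idempotent. One common way to get that property is a guard flag, sketched below with a hypothetical EndOnce class. How the real runner implements it is not stated here.

```python
import asyncio


class EndOnce:
    """Hypothetical sketch of an idempotent end(); not pipecat's code."""

    def __init__(self):
        self._ending = False
        self.reasons = []

    async def end(self, reason=None):
        # Subsequent calls are ignored, as the documentation describes.
        if self._ending:
            return
        self._ending = True
        self.reasons.append(reason)


async def demo():
    guard = EndOnce()
    await guard.end("transport disconnected")
    await guard.end("second call, ignored")
    return guard.reasons


reasons = asyncio.run(demo())
```

Idempotence matters here because several event handlers (for example one per transport) may each try to end the runner on disconnect.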
- async on_bus_message(message: BusMessage) -> None[source]
Process incoming bus messages for runner-level concerns.