redis

Redis pub/sub worker bus for distributed workers.

class pipecat.bus.network.redis.RedisBus(*, redis: Redis, serializer: MessageSerializer | None = None, channel: str = 'pipecat:bus', **kwargs)

Bases: WorkerBus

Distributed worker bus backed by Redis pub/sub.

Publishes serialized messages to a Redis channel for cross-process communication. BusLocalMessage messages bypass Redis and are delivered directly to local subscribers.
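The routing rule above can be pictured with a toy model. This is only a sketch under stated assumptions: ToyBus, ToyMessage, the local_only flag (standing in for BusLocalMessage), and the remote_outbox list (standing in for the Redis channel) are all hypothetical names, not part of pipecat.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ToyMessage:
    """Hypothetical message; `local_only` stands in for BusLocalMessage."""
    payload: str
    local_only: bool = False


class ToyBus:
    """Illustrative stand-in for the routing rule, not the pipecat implementation."""

    def __init__(self):
        self.local_subscribers = []  # async callbacks registered in this process
        self.remote_outbox = []      # stands in for the Redis channel

    def subscribe(self, callback):
        self.local_subscribers.append(callback)

    async def publish(self, message):
        if message.local_only:
            # Local messages are handed straight to in-process subscribers.
            for callback in self.local_subscribers:
                await callback(message)
        else:
            # Other messages would be serialized and sent to Redis;
            # this sketch only records them.
            self.remote_outbox.append(message.payload)


async def demo():
    bus = ToyBus()
    received = []

    async def on_message(message):
        received.append(message.payload)

    bus.subscribe(on_message)
    await bus.publish(ToyMessage("ping", local_only=True))
    await bus.publish(ToyMessage("hello workers"))
    return received, bus.remote_outbox


received, outbox = asyncio.run(demo())
```

In this model the local message reaches the subscriber without touching the outbox, while the ordinary message only goes to the outbox. How RedisBus delivers messages that arrive back from Redis is not modeled here.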

Requires the redis[hiredis] package; the bus uses its redis.asyncio client.

Example:

from redis.asyncio import Redis

from pipecat.bus.network.redis import RedisBus

# Connect to a local Redis server and use a dedicated channel for this session.
redis = Redis.from_url("redis://localhost:6379")
bus = RedisBus(redis=redis, channel="my-session")

__init__(*, redis: Redis, serializer: MessageSerializer | None = None, channel: str = 'pipecat:bus', **kwargs)

Initialize the RedisBus.

Parameters:
  • redis – A redis.asyncio.Redis client instance.

  • serializer – The MessageSerializer for encoding/decoding messages. Defaults to JSONMessageSerializer.

  • channel – The Redis pub/sub channel name. Defaults to "pipecat:bus".

  • **kwargs – Additional arguments passed to WorkerBus.
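To illustrate what the serializer parameter is responsible for, here is a minimal sketch of a JSON round trip. ToyJSONSerializer and its serialize/deserialize method names are assumptions made for this example; the actual MessageSerializer interface may differ, so check it before writing a custom serializer.

```python
import json


class ToyJSONSerializer:
    """Hypothetical serializer; the method names are assumptions, not the pipecat API."""

    def serialize(self, message: dict) -> bytes:
        # Encode the message as UTF-8 JSON bytes suitable for a pub/sub payload.
        return json.dumps(message).encode("utf-8")

    def deserialize(self, data: bytes) -> dict:
        # Decode bytes received from the channel back into a dict.
        return json.loads(data.decode("utf-8"))


serializer = ToyJSONSerializer()
wire = serializer.serialize({"type": "greeting", "text": "hi"})
restored = serializer.deserialize(wire)
```

Whatever the concrete interface looks like, every worker sharing a channel would need a compatible encoding, since each one decodes what the others publish.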

async start()

Subscribe to the Redis channel and start the reader task.

async stop()

Stop the reader task and unsubscribe from Redis.
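The start/stop lifecycle described above follows a common asyncio pattern: spawn a background reader task on start, then cancel and await it on stop. The sketch below shows that pattern only. ToyReaderBus is a hypothetical class, and an asyncio.Queue stands in for the Redis subscription; it is not how RedisBus is necessarily implemented.

```python
import asyncio


class ToyReaderBus:
    """Illustrative lifecycle only; an asyncio.Queue stands in for the Redis channel."""

    def __init__(self, channel: asyncio.Queue):
        self._channel = channel
        self._reader_task = None
        self.delivered = []

    async def start(self):
        # "Subscribe" and spawn the background reader task.
        self._reader_task = asyncio.create_task(self._read_loop())

    async def stop(self):
        # Cancel the reader and wait until it has actually finished.
        if self._reader_task is not None:
            self._reader_task.cancel()
            try:
                await self._reader_task
            except asyncio.CancelledError:
                pass
            self._reader_task = None

    async def _read_loop(self):
        while True:
            data = await self._channel.get()
            self.delivered.append(data)
            self._channel.task_done()


async def demo():
    channel = asyncio.Queue()
    bus = ToyReaderBus(channel)
    await bus.start()
    await channel.put(b"hello")
    await channel.join()        # wait until the reader has processed the message
    await bus.stop()
    await channel.put(b"late")  # nobody is reading any more
    await asyncio.sleep(0.01)
    return bus.delivered, bus._reader_task is None


delivered, stopped = asyncio.run(demo())
```

Awaiting the cancelled task inside stop() means the reader has fully exited by the time stop() returns, which is why the message put afterwards is never delivered.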

async publish(message: BusMessage) → None

Publish a message to the Redis channel.

Parameters:
  • message – The bus message to publish.