Redis进阶(二)实现消息队列

Redis进阶(二)实现消息队列
最新回答
爱生活爱***

2023-10-17 05:14:14

Redis可通过List和Stream两种数据结构实现消息队列,分别适用于简单和高阶场景,其中Stream结构支持持久化、多播分组消费及消息确认等高级特性。

List结构实现消息队列
  • 核心命令:使用lpush命令发送消息,brpop命令阻塞式消费消息。
  • 特点

    简单高效:基于List的栈操作,生产者通过lpush将消息推入队列头部,消费者通过brpop从队列尾部阻塞获取消息。

    无持久化:队列为空时,Redis中对应的key会被自动删除,消息无法持久化。

    单消费者模式:默认不支持多消费者同时消费,需额外实现负载均衡逻辑。

  • Python实现示例:import redisimport timeredis_client = redis.Redis(host='localhost', port=6379, db=0)QUEUE_NAME = 'list_queue_task'def producer(message): redis_client.lpush(QUEUE_NAME, message) print(f"Produced: {message}")def consumer_blocking(): while True: result = redis_client.brpop(QUEUE_NAME) if result: queue_name, message = result print(f"Consumed: {message.decode('utf-8')}")if __name__ == "__main__": for i in range(5): producer(f"Message {i}") time.sleep(2) consumer_blocking()

Stream结构实现消息队列
  • 核心命令:使用xadd命令发送消息,xread命令阻塞式消费消息,支持消费组和消息确认。
  • 特点

    持久化存储:消息以流形式持久化在Redis中,即使服务器重启也能恢复。

    有序性:消息按时间戳生成唯一ID,保证严格顺序。

    多播与分组消费:支持多个消费者通过消费组(Consumer Group)并行消费,避免重复处理。

    消息确认机制:消费者通过XACK确认消息处理状态,确保至少消费一次。

    阻塞读取:xread支持阻塞模式,无消息时等待指定时间。

    消息回溯:可按消息ID或时间范围查询历史消息,便于补数或调试。

  • Python实现示例:import redisimport timeredis_client = redis.Redis(host='localhost', port=6379, db=0)STREAM_NAME = 'my_stream'def producer(message): redis_client.xadd(STREAM_NAME, {'message': message}) print(f"Produced: {message}")def consumer(): last_id = '0' # 从最早消息开始消费,改为'$'则从最新开始 while True: response = redis_client.xread({STREAM_NAME: last_id}, block=5000, count=1) if response: stream_name, messages = response[0] for msg_id, msg_data in messages: print(f"Consumed: {msg_data[b'message'].decode('utf-8')} (ID: {msg_id.decode('utf-8')})") last_id = msg_id # 更新最后消费ID else: print("No new message, waiting...")if __name__ == "__main__": for i in range(5): producer(f"Message {i}") time.sleep(2.5) consumer()

List与Stream对比
  • 持久化

    List:队列为空时key自动删除,消息不持久化。

    Stream:消息永久存储,可通过XDEL和XTRIM手动清理。

  • 消费模式

    List:默认单消费者,需自行实现负载均衡。

    Stream:支持消费组,多消费者自动分配消息。

  • 消息确认

    List:无确认机制,消费者崩溃可能导致消息丢失。

    Stream:通过XACK确认消费状态,支持消息重试。

  • 适用场景

    List:简单任务队列、临时消息处理。

    Stream:复杂业务系统、需要可靠性和高并发的场景。

总结
  • List结构适合对可靠性要求不高的简单队列场景,实现简单且性能高。
  • Stream结构适合企业级应用,提供持久化、多播消费和消息确认等高级特性,确保消息可靠处理。