中国建设企业银行官网站,电子商务网站系统,免费影视网站入口大全,根据网站软件做报告Redis 实现消息队列 文章目录 Redis 实现消息队列导引1. 基于List结构的消息队列2. 基于PubSub的消息队列3. 基于Stream的消息队列(推荐)3.1 XADD3.2 XREAD3.3 XGROUP 导引
消息队列(Message Queue)#xff0c;从概念上来理解就是用来存放消息的队列#xff0c;最简单的消息…Redis 实现消息队列 文章目录 Redis 实现消息队列导引1. 基于List结构的消息队列2. 基于PubSub的消息队列3. 基于Stream的消息队列(推荐)3.1 XADD3.2 XREAD3.3 XGROUP 导引
消息队列(Message Queue)从概念上来理解就是用来存放消息的队列最简单的消息队列模型包括以下三个角色
生产者发送消息到消息队列消息队列存储和管理信息也被称为消息代理(Message Broker)消费者从消息队列中获取消息并处理消息
而Redis也为我们提供了三种不同的方式来实现消息队列
List结构基于List结构模拟消息队列PubSub基本的点对点消息模型Stream比较完善的消息队列模型推荐
1. 基于List结构的消息队列
这种方式比较简单因为Redis的list数据结构是一个双向链表很容易模拟出队列的效果。 队列是入口和出口不在一边对此我们可以利用LPUSH 结合 BRPOP或者 RPUSH 结合 BLPOP 来实现先进先出的效果 注这里使用BRPOP而不是RPOP是因为BRPOP能够实现阻塞的效果而RPOP不能
使用该方式实现消息队列的优缺点如下
优点
利用Redis存储不受限与JVM内存上限基于Redis的持久化机制数据安全性有保证能够满足消息有序性
缺点:
无法避免消息丢失只支持单消费者
2. 基于PubSub的消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型消费者可以订阅一个或多个channel(频道)生产者向对应channel发送消息后所有订阅者都能收到相关消息
它有以下命令 SUBSCRIBE channel [channel]订阅一个或多个频道 PUBLISH channel msg向一个频道发送消息 PSUBSCRIBE pattern[pattern]订阅与pattern格式匹配的所有频道
具体操作如下所示 该方式实现的消息队列支持多消费者的使用但也存在着以下弊端
不能支持数据持久化一旦redis宕机数据就会丢失无法避免消息丢失消息堆积有上限超出上限后数据会丢失
3. 基于Stream的消息队列(推荐)
Stream是Redis5.0引入的一种新数据类型能够实现功能完善的消息队列因为它本身就是一个消息队列所以我们可以直接通过命令来使用它
3.1 XADD 作用发送消息 其中 key队列名称 [NOMKSTREAM]如果队列不存在是否自动创建队列默认是自动创建 [MAXLEN|MINID [|~] threshold [LIMIT count]]设置消息队列的最大消息数量 *|ID消息的唯一id表示由Redis自动生成格式是“时间戳-递增数字”一般推荐使用来自动生成 field value [field value …]发送到队列中的消息以键值对的格式录入可以多个同时录入
举个栗子 创建一个名为 users 的队列并向其中发送一个消息内容是{nameJson, age25}并使用Redis自动生成ID 3.2 XREAD 作用读取消息 其中
[COUNT count]指定每次读取消息的最大数量[BLOCK milliseconds]当队列中没有消息时阻塞指定时长单位为秒STREAMS key [key …]要从哪个队列中读取消息key就是队列名可以指定多个队列ID [ID …]起始ID只返回大于该ID的消息其中0代表从第一个消息开始$代表从最新的消息开始
举个栗子 读取users队列中的第一个消息 注在上述测试中我们只往users队列中添加了一个消息这个时候如果ID使用$来获取最新消息且设置了阻塞等待的话此时读取信息将在阻塞时间过后返回空: 3.3 XGROUP 消费者组一个消费者组中可以有多个消费者来操作同一个消息队列 通常由以下命令组成 创建消费者组: XGROUP Create key groupName ID [MKSTREAM]其中 key队列名称groupName消费者组名称ID起始ID标识0代表队列中第一个消息$代表队列最后一个消息MKSTREAM队列不存在时自动创建队列 注这里要求队列key已经存在才能创建消费者组否则需要开启MKSTREAM让其自动创建新的队列 删除指定的消费者组 XGROUP Destroy key groupName给指定消费者组添加消费者 XGROUP CREATECONSUMER key groupName consumerName删除指定消费者组中的消费者 XGROUP DELCONSUMER key groupName consumerName从消费者组中读取消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]其中 group消费者组名称consumer消费者名称如果消费者不存在会根据该名称自动创建一个消费者count本次查询的最大数量BLOCK milliseconds阻塞时间没有消息时会进行等待以毫秒为单位NOACK选择后无需手动ACK获取到消息后自动确认一般不建议设置当我们获取完消息后需要手动确认ackSTREAMS key指定队列名称ID获取消息的起始ID它有以下情况 “”表示从下一个未消费的消息开始其它根据指定id从pending-list中获取消息pending-list用于专门存放那些已消费但未确认的消息例如此时ID为0表示获取pending-list中的第一个消息. 注同一个消费者组中的消费者读取同一个消息队列时若ID使用来读取则下一个读取的消息一定是前面的消费者没有读取到的消息直到消息队列中的消息都被读取过后最后一个读取的消费者返回nil 举个栗子 我们创建一个队列叫list再添加几条消息 创建一个消费者组g1监听list消息队列 通过XREADGROUP命令为消费者组g1添加消费者c1、c2、c3来读取list队列消息
可以看到同一个消费者组中的消费者它们都在获取同一个队列中的消息且ID使用来读取下一个读取的消息一定是前面的消费者没有读取到的消息直到消息全部被读完后只返回nil
注每读取完一条消息我们需要对它进行手动确认使其从pending-list中移除使用下述命令可以查看已读取但还未确认的消息
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]我们查看一下list中还有多少未确认的消息 好的都没被确认所以需要我们手动去确认消息使其从pending-list中移除操作命令如下
XACK key group ID [ID ...]上述的ID为添加消息时自动创建并返回的ID 这样所有已读取的消息就会从pending-list中移除了
这里贴上该消息队列在Java中的实现方式
//获取消息队列中的信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000ms STEAMS list
ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(list, ReadOffset.lastConsumed())
);//ACK确认 SACK list g1 id
// 注因为这里我们只从消息队列中获取一条信息(COUNT 1)所以list.get()使用索引0即可
stringRedisTemplate.opsForStream().acknowledge(list, g1, list.get(0).getId()); //获取pending_list中的消息 XREADGROUP GROUP g1 c1 COUNT 1 STEAMS list 0
ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1),StreamOffset.create(list, ReadOffset.from(0))
);
// 上述代码可以配合循环实现被消费者组不断监听的消息队列以上便是对Redis实现消息队列的介绍了如果内容对大家有帮助的话请给这篇文章一个三连关注吧( •̀ ω •́ )✧✨