云南省城乡与住房建设厅网站,网站建设网页设计公司,五种类型网站,网站建设毕业实践设计报告Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始#xff1a;List 队列代码实现 二、发布订阅模式#xff1a;Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、… Redis实现消息队列的方式汇总以及代码实现 前言开始前准备1、添加依赖2、添加配置的Bean 具体实现一、从最简单的开始List 队列代码实现 二、发布订阅模式Pub/Sub1、使用RedisMessageListenerContainer实现订阅2、还可以使用redisTemplate实现订阅 三、、 趋于成熟的队列Stream具体java代码实现 总结 前言
经常听到很多人讨论关于「把 Redis 当作队列来用是否合适」的问题。
有些人表示赞成他们认为 Redis 很轻量用作队列很方便。也些人则反对认为 Redis 会「丢」数据最好还是用「专业」的队列中间件更稳妥。
这篇文章就聊一聊把 Redis 当作队列究竟是否合适这个问题。我们会从简单到复杂一步步带你梳理其中的细节把常用的实现方式展现一遍。
开始前准备
1、添加依赖 dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-data-redis/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.4.0/version/dependency2、添加配置的Bean
避免不方便用软件查看存储的数据 /*** redisTemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类* param redisConnectionFactory* return*/Beanpublic RedisTemplateString, Object redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplateString, Object redisTemplate new RedisTemplate();redisTemplate.setConnectionFactory(redisConnectionFactory);// 使用Jackson2JsonRedisSerialize 替换默认序列化Jackson2JsonRedisSerializer jackson2JsonRedisSerializer new Jackson2JsonRedisSerializer(Object.class);ObjectMapper objectMapper new ObjectMapper();objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(objectMapper);// 设置value的序列化规则和 key的序列化规则redisTemplate.setKeySerializer(new StringRedisSerializer());//jackson2JsonRedisSerializer就是JSON序列号规则redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);redisTemplate.afterPropertiesSet();return redisTemplate;}具体实现
一、从最简单的开始List 队列
首先我们先从最简单的场景开始讲起。
如果你的业务需求足够简单想把 Redis 当作队列来使用肯定最先想到的就是使用 List 这个数据类型。
因为 List 底层的实现就是一个「链表」在头部和尾部操作元素时间复杂度都是 O(1)这意味着它非常符合消息队列的模型。
如果把 List 当作队列你可以这么来用。
代码实现
生产者端读取
RestController
RequestMapping(/redis01)
public class RedisTest1 {Autowiredprivate RedisTemplateString, Object redisTemplate;//LPUSH 发布消息GetMapping(/set)public void set(String code){redisTemplate.opsForList().leftPush(code,code);}// RPOP 拉取消息GetMapping(/get1)public String get1(String key){Object code redisTemplate.opsForList().rightPop(key);if (code!null){return code.toString();}return redis中没数据;}实现模型 这个模型也非常简单容易理解。 但这里有个小问题当队列中已经没有消息了消费者在执行 RPOP 时会返回 NULL。 一般在编写消费者时会采用一个死循环这个实现方式就是不断去队列中拉取数据。 GetMapping(/get2)public String get2(String key) throws InterruptedException {while (true){Object code redisTemplate.opsForList().rightPop(key);System.out.println(code);// 读取到消息退出没读到继续循环if (code!null){return code.toString();}}}如果此时队列为空那消费者依旧会频繁拉取消息这会造成「CPU 空转」不仅浪费 CPU 资源还会对 Redis 造成压力。
怎么解决这个问题呢
也很简单当队列为空时我们可以「休眠」一会再去尝试拉取消息。代码可以修改成这样 GetMapping(/get2)public String get2(String key) throws InterruptedException {while (true){Object code redisTemplate.opsForList().rightPop(key);System.out.println(code);// 读取到消息退出没读到继续循环if (code!null){return code.toString();}Thread.sleep(2000);}}这就解决了 CPU 空转问题。
这个问题虽然解决了但又带来另外一个问题当消费者在休眠等待时有新消息来了那消费者处理新消息就会存在「延迟」。
假设设置的休眠时间是 2s那新消息最多存在 2s 的延迟。
要想缩短这个延迟只能减小休眠的时间。但休眠时间越小又有可能引发 CPU 空转问题。
鱼和熊掌不可兼得。
那如何做既能及时处理新消息还能避免 CPU 空转呢
Redis 是否存在这样一种机制如果队列为空消费者在拉取消息时就「阻塞等待」一旦有新消息过来就通知我的消费者立即处理新消息呢
幸运的是Redis 确实提供了「阻塞式」拉取消息的命令BRPOP / BLPOP这里的 B 指的是阻塞Block。 在java中也已经封装好了调用pop方法时直接设置一个过期时间就行 GetMapping(/get3)public String get3(String key) throws InterruptedException {Object code redisTemplate.opsForList().rightPop(key,0, TimeUnit.SECONDS);if (codenull){return 数据读取超时;}return code.toString();}使用 BRPOP 这种阻塞式方式拉取消息时还支持传入一个「超时时间」如果设置为 0则表示不设置超时直到有新消息才返回否则会在指定的超时时间后返回 NULL。
这个方案不错既兼顾了效率还避免了 CPU 空转问题一举两得。 注意如果设置的超时时间太长这个连接太久没有活跃过可能会被 Redis Server 判定为无效连接之后 Redis Server 会强制把这个客户端踢下线。所以采用这种方案客户端要有重连机制。 解决了消息处理不及时的问题你可以再思考一下这种队列模型有什么缺点
我们一起来分析一下
不支持重复消费消费者拉取消息后这条消息就从 List 中删除了无法被其它消费者再次消费即不支持多个消费者消费同一批数据消息丢失消费者拉取到消息后如果发生异常宕机那这条消息就丢失了 第一个问题是功能上的使用 List 做消息队列它仅仅支持最简单的一组生产者对应一组消费者不能满足多组生产者和消费者的业务场景。
第二个问题就比较棘手了因为从 List 中 POP 一条消息出来后这条消息就会立即从链表中删除了。也就是说无论消费者是否处理成功这条消息都没办法再次消费了。
这也意味着如果消费者在处理消息时异常宕机那这条消息就相当于丢失了。
针对这 2 个问题怎么解决呢我们一个个来看。
二、发布订阅模式Pub/Sub
从名字就能看出来这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。
它正好可以解决前面提到的第一个问题重复消费。
即多组生产者、消费者的场景我们来看它是如何做的。
Redis 提供了 PUBLISH / SUBSCRIBE 命令来完成发布、订阅的操作。 依赖继续用前面的就行
1、使用RedisMessageListenerContainer实现订阅 通过实现MessageListener接口来处理接收到的消息。这允许您在Spring应用程序中以更高级的方式处理消息例如使用依赖注入和其他Spring功能。它还支持基于注解的消息监听器使消息处理更加简洁和灵活。该方式是Spring Data Redis库提供的方法用于在Spring应用程序中使用Redis的发布订阅功能。它需要创建一个MessageListenerContainer对象并通过调用addMessageListener方法来添加消息监听器。 添加监控器用于监听通道 为了便于更加直观的对比测试我添加了两个
/*** author zhengfuping* version 1.0* description: TODO 配置监控器* date 2023/7/28 17:10*/
Component
public class RedisMessaeListener1 implements MessageListener {Overridepublic void onMessage(Message message, byte[] pattern) {String channel new String(message.getChannel());String body new String(message.getBody());System.out.println(监听器1号消息: body 通道QQ: channel);}
}/*########################*//*** author zhengfuping* version 1.0* description: TODO 配置监控器1* date 2023/8/2 11:24*/
Component
public class RedisMessaeListener2 implements MessageListener {Overridepublic void onMessage(Message message, byte[] pattern) {String channel new String(message.getChannel());String body new String(message.getBody());System.out.println(监听器2号消息: body 通道QQ: channel);}
}
配置订阅可以单和多个 用于绑定主题通道和监听器该发送是
/*** author zhengfuping* version 1.0* description: TODO 使用RedisMessageListenerContainer直接注入到bean进行监听* date 2023/7/28 15:43*/
Configuration
public class RedisPubSubExample {Autowiredprivate RedisMessaeListener1 redisMessaeListener1;Autowiredprivate RedisMessaeListener2 redisMessaeListener2;/*** 订阅三个频道* author zhengfuping* date 2023/8/2 11:19* param redisConnectionFactory redis线程工厂* return RedisMessageListenerContainer*/
// Beanpublic RedisMessageListenerContainer subscribeToChannel(RedisConnectionFactory redisConnectionFactory){RedisMessageListenerContainer listenerContainer new RedisMessageListenerContainer();listenerContainer.setConnectionFactory(redisConnectionFactory);ListTopic list new ArrayList();list.add(new PatternTopic(TEST01));list.add(new PatternTopic(TEST02));list.add(new PatternTopic(TEST03));/** redisMessaeListener 消息监听器* list 订阅的主题可以单个和多个*/listenerContainer.addMessageListener(redisMessaeListener1,list);listenerContainer.addMessageListener(redisMessaeListener2,new PatternTopic(TEST01));return listenerContainer;}
}
向指定频道发送消息 /*** PUBLISH 发送消息到指定频道* author zhengfuping* date 2023/8/2 11:14* param channel 通道* param name 数据* param age* return Object*/GetMapping(/pub)public Object pub(String channel,String name,Integer age) {User user new User(name, age);redisTemplate.convertAndSend(channel,user);return user;}2、还可以使用redisTemplate实现订阅 该方式是Redis客户端的方法用于在独立的Redis客户端中直接使用发布订阅功能。它需要创建一个Redis连接对象并通过调用subscribe方法来订阅一个或多个频道。 如果您只是在独立的Redis客户端中使用发布订阅功能并且不需要使用Spring的其他功能则可以选择connection.subscribe /*** 自行添加订阅*/GetMapping(/sub)public void sub(String channel) {RedisConnection connection redisTemplate.getConnectionFactory().getConnection();/** MessageListener:监听器直接使用内部类实现绑定监听可以把数据传递出去* channel 订阅频道*/connection.subscribe((message, pattern) - {String channel1 new String(message.getChannel());String body new String(message.getBody());System.out.println(subscribe方式监听消息: body 通道QQ: channel1);}, channel.getBytes());// connection.close();}发送消息 GetMapping(/pub)public Object pub(String channel,String name,Integer age) {User user new User(name, age);redisTemplate.convertAndSend(channel,user);return user;}最终监听到的结果
三、、 趋于成熟的队列Stream
我们来看 Stream 是如何解决上面这些问题的。
我们依旧从简单到复杂依次来看 Stream 在做消息队列时是如何处理的
首先Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型
生产者发布 2 条消息
// *表示让Redis自动生成消息ID
127.0.0.1:6379 XADD queue * name zhangsan
1618469123380-0
127.0.0.1:6379 XADD queue * name lisi
1618469127777-0消费者拉取消息
// 从开头读取5条消息0-0表示从开头读取
127.0.0.1:6379 XREAD COUNT 5 STREAMS queue 0-0
1) 1) queue2) 1) 1) 1618469123380-02) 1) name2) zhangsan2) 1) 1618469127777-02) 1) name2) lisi流程图
具体java代码实现
先配置监听消息类
Slf4j
Component
public class ListenerMessage implements StreamListenerString, MapRecordString, String, String {Overridepublic void onMessage(MapRecordString, String, String entries) {log.info(接受到来自redis的消息);System.out.println(message id entries.getId());System.out.println(stream entries.getStream());System.out.println(body entries.getValue());}
}添加工具类实现初始化
Component
Slf4j
public class RedisStreamUtil {Autowiredprivate RedisTemplateString,Object redisTemplate;/*** author zhengfuping 添加数据* param streamKey* param map* return RecordId*/public RecordId addStream(String streamKey,MapString, Object map){RecordId recordId redisTemplate.opsForStream().add(streamKey, map);return recordId;}/*** 用来创建绑定流和组*/public void addGroup(String key, String groupName){redisTemplate.opsForStream().createGroup(key,groupName);}/*** 用来判断key是否存在*/public boolean hasKey(String key){if(keynull){return false;}else{return redisTemplate.hasKey(key);}}/*** 用来删除掉消费了的消息*/public void delField(String key,String recordIds){redisTemplate.opsForStream().delete(key,recordIds);}/*** 用来初始化 实现绑定*/public void initStream(String key, String group){//判断key是否存在如果不存在则创建boolean hasKey hasKey(key);if(!hasKey){MapString,Object map new HashMap();map.put(field,value);RecordId recordId addStream(key, map);addGroup(key,group); //把Stream和gropu绑定delField(key,recordId.getValue());log.info(stream:{}-group:{} initialize success,key,group);}}
}添加配置类配置Stream
/*** author zhengfuping* version 1.0* description: TODO 添加配置类配置Stream*/
Configuration
Slf4j
public class RedisStreamConfig {Autowiredprivate RedisStreamUtil redisStream;Autowiredprivate ListenerMessage listenerMessage;Beanpublic Subscription subscription(RedisConnectionFactory factory){
// 代码中的var是使用了Lombok的可变局部变量。主要是为了方便
// StreamMessageListenerContainer: 消息侦听容器不能在外部实现。创建后StreamMessageListenerContainer可以订阅Redis流并使用传入的消息var options StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder().pollTimeout(Duration.ofSeconds(1)).build();redisStream.initStream(mystream,mygroup); //调用初始化var listenerContainer StreamMessageListenerContainer.create(factory,options);/** 注意这里接受到消息后会被自动的确认如果不想自动确认请使用其他的创建订阅方式* 消费组 consumer group 它不能为null Consumer类型* stream offset stream的偏移量StreamOffset 类型* listener 不能为null StreamListenerK,V 类型*/var subscription listenerContainer.receiveAutoAck(Consumer.from(mygroup,huhailong),StreamOffset.create(mystream, ReadOffset.lastConsumed()),listenerMessage);listenerContainer.start();return subscription;}
}
调用测试
/*** author zhengfuping* version 1.0* description: TODO* date 2023/8/2 16:06*/
RestController
RequestMapping(/redisStream)
public class RedisStreamTest {Autowiredprivate RedisStreamUtil redisStream;GetMapping(add)public void add(String key,String data){MapString, Object map new HashMap();map.put(key,data);
// 添加数据到mystream流中RecordId recordId redisStream.addStream(mystream, map);
// 删除流中消费了的指定key的数据redisStream.delField(mystream,recordId.getValue());}
} Stream的好处在于可以写入到 RDB 和 AOF 做持久化。 Stream是新增加的数据类型它与其它数据类型一样每个写操作也都会写入到 RDB 和 AOF 中。 我们只需要配置好持久化策略这样的话就算 Redis 宕机重启Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。 总结
好了总结一下。这篇文章我们从「Redis 能否用作队列」这个角度出发介绍了 List、Pub/Sub、Stream 在做队列的使用方式以及它们各自的优劣。
之后又把 Redis 和专业的消息队列中间件做对比发现 Redis 的不足之处。
最后我们得出 Redis 做队列的合适场景。