饮料网站建设,成都优化官网推广,实验教学网站的建设研究,中国房地产网站一、概述 RocketMQ可以一次性发送一组消息#xff0c;那么这一组消息会被当做一个消息进行消费。
二、案例代码
2.1、pom 同系列五
2.2、RocketMQConstant 同系列五
2.3、BatchConsumer
package org.star.batch.consumer;import cn.hutool.core.util.StrUtil;
import lom…一、概述 RocketMQ可以一次性发送一组消息那么这一组消息会被当做一个消息进行消费。
二、案例代码
2.1、pom 同系列五
2.2、RocketMQConstant 同系列五
2.3、BatchConsumer
package org.star.batch.consumer;import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;import java.util.List;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 09:40* Description: 批量消息消费者*/
Slf4j
public class BatchConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(BatchConsumerGroup);consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);consumer.subscribe(BatchTopic,*);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext context) {if (CollectionUtils.isNotEmpty(list)) {String body StrUtil.utf8Str(list.get(0).getBody());log.info(Thread.currentThread().getName() 收到消息 body:{},body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();log.info(BatchConsumer start success!);}}2.4、BatchProducer
package org.star.batch.producer;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;/*** Author: 一叶浮萍归大海* Date: 2023/8/30 09:31* Description: 批量消息生产者* 批量消息是指将多条消息合并成一个批量消息一次发送出去这样的好处是可以减少网络IO提升吞吐量。* 注意事项* 1、消息大小不能超过4MB虽然源码注释不能超过1MB但是实际使用不超过4MB即可建议保持在1MB左右* 2、相同的Topic* 3、相同的waitStoreMsgOK* 4、不能是延迟消息事务消息等*/
Slf4j
public class BatchProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(BatchProducerGroup);producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);producer.start();log.info(BatchProducer start success);ListMessage messages new ArrayList();for (int i 0; i 3; i) {Message message new Message(BatchTopic, (我是批量消息消息编号[ i ]).getBytes(StandardCharsets.UTF_8));messages.add(message);}SendResult result producer.send(messages);log.info(BatchProducer sendStatus:{},queueId:{},msgId:{}, result.getSendStatus(), result.getMessageQueue().getQueueId(), result.getMsgId());producer.shutdown();}}2.5、控制台打印
# 生产者端
22:49:44.236 [main] INFO org.star.batch.producer.BatchProducer - BatchProducer start success
22:49:45.625 [main] INFO org.star.batch.producer.BatchProducer - BatchProducer sendStatus:SEND_OK,queueId:0,msgId:C0A81FB287E018B4AAC20F32C5870000,C0A81FB287E018B4AAC20F32C5880001,C0A81FB287E018B4AAC20F32C5880002# 消费者端
22:47:20.127 [main] INFO org.star.batch.consumer.BatchConsumer - BatchConsumer start success!
22:49:45.644 [ConsumeMessageThread_2] INFO org.star.batch.consumer.BatchConsumer - ConsumeMessageThread_2 收到消息 body:我是批量消息消息编号[1]
22:49:45.644 [ConsumeMessageThread_3] INFO org.star.batch.consumer.BatchConsumer - ConsumeMessageThread_3 收到消息 body:我是批量消息消息编号[2]
22:49:45.644 [ConsumeMessageThread_1] INFO org.star.batch.consumer.BatchConsumer - ConsumeMessageThread_1 收到消息 body:我是批量消息消息编号[0]