文章目录：批量消息、BatchProducer.java、BatchConsumer.java

批量消息
批量发送可以提高发送性能，但有一定的限制：topic 相同；waitStoreMsgOK 相同（首先我们设置消息的 isWaitStoreMsgOK = true（默认为 true），如果没有异常，我们将始终收到 OK……）

文章目录：批量消息、BatchProducer.java、BatchConsumer.java

批量消息
批量发送可以提高发送性能，但有一定的限制：

- topic 相同；
- waitStoreMsgOK 相同（首先我们设置消息的 isWaitStoreMsgOK = true（默认为 true），如果没有异常，我们将始终收到 "OK"，参见 org.apache.rocketmq.common.message.Message#isWaitStoreMsgOK）；
- 不支持延时发送；
- 一批消息的大小不能大于 4M（DefaultMQProducer.maxMessageSize）。

大小限制需要特殊注意：因为消息是动态的，不注意的话就可能超限，就会报错。

计算消息的大小：(topic + body + (key + value) * N) * 吞吐量
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
    tmpSize += entry.getKey().length() + entry.getValue().length();
}

BatchProducer.java
package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;public class BatchProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.DefaultMQProducer producer newDefaultMQProducer(please_rename_unique_group_name);// Specify name server addresses.producer.setNamesrvAddr(localhost:9876);//Launch the instance.producer.start();String topic TopicTest;ListMessage messages new ArrayList();messages.add(new Message(topic, TagA, OrderID001, Hello world 0.getBytes()));messages.add(new Message(topic, TagA, OrderID002, Hello world 1.getBytes()));messages.add(new Message(topic, TagA, OrderID003, Hello world 2.getBytes()));//then you could split the large list into small ones:ListSplitter splitter new ListSplitter(messages);while (splitter.hasNext()) {try {ListMessage listItem splitter.next();SendResult sendResult producer.send(listItem);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();//handle the error}}//Shut down once the producer instance is not longer in use.producer.shutdown();}
}class ListSplitter implements IteratorListMessage {private final int SIZE_LIMIT 1024 * 1024 * 4;private final ListMessage messages;private int currIndex;public ListSplitter(ListMessage messages) {this.messages messages;}Override public boolean hasNext() {return currIndex messages.size();}Override public ListMessage next() {int nextIndex currIndex;int totalSize 0;for (; nextIndex messages.size(); nextIndex) {Message message messages.get(nextIndex);//计算消息的大小 (topic body key value) * N) * 吞吐量int tmpSize message.getTopic().length() message.getBody().length;//属性值的添加MapString, String properties message.getProperties();for (Map.EntryString, String entry : properties.entrySet()) {//key valuetmpSize entry.getKey().length() entry.getValue().length();}tmpSize tmpSize 20; //for log overheadif (tmpSize SIZE_LIMIT) {//it is unexpected that single message exceeds the SIZE_LIMIT//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex;}break;}if (tmpSize totalSize SIZE_LIMIT) {break;} else {totalSize tmpSize;}}ListMessage subList messages.subList(currIndex, nextIndex);currIndex nextIndex;return subList;}
}
BatchConsumer.java
package com.example.rocketmq.demo.batch;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class BatchConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// Instantiate with specified consumer group name.DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name);// Specify name server addresses.consumer.setNamesrvAddr(localhost:9876);// Subscribe one more more topics to consume.consumer.subscribe(TopicTest, *);// Register callback to execute on arrival of messages fetched from brokers.consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//Launch the consumer instance.consumer.start();System.out.printf(Consumer Started.%n);}
}