巅峰网站建设,扬州网站建设link5,wordpress help主题,aso投放平台RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式
集群模式
在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
集群模…
RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式
集群模式
在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
集群模式的演示(本身就默认) 假设我们生产者生产了十条信息 当我们集群了两台消费者服务器的时候就会每个服务器执行五条 Rocketmq存储队列
在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的 如果只有一个队列保证线程安全必须得给队列进行写操作的时候上锁。 多几个队列降低并发度等待时间就短一些。 为什么是四个队列? 因为大多数服务器只有四核意味着同时最多只能有CPU同时工作
广播模式
在消费模式为集群的情况下如果机器是集群的消费是会给集群中的所有机器所消费到
public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(helloConsumerGroup);//设置nameServer地址consumer.setNamesrvAddr(43.143.161.59:9876);//设置订阅的主题consumer.subscribe(helloTopic,*);//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s new String(msg.getBody(), Charset.defaultCharset());System.out.println(线程:Thread.currentThread(),消息的内容:s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}运行结果 生产者发送了十条消息之后消费者集群的每个服务器均收到十条数据 顺序消费
实现生产顺序:12345消费顺序12345 哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程 假设我们没有实现顺序消费的时候 创建生产者 1.创建实体类 Setter
Getter
public class OrderStep {private long orderId;private String desc;Overridepublic String toString() {return OrderStep{ orderId orderId , desc desc \ };}
}2.创建测试类 public class OrderUtil {public static ListOrderStep buildOrders(){ListOrderStep orderList new ArrayListOrderStep();OrderStep orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(推送);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(完成);orderList.add(orderDemo);return orderList;}
}3.创建生产者类 public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(orderlyProducerGroup);producer.setNamesrvAddr(43.143.161.59:9876);producer.start();String topic orderTopic;ListOrderStep orderSteps OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();}
}创建消费者类
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(orderlyConsumerGroup);consumer.setNamesrvAddr(43.143.161.59:9876);consumer.subscribe(orderTopic,*);consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s new String(msg.getBody(), Charset.defaultCharset());System.out.println(线程:Thread.currentThread(),消息的内容:s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();}
}运行结果 可以看出和我们生产数据的顺序完全不同整个订单的顺序都反了
如何改实现顺序消费 生产者类 public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(orderlyProducerGroup);producer.setNamesrvAddr(43.143.161.59:9876);producer.start();String topic orderTopic;ListOrderStep orderSteps OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object o) {System.out.println(队列个数list.size());Long orderId (Long) o;int index (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();}
}消费者类 public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(orderlyConsumerGroup);consumer.setNamesrvAddr(43.143.161.59:9876);consumer.subscribe(orderTopic,*);//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println(当前线程Thread.currentThread():,队列IDmsg.getQueueId(),消息内容new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();}
}