当前位置: 首页 > news >正文

巅峰网站建设扬州网站建设link5

巅峰网站建设,扬州网站建设link5,wordpress help主题,aso投放平台RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式 集群模式 在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到 集群模… RocketMQ-消息消费模式 顺序消费RocketMQ-消息消费模式集群模式集群模式的演示(本身就默认)Rocketmq存储队列广播模式顺序消费如何改实现顺序消费RocketMQ-消息消费模式 集群模式 在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到 集群模式的演示(本身就默认) 假设我们生产者生产了十条信息 当我们集群了两台消费者服务器的时候就会每个服务器执行五条 Rocketmq存储队列 在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的 如果只有一个队列保证线程安全必须得给队列进行写操作的时候上锁。 多几个队列降低并发度等待时间就短一些。 为什么是四个队列? 因为大多数服务器只有四核意味着同时最多只能有CPU同时工作 广播模式 在消费模式为集群的情况下如果机器是集群的消费是会给集群中的所有机器所消费到 public class Consumer {public static void main(String[] args) throws Exception {//定义消息消费者(在同一个JVM中,消费者的组名不能重复)DefaultMQPushConsumer consumer new DefaultMQPushConsumer(helloConsumerGroup);//设置nameServer地址consumer.setNamesrvAddr(43.143.161.59:9876);//设置订阅的主题consumer.subscribe(helloTopic,*);//设置消费模式consumer.setMessageModel(MessageModel.BROADCASTING);//设置消息的监听器consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s new String(msg.getBody(), Charset.defaultCharset());System.out.println(线程:Thread.currentThread(),消息的内容:s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();} }运行结果 生产者发送了十条消息之后消费者集群的每个服务器均收到十条数据 顺序消费 实现生产顺序:12345消费顺序12345 哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程 假设我们没有实现顺序消费的时候 创建生产者 1.创建实体类 Setter Getter public class OrderStep {private long orderId;private String desc;Overridepublic String toString() {return OrderStep{ orderId orderId , desc desc \ };} }2.创建测试类 public class OrderUtil {public static ListOrderStep buildOrders(){ListOrderStep orderList new ArrayListOrderStep();OrderStep orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(创建);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(付款);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111065L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(推送);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103117235L);orderDemo.setDesc(完成);orderList.add(orderDemo);orderDemo new OrderStep();orderDemo.setOrderId(15103111039L);orderDemo.setDesc(完成);orderList.add(orderDemo);return orderList;} }3.创建生产者类 public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(orderlyProducerGroup);producer.setNamesrvAddr(43.143.161.59:9876);producer.start();String topic orderTopic;ListOrderStep orderSteps OrderUtil.buildOrders();for(OrderStep step:orderSteps){Message msg new Message(topic,step.toString().getBytes(Charset.defaultCharset()));producer.sendOneway(msg);}producer.shutdown();} }创建消费者类 public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(orderlyConsumerGroup);consumer.setNamesrvAddr(43.143.161.59:9876);consumer.subscribe(orderTopic,*);consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for(MessageExt msg:list){String s new String(msg.getBody(), Charset.defaultCharset());System.out.println(线程:Thread.currentThread(),消息的内容:s);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者consumer.start();} }运行结果 可以看出和我们生产数据的顺序完全不同整个订单的顺序都反了 如何改实现顺序消费 生产者类 public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer new DefaultMQProducer(orderlyProducerGroup);producer.setNamesrvAddr(43.143.161.59:9876);producer.start();String topic orderTopic;ListOrderStep orderSteps OrderUtil.buildOrders();//设置队列选择器MessageQueueSelector selector new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue list, Message message, Object o) {System.out.println(队列个数list.size());Long orderId (Long) o;int index (int)(orderId % list.size());return list.get(index);}};for(OrderStep step:orderSteps){Message msg new Message(topic,step.toString().getBytes(Charset.defaultCharset()));//指定消息选择器,换入的参数producer.send(msg,selector,step.getOrderId());}producer.shutdown();} }消费者类 public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(orderlyConsumerGroup);consumer.setNamesrvAddr(43.143.161.59:9876);consumer.subscribe(orderTopic,*);//从什么地方开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//一个队列对应一个线程consumer.setMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt list, ConsumeOrderlyContext consumeOrderlyContext) {for(MessageExt msg:list){System.out.println(当前线程Thread.currentThread():,队列IDmsg.getQueueId(),消息内容new String(msg.getBody(),Charset.defaultCharset()));}return ConsumeOrderlyStatus.SUCCESS;}});//启动消费者consumer.start();} }
http://www.dnsts.com.cn/news/147772.html

相关文章:

  • 可信网站认证 服务中心建筑八大员报名官网
  • 有记事本做简易网站手表网站哪个最好知乎
  • asp评价网站开发文档二手交易网站开发系统
  • 新华网站建设免费推广的渠道有哪些
  • 学校网站怎么建设威县做网站报价
  • 什么是门户类型的网站营销型网站网站
  • 网站添加 备案wordpress 关闭顶部
  • 海山免费网站建设建设一个电商网站的流程图
  • 网页设计公司婚庆网站模板下载app公司开发价格
  • 网站建设哪些职位陇南网站网站建设
  • 六安网站建设培训上海外贸公司是国企吗
  • 大型网站建设开发设计公司品牌微信网站定制
  • 网站建设 内容wordpress 同步qq空间
  • 域名到期换个公司做网站it培训机构出来的好找工作吗
  • 厦门哪里有教网站建设网站编程 外包类型
  • 文案推广发布网站大全asp网站作业下载
  • 网站制作广告科技型中小企业服务平台
  • 赣州网站建设中心北京软件开发哪家好
  • 海东企业网站建设公司跨境电商网络营销是什么
  • 网站首页不见怎么做乌克兰最新消息今天
  • 山东住房建设部网站wordpress主题xueui
  • 个人网站二级域名做淘宝客网站建设服务器是什么意思
  • 兴义做网站建站合同模板
  • 电商学院建设设计网站网站风格的设计
  • 做电子书的网站很有名后来被关闭了烟台网站建设询问企汇互联专业
  • 非凡免费建网站平台下载个网上销售网站
  • 蓟县网站建设公司wordpress产品系统
  • 网站被攻击如何处理wordpress内页打不开
  • 网站建设自主建设falsh网站模板下载
  • 个人微信网站建设购物网站策划方案