当前位置: 首页 > news >正文

番禺网站 建设信科网络湖南百度seo

番禺网站 建设信科网络,湖南百度seo,石家庄网站建设加q.479185700,漳州网站开发什么是延迟队列指消息发送到某个队列后#xff0c;在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息#xff08;延迟队列#xff09;是指消息发送到broker后#xff0c;不会立即被消费#xff0c;等待特定时间投递给真正的topic。broker有配置项messageD…什么是延迟队列指消息发送到某个队列后在指定多长时间之后才能被消费。应用场景RocketMQ 延迟队列定时消息延迟队列是指消息发送到broker后不会立即被消费等待特定时间投递给真正的topic。broker有配置项messageDelayLevel默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”18个level。可以配置自定义messageDelayLevel。需要注意的是 messageDelayLevel是broker的属性不属于某个topic。发消息时设置delayLevel等级即可msg.setDelayLevel(level)level有以下三种情况level 0消息为非延迟消息1levelmaxLevel消息延迟特定时间例如level1延迟1slevel maxLevel则level maxLevel例如level20延迟2h在 RocketMQ中定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中并根据delayTimeLevel存入特定的queuequeueId delayTimeLevel – 1即一个queue只存相同延迟的消息保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX将消息写入真实的topic。需要注意的是定时消息会在第一次写入和调度写入真实topic时都会计数因此发送数量、tps都会变高。RocketMQ 延迟队列和RabbitMQ延迟队列相比RocketMQ直接一步到位功能类似于RabbitMQ的延迟交换器插件、而RabbitMQ提供了延迟队列但如果是基于消息设置每个消息设置不同的延迟时间会产生前面的消息早已过期但后面的消息还存在消息队列中故此RabbitMQ提供了延迟交换器插件而RocketMQ 延迟队列设计就比较好了。具体示例生产者// 实例化生产者并指定生产组名称DefaultMQProducer producer newDefaultMQProducer(myproducer_group_topic_name_delay_01);//设置实例名称一个jvm中有多个生产者可以根据实例名区分//默认defaultproducer.setInstanceName(topic_delay);// 指定nameserver的地址producer.setNamesrvAddr(localhost:9876);//设置同步重试次数producer.setRetryTimesWhenSendFailed(2);//设置异步发送次数//producer.setRetryTimesWhenSendAsyncFailed(2);// 初始化生产者producer.start();for(int i 0; i 20; i){Message message newMessage(topic_name_delay,(key i).getBytes(utf-8));//设置延迟消费时间 设置延迟时间级别0,18,0表示不延迟18表示延迟2h大于18的都是2hmessage.setDelayTimeLevel(i);// 1 同步发送 如果发送失败会根据重试次数重试SendResult send producer.send(message);SendStatus sendStatus send.getSendStatus();System.out.println(sendStatus.toString());}消费者/*** 推消息消费*/DefaultMQPushConsumer defaultMQPushConsumer newDefaultMQPushConsumer(consumer_group_delay_01);// 指定nameserver的地址defaultMQPushConsumer.setNamesrvAddr(localhost:9876);defaultMQPushConsumer.subscribe(topic_name_delay,*);// 1 提高消费并行度defaultMQPushConsumer.setConsumeThreadMax(10);defaultMQPushConsumer.setConsumeThreadMin(1);// 2 以批量方式进行 消费// 设置消息批处理的一个批次中消息的最大个数defaultMQPushConsumer.setConsumeMessageBatchMaxSize(10);//设置重试次数 默认16次defaultMQPushConsumer.setMaxReconsumeTimes(1);// 添加消息监听器一旦有消息推送过来就进行消费defaultMQPushConsumer.setMessageListener(newMessageListenerConcurrently(){OverridepublicConsumeConcurrentlyStatusconsumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context){//final MessageQueue messageQueue context.getMessageQueue();for(MessageExt msg : msgs){System.out.println(msg);try{System.out.println(newString(msg.getBody(),utf-8));}catch(UnsupportedEncodingException e){e.printStackTrace();}}// 消息消费成功returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 消息消费失败// return ConsumeConcurrentlyStatus.RECONSUME_LATER;}});消费消息按照0到18级别来0 表示不延迟1表示延迟1s大于等于18表示延迟2h按照级别一次类推默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”18个level。这里只拷贝0-8 的打印日志可以自己等待确认。MessageExt [queueId0, storeSize195, queueOffset19, sysFlag0, bornTimestamp1628949643548, bornHost/192.168.0.103:55518, storeTimestamp1628949643554, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F0000000000029A8A, commitLogOffset170634, bodyCRC858365373, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, MAX_OFFSET20, CONSUME_START_TIME1628949650433, UNIQ_KEYC0A8006748D07C53A9EB47ABD51C0000, CLUSTERDefaultCluster, WAITtrue, DELAY0}, body[107, 101, 121, 61, 48], transactionId‘null’}]key0MessageExt [queueId1, storeSize234, queueOffset22, sysFlag0, bornTimestamp1628949643558, bornHost/192.168.0.103:55518, storeTimestamp1628949644566, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002ACF8, commitLogOffset175352, bodyCRC1143909675, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET23, CONSUME_START_TIME1628949650435, UNIQ_KEYC0A8006748D07C53A9EB47ABD5260001, CLUSTERDefaultCluster, WAITtrue, DELAY1, REAL_QID1}, body[107, 101, 121, 61, 49], transactionId‘null’}]key1MessageExt [queueId2, storeSize234, queueOffset18, sysFlag0, bornTimestamp1628949643561, bornHost/192.168.0.103:55518, storeTimestamp1628949648566, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002ADE2, commitLogOffset175586, bodyCRC1562901649, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET19, CONSUME_START_TIME1628949650436, UNIQ_KEYC0A8006748D07C53A9EB47ABD5290002, CLUSTERDefaultCluster, WAITtrue, DELAY2, REAL_QID2}, body[107, 101, 121, 61, 50], transactionId‘null’}]key2MessageExt [queueId3, storeSize234, queueOffset17, sysFlag0, bornTimestamp1628949643566, bornHost/192.168.0.103:55518, storeTimestamp1628949653569, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002AECC, commitLogOffset175820, bodyCRC706792455, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET18, CONSUME_START_TIME1628949653572, UNIQ_KEYC0A8006748D07C53A9EB47ABD52E0003, CLUSTERDefaultCluster, WAITtrue, DELAY3, REAL_QID3}, body[107, 101, 121, 61, 51], transactionId‘null’}]key3MessageExt [queueId0, storeSize234, queueOffset20, sysFlag0, bornTimestamp1628949643568, bornHost/192.168.0.103:55518, storeTimestamp1628949673574, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002B0A0, commitLogOffset176288, bodyCRC876894628, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET21, CONSUME_START_TIME1628949673577, UNIQ_KEYC0A8006748D07C53A9EB47ABD5300004, CLUSTERDefaultCluster, WAITtrue, DELAY4, REAL_QID0}, body[107, 101, 121, 61, 52], transactionId‘null’}]key4MessageExt [queueId1, storeSize234, queueOffset23, sysFlag0, bornTimestamp1628949643570, bornHost/192.168.0.103:55518, storeTimestamp1628949703574, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002B18A, commitLogOffset176522, bodyCRC1128491314, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET24, CONSUME_START_TIME1628949703577, UNIQ_KEYC0A8006748D07C53A9EB47ABD5320005, CLUSTERDefaultCluster, WAITtrue, DELAY5, REAL_QID1}, body[107, 101, 121, 61, 53], transactionId‘null’}]key5MessageExt [queueId2, storeSize234, queueOffset20, sysFlag0, bornTimestamp1628949643572, bornHost/192.168.0.103:55518, storeTimestamp1628949763575, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002B35E, commitLogOffset176990, bodyCRC1514813576, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET21, CONSUME_START_TIME1628949763580, UNIQ_KEYC0A8006748D07C53A9EB47ABD5340006, CLUSTERDefaultCluster, WAITtrue, DELAY6, REAL_QID2}, body[107, 101, 121, 61, 54], transactionId‘null’}]key6MessageExt [queueId3, storeSize234, queueOffset19, sysFlag0, bornTimestamp1628949643574, bornHost/192.168.0.103:55518, storeTimestamp1628949823580, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002B532, commitLogOffset177458, bodyCRC760023070, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET20, CONSUME_START_TIME1628949823583, UNIQ_KEYC0A8006748D07C53A9EB47ABD5360007, CLUSTERDefaultCluster, WAITtrue, DELAY7, REAL_QID3}, body[107, 101, 121, 61, 55], transactionId‘null’}]key7MessageExt [queueId0, storeSize234, queueOffset22, sysFlag0, bornTimestamp1628949643576, bornHost/192.168.0.103:55518, storeTimestamp1628949883582, storeHost/192.168.0.103:10911, msgIdC0A8006700002A9F000000000002B706, commitLogOffset177926, bodyCRC1039275407, reconsumeTimes0, preparedTransactionOffset0, toString()Message{topic‘topic_name_delay’, flag0, properties{MIN_OFFSET0, REAL_TOPICtopic_name_delay, MAX_OFFSET23, CONSUME_START_TIME1628949883586, UNIQ_KEYC0A8006748D07C53A9EB47ABD5380008, CLUSTERDefaultCluster, WAITtrue, DELAY8, REAL_QID0}, body[107, 101, 121, 61, 56], transactionId‘null’}]key8同样我们在控制台可以看到存放的消息
http://www.dnsts.com.cn/news/179721.html

相关文章:

  • 梧州网站制作网站为什么被挂马
  • 广州建网站哪儿济南兴田德润简介网站上的专题 怎么设计
  • html 5电影网站源码自己搭建公网ip服务器
  • 鞋子网站建设策划书如何给网站备案
  • 没有备案的网站 推广2022年企业所得税最新标准
  • wordpress正在执行例行维护_请一分钟后回来.上海网站seo策划
  • php网站建设的公司潍坊注册公司流程和费用标准
  • 网站建设公司正规吗如何创建网站平台的详细步骤
  • 建手机网站款软件吉林省建设部网站
  • 东莞网站优化如何北京手机软件开发
  • 浦北网站建设凡科网站设计
  • 百度网站怎么做的云南app制作
  • 网站建设的常用软件有哪些退役厅门户网站建设中标公告
  • 网站建设实现后台数据导出excel番禺市桥网站建设公司
  • 个人网站建设法律规定数字营销经典案例
  • 无锡网站建设维护建设网上银行官方网站
  • 台州市建站公司怎么样自己做网站接订单
  • 企业网站设计开发服务垂直型电商网站如何做
  • 站长工具2023最新国产wordpress 维基插件
  • 公司网站建设工作好品牌设计公司
  • 花生壳软件做的网站泉州网站排名优化
  • 有没有专业做电视测评的网站舟山市建设局网站
  • 昆山住房和城乡建设部网站网络设计的三个层次
  • 报纸网站建设防盗网站人做清洁
  • 本地网站建设软件使用wordpress函数
  • wordpress删除模板文件快速seo软件
  • 网站设计要如何做支付功能怎样设置个人网站
  • 百度安全网站检测自建外贸网站
  • 网站这么做优化英文网站建设成都
  • jsp项目个人网站开发做网站网站的