当前位置: 首页 > news >正文

视频网站制作教程视频网页设计页面设计主要技术

视频网站制作教程视频,网页设计页面设计主要技术,视频制作教学,wordpress显示用户列表文章目录 顺序消息应用场景消息组#xff08;MessageGroup#xff09;顺序性生产的顺序性MQ 存储的顺序性消费的顺序性 rocketmq-client-java 示例#xff08;gRPC 协议#xff09;1. 创建 FIFO 主题生产者代码消费者代码解决办法解决后执行结果 rocketmq-client 示例… 文章目录 顺序消息应用场景消息组MessageGroup顺序性生产的顺序性MQ 存储的顺序性消费的顺序性 rocketmq-client-java 示例gRPC 协议1. 创建 FIFO 主题生产者代码消费者代码解决办法解决后执行结果 rocketmq-client 示例Remoting 协议生产者MessageQueueSelector 详解 消费者 顺序消息应用场景 在有序事件处理、撮合交易、数据实时增量同步等场景下异构系统间需要维持强一致的状态同步上游的事件变更需要按照顺序传递到下游进行处理。在这类场景下使用 RocketMQ 的顺序消息可以有效保证数据传输的顺序性。比如同一个用户的操作一定是先生成订单、再进行支付、扣减库存、生成物流信息等。 消息组MessageGroup RocketMQ 顺序消息的顺序关系通过消息组MessageGroup判定和识别发送顺序消息时需要为每条消息设置归属的消息组相同消息组的多条消息之间遵循先进先出的顺序关系不同消息组、无消息组的消息之间不涉及顺序性。 基于消息组的顺序判定逻辑支持按照业务逻辑做细粒度拆分可以在满足业务局部顺序的前提下提高系统的并行度和吞吐能力。 顺序性 RocketMQ 的消息的顺序性分为两部分生产顺序性和消费顺序性。 生产的顺序性 生产的顺序性就是必须保证每个消息在生成时是顺序的且顺序的发送到 MQ 服务器。要保证生产的顺序需要满足以下条件 单一生产者消息生产的顺序性仅支持单一生产者不同生产者分布在不同的系统即使设置相同的消息组不同生产者之间产生的消息也无法判定其先后顺序。串行发送Apache RocketMQ 生产者客户端支持多线程安全访问但如果生产者使用多线程并行发送则不同线程间产生的消息将无法判定其先后顺序。 MQ 存储的顺序性 MQ 按顺序收到消息后会保证设置了同一消息组的消息按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下 相同消息组的消息按照先后顺序被存储在同一个队列。不同消息组的消息可以混合在同一个队列中且不保证连续。 消费的顺序性 消费的顺序性是消费者在消费的时候要严格按照 MQ 中的存储顺序来执行。 消费者保证执行的顺序 PushConsumer 类型消费者RocketMQ 会保证消息按照存储顺序一条一条投递给消费者SimpleConsumer 类型消费者需要业务实现方自行保证消费的顺序。消费消息时需要严格按照接收—处理—应答的语义处理消息避免因异步处理导致消息乱序。 重试策略 Apache RocketMQ 顺序消息投递仅在重试次数限定范围内即一条消息如果一直重试失败超过最大重试次数后将不再重试跳过这条消息消费不会一直阻塞后续消息处理。 所以对于需要严格保证消费顺序的场景请务设置合理的重试次数避免参数不合理导致消息乱序。 rocketmq-client-java 示例gRPC 协议 1. 创建 FIFO 主题 本示例我们模拟多个用户的一系列操作并多个消息组区分不同的顺序消息。要求每个用户的消息按顺序执行不同用户的消息之间不做必要关联。 $ ./mqadmin updatetopic -n localhost:9876 -c DefaultCluster -t MY_FIFO_TOPIC -o true -a message.typeFIFO注意这里比普通消息和顺序消息多了一个 -o 参数表示 order 的意思。 生产者代码 import com.yyoo.mq.rocket.MyMQProperties; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt;import java.io.IOException;public class FifoProducerDemo {public static void main(String[] args) throws ClientException, IOException {// 用于提供生产者、消费者、消息对应的构建类 BuilderClientServiceProvider provider ClientServiceProvider.loadService();// 构建配置类包含端点位置、认证以及连接超时等的配置ClientConfiguration configuration ClientConfiguration.newBuilder()// endpoints 即为 proxy 的地址多个用分号隔开。如xxx:8081;xxx:8081.setEndpoints(MyMQProperties.ENDPOINTS).build();// 构建生产者Producer producer provider.newProducerBuilder()// Topics 列表生产者和主题是多对多的关系同一个生产者可以向多个主题发送消息.setTopics(MY_FIFO_TOPIC).setClientConfiguration(configuration)// 构建生产者此方法会抛出 ClientException 异常.build();for(int i 1; i 10;i) {String msgGroup user ; // 表示有两个用户String keys key_ i;// 构建消息类Message message provider.newMessageBuilder()// 设置消息发送到的主题.setTopic(MY_FIFO_TOPIC)// 设置消息索引键可根据关键字精确查找某条消息。其一般为业务上的唯一值。如订单id.setKeys(keys)// 设置消息Tag表示为创建订单.setTag(ORDER_CREATE)// 设置消息组.setMessageGroup(msgGroup)// 消息体单条消息的传输负载不宜过大。所以此处的字节大小最好有个限制.setBody(({\success\:true,\msg\:\ msgGroup keys \}).getBytes()).build();// 发送消息此处最好进行异常处理对消息的状态进行一个记录try {SendReceipt sendReceipt producer.send(message);System.out.println(keys);System.out.println(Send message successfully, messageId sendReceipt.getMessageId());} catch (ClientException e) {System.out.println(Failed to send message);}}// 发送完关闭生产者// producer.close();}} 发送顺序消息时消息一定要设置消息组同一消息组的消息将会按服务器接收的顺序进行消费。 注发送顺序消息前需要设置 NameServer 中的配置 orderMessageEnable 和 returnOrderTopicConfigToBroker 为 true。特别是 orderMessageEnable 默认为 false。建议在启动 namesrv 的时候使用自定义配置在自定义配置中配置选项为true即可。 # namesrv.conf 为我们自定义的配置文件 nohup sh bin/mqnamesrv -c conf/namesrv.conf 消费者代码 import com.yyoo.mq.rocket.MyMQProperties; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer;import java.nio.ByteBuffer; import java.util.Collections;public class FifoConsumerDemo {public static void main(String[] args) throws ClientException {// 用于提供生产者、消费者、消息对应的构建类 BuilderClientServiceProvider provider ClientServiceProvider.loadService();// 构建配置类包含端点位置、认证以及连接超时等的配置ClientConfiguration configuration ClientConfiguration.newBuilder()// endpoints 即为 proxy 的地址多个用分号隔开。如xxx:8081;xxx:8081.setEndpoints(MyMQProperties.ENDPOINTS).build();// 设置过滤条件这里为使用 tag 进行过滤String tag ORDER_CREATE;FilterExpression filterExpression new FilterExpression(tag, FilterExpressionType.TAG);// 构建消费者PushConsumer pushConsumer provider.newPushConsumerBuilder().setClientConfiguration(configuration)// 设置消费者分组.setConsumerGroup(MY_FIFO_GROUP)// 设置主题与消费者之间的订阅关系.setSubscriptionExpressions(Collections.singletonMap(MY_FIFO_TOPIC, filterExpression)).setMessageListener(messageView - {System.out.println(messageView);ByteBuffer rs messageView.getBody();byte[] rsByte new byte[rs.limit()];rs.get(rsByte);System.out.println(Message body new String(rsByte));// 处理消息并返回消费结果。System.out.println(Consume message successfully, messageId messageView.getMessageId());return ConsumeResult.SUCCESS;}).build();System.out.println(pushConsumer);// 如果不需要再使用 PushConsumer可关闭该实例。// pushConsumer.close();}} 注多验证几次后会发现消费执行并没有严格的按照顺序执行查找源码后发现PushConsumer 的 builder 在构建 PushConsumer 的时候有个 Settings 对象该对象的主题配置信息是从服务器获取获取后有一个 isFifo 参数此参数对应是否顺序消费但是目前此值一直为false。此问题为消费者分组的问题Remoting 协议方式无此问题因为两种 Client 的实现是不一样的。 解决办法 在 MQ bin目录执行如下命令即可具体的相关说明我们将在后续章节中《RocketMQ 消费者分类与分组》详细说明。 $ ./mqadmin updateSubGroup -n 127.0.0.1:9876 -g MY_FIFO_GROUP -o true -c DefaultCluster解决后执行结果 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000001, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543268, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_2], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000000, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543178, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_1], messageGroupuser2, deliveryTimestampnull, properties{__SHARDINGKEYuser2}} Message body{success:true,msg:user1key_2} Message body{success:true,msg:user2key_1} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000000 Consume message successfully, messageId010456E5ECA6F32F6C051313D700000001 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000002, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543279, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_3], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} Message body{success:true,msg:user1key_3} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000002 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000004, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543294, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_5], messageGroupuser2, deliveryTimestampnull, properties{__SHARDINGKEYuser2}} Message body{success:true,msg:user2key_5} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000004 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000003, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543288, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_4], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000005, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543301, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_6], messageGroupuser2, deliveryTimestampnull, properties{__SHARDINGKEYuser2}} Message body{success:true,msg:user1key_4} Message body{success:true,msg:user2key_6} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000005 Consume message successfully, messageId010456E5ECA6F32F6C051313D700000003 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000006, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543313, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_7], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} Message body{success:true,msg:user1key_7} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000006 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000007, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543320, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_8], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} Message body{success:true,msg:user1key_8} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000007 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000008, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543331, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_9], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} Message body{success:true,msg:user1key_9} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000008 MessageViewImpl{messageId010456E5ECA6F32F6C051313D700000009, topicMY_FIFO_TOPIC, bornHostDESKTOP-S1DMOAD, bornTimestamp1694595543340, endpointsipv4:192.168.1.1:8081, deliveryAttempt1, tagORDER_CREATE, keys[key_10], messageGroupuser1, deliveryTimestampnull, properties{__SHARDINGKEYuser1}} Message body{success:true,msg:user1key_10} Consume message successfully, messageId010456E5ECA6F32F6C051313D700000009注意user1 和 user2 的操作顺序是一致的。因为我们不需要保证 user1 的操作必须在 user2 之前只需要保证他们各自的操作为顺序的就可以。 rocketmq-client 示例Remoting 协议 生产者 import com.yyoo.mq.rocket.MyMQProperties; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.shaded.commons.lang3.RandomUtils;import java.util.List;public class FifoProducerDemo {/*** 生产者分组*/private static final String PRODUCER_GROUP FIFO_PRODUCER_GROUP;/*** 主题*/private static final String TOPIC MY_FIFO_TOPIC;public static void main(String[] args) throws MQClientException {/** 创建生产者并使用生产者分组初始化*/DefaultMQProducer producer new DefaultMQProducer(PRODUCER_GROUP);/** NamesrvAddr 的地址多个用分号隔开。如xxx:9876;xxx:9876*/producer.setNamesrvAddr(MyMQProperties.NAMESRV_ADDR);/** 发送消息超时时间默认即为 3000*/producer.setSendMsgTimeout(3000);/** 启动生产者此方法抛出 MQClientException*/producer.start();/** 发送消息*/for (int i 1; i 10; i) {try {Message msg new Message();msg.setTopic(TOPIC);// 设置消息索引键可根据关键字精确查找某条消息。msg.setKeys(messageKey);// 设置消息Tag用于消费端根据指定Tag过滤消息。msg.setTags(ORDER_CREATE);// 设置消息体msg.setBody((顺序消息 i).getBytes());// 这里 userId 取值为 1,2,3模拟有3个用户的顺序操作int userId RandomUtils.nextInt(1,4);SendResult sendResult producer.send(msg, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {// 这个arg就是对应userIdInteger userId (Integer)arg;// 我们按队列的数量对每个user进行分组int index userId % mqs.size();// 同一个user的消息放入同一个队列return mqs.get(index);}},userId);System.out.printf(%s%n, sendResult);} catch (Exception e) {e.printStackTrace();System.out.println(消息发送失败i i);}}// 如果生产者不再使用则调用关闭// 异步发送消息注意异步发送消息建议此处不关闭或者在sleep一段时间后再关闭// 因为异步 SendCallback 执行的时候shutdow可能已经执行了生产者被关闭了// producer.shutdown();}} MessageQueueSelector 详解 public interface MessageQueueSelector {MessageQueue select(final ListMessageQueue mqs, final Message msg, final Object arg); }mqs队列列表我们前面说了默认 8 个队列 msg当前消息 arg为我们 send 方法传的第三个参数示例中就是 userId MessageQueueSelector 意为队列选择器Remoting 协议客户端中没有 消息组的概念所以需要我们手动的为消息进行分组将需要严格顺序的消息放在同一个队列这个接口就是完成此任务的而且分组的逻辑需要我们自己实现。实际应用中我们可以使用 用户id、订单id等来为顺序消息分组。 消费者 import com.yyoo.mq.rocket.MyMQProperties; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class FifoConsumerDemo {public static void main(String[] args) throws MQClientException {// 初始化 consumerDefaultMQPushConsumer consumer new DefaultMQPushConsumer(REMOTING_FIFO_CONSUMER_GROUP);// 设置 namesrv 地址consumer.setNamesrvAddr(MyMQProperties.NAMESRV_ADDR);// 设置从开头开始读取消息consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 设置订阅的主题以及过滤tagconsumer.subscribe(MY_FIFO_TOPIC, ORDER_CREATE || TagA || TagD || messageTag);consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {System.out.printf(%s Receive New Messages: %s %n, Thread.currentThread().getName(), msgs);for(MessageExt msg : msgs){System.out.println(new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf(Consumer Started.%n);}} 注意顺序消息消费者的监听类型为 MessageListenerOrderly 注意与我们前面的示例 MessageListenerConcurrently 进行区分。
http://www.dnsts.com.cn/news/128344.html

相关文章:

  • h5游戏网站开发怎么把wordpress字去掉
  • 电商资源网站百度打广告怎么收费
  • php带数据库的网站wordpress 飘窗
  • 怎么建立一个网站搜关键词会跳出沈阳哪里做网站
  • 免费行情软件网站下载安装企业网站改版
  • 大型网站后台登录地址一般是如何设置的wordpress添加小说板块
  • 百度权重10的网站大连微网站建设
  • 高明网站设计哪家服务好做网站基本东西
  • 网站标签的作用淄博中企动力
  • 襄阳市建设局网站个人网页设计模板图片手机版
  • 设计网站国外合肥房产网官网首页
  • 学生管理系统网站开福区城乡建设局网站
  • 与狗做网站合肥
  • 什么是微网站系统出入成都最新通知今天
  • 租一个国外的服务器 建设网站手机 网站 导航菜单 代码
  • 商务网站建设考试无锡军自考网站建设
  • 网红营销网站做公众号还是网站
  • 电影订票网站开发中山楼市最新消息
  • 上海定制建站网站建设山东住建部和城乡建设官网
  • 专业网站建设哪家好网站或站点的第一个网页
  • 手机网站外链佛山骏域网站建设
  • 企业网站需要注意什么陕西省建设总工会网站
  • wordpress成品网站云部落旅游网网站建设目的
  • 农业信息门户网站建设方案wordpress custom fields
  • 中国建设银行官网站e路通下载专做写字楼出租的网站
  • 老薛主机wordpress模板惠州网站seo收费
  • 上海集团网站建设咨询肥西县建设局官方网站
  • 政务网站建设工作计划结尾企业网站建设目标
  • 广东专业网站建设公司网站改版应该怎么做
  • 母了猜猜看游戏做网站南昌市住房和城乡建设网站