当前位置: 首页 > news >正文

商城网站建设服务cms开源框架

商城网站建设服务,cms开源框架,wordpress增加管理员,去掉wordpress头像2.1 消息生产和消费介绍使用RocketMQ可以发送普通消息、顺序消息、事务消息#xff0c;顺序消息能实现有序消费#xff0c;事务消息可以解决分布式事务实现数据最终一致。RocketMQ有2种常见的消费模式,分别是DefaultMQPushConsumer和DefaultMQPullConsumer模式#xff0c;这…2.1 消息生产和消费介绍使用RocketMQ可以发送普通消息、顺序消息、事务消息顺序消息能实现有序消费事务消息可以解决分布式事务实现数据最终一致。RocketMQ有2种常见的消费模式,分别是DefaultMQPushConsumer和DefaultMQPullConsumer模式这2种模式字面理解一个是推送消息一个是拉取消息。这里有个误区其实无论是Push还是Pull其本质都是拉取消息只是实现机制不一样。DefaultMQPushConsumer其实并不是broker主动向consumer推送消息而是consumer向broker发出请求保持了一种长链接broker会每5秒会检测一次是否有消息如果有消息则将消息推送给consumer。使用DefaultMQPushConsumer实现消息消费broker会主动记录消息消费的偏移量。DefaultMQPullConsumer是消费方主动去broker拉取数据一般会在本地使用定时任务实现使用它获得消息状态方便、负载均衡性能可控 但消息的及时性差,而且需要手动记录消息消费的偏移量信息 所以在工作中多数情况推荐使用Push模式。RocketMQ发送的消息默认会存储到4个队列中当然创建几个队列存储数据可以自己定义。在这里插入图片描述RocketMQ作为MQ消息中间件ack机制必不可少在RocketMQ中常见的应答状态如下LocalTransactionState:主要针对事务消息的应答状态public enum LocalTransactionState { COMMIT_MESSAGE,//消息提交 ROLLBACK_MESSAGE,//消息回滚 UNKNOW, //未知状态一般用于处理超时等现象}ConsumeConcurrentlyStatus:主要针对消息消费的应答状态public enum ConsumeConcurrentlyStatus { //消息消费成功 CONSUME_SUCCESS, //消息重试一般消息消费失败后RocketMQ为了保证数据的可靠性具有重试机制 RECONSUME_LATER;}重发时间是:(broker.log中有)messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h2.2 RocketMQ普通消息生产者2.2.1 工程创建我们先实现一个最基本的消息发送先创建一个springboot工程工程名字叫rocketmq-demo1pom.xml?xml version1.0 encodingUTF-8?project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instance xsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 target_blankhttp://maven.apache.org/xsd/maven-4.0.0.xsd modelVersion4.0.0/modelVersion parent groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-parent/artifactId version2.0.5.RELEASE/version /parent groupIdorg.mentu/groupId artifactIdrocketmq-demo1/artifactId version1.0-SNAPSHOT/version packagingjar/packaging properties project.build.sourceEncodingUTF-8/project.build.sourceEncoding java.version1.8/java.version rocketmq.version4.4.0/rocketmq.version /properties dependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-actuator/artifactId /dependency dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-test/artifactId scopetest/scope /dependency dependency groupIdorg.apache.rocketmq/groupId artifactIdrocketmq-client/artifactId version${rocketmq.version}/version /dependency /dependencies/project2.2.2 消息发送消息发送有这么几个步骤创建DefaultMQProducer设置Namesrv地址开启DefaultMQProducer创建消息Message发送消息关闭DefaultMQProducer我们创建一个Producer类按照上面步骤实现消息发送代码如下在这里插入图片描述public class Producer { //指定namesrv地址 private static String NAMESRV_ADDRESS 192.168.211.143:9876; public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { //创建一个DefaultMQProducer,需要指定消息发送组 DefaultMQProducer producer new DefaultMQProducer(Test_Quick_Producer_Name); //指定Namesvr地址 producer.setNamesrvAddr(NAMESRV_ADDRESS); //启动Producer producer.start(); //创建消息 Message message new Message( Test_Quick_Topic, //主题 TagA, //标签可以用来做过滤 KeyA, //唯一标识可以用来查找消息 hello rocketmq.getBytes() //要发送的消息字节数组 ); //发送消息 SendResult result producer.send(message); //关闭producer producer.shutdown(); }}我们可以在控制台查看到对应的消息,控制台地址http://localhost:8080/#/message 我们可以看到如下消息在这里插入图片描述注意这里时间查询以消息存储时间为准注意服务器的时间有可能不准确。2.3 RocketMQ普通消息消费者2.3.1 消息消费消费者消费消息有这么几个步骤创建DefaultMQPushConsumer设置namesrv地址设置subscribe这里是要读取的主题信息创建消息监听MessageListener获取消息信息返回消息读取状态创建Consumer类按照上面步骤实现消息消费代码如下在这里插入图片描述上图代码如下public class Consumer { //指定namesrv地址 private static String NAMESRV_ADDRESS 192.168.211.143:9876; public static void main(String[] args) throws MQClientException { //创建DefaultMQPushConsumer DefaultMQPushConsumer consumer new DefaultMQPushConsumer(Test_Quick_Consumer_Name); //设置namesrv地址 consumer.setNamesrvAddr(NAMESRV_ADDRESS); //设置要读取的topic consumer.subscribe( Test_Quick_Topic, //指定要读取的消息主题 TagA); //指定要读取的消息过滤信息,多个标签数据则可以输入tag1 || tag2 || tag3 //创建消息监听 consumer.setMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { try { //获取第1个消息 MessageExt message msgs.get(0); //获取主题 String topic message.getTopic(); //获取标签 String tags message.getTags(); //获取消息 String result new String(message.getBody(),UTF-8); System.out.println(topic:topic,tags:tags,result:result); } catch (UnsupportedEncodingException e) { e.printStackTrace(); //消息重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } //消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费监听 consumer.start(); }}控制台运行结果topic:Test_Quick_Topic,tags:TagA,result:hello rocketmq2.4 RocketMQ顺序消息消息有序指的是可以按照消息的发送顺序来消费。 RocketMQ可以严格的保证消息有序。但这个顺序不是全局顺序只是分区queue顺序。要全局顺序只能一个分区。如何保证顺序在MQ的模型中顺序需要由3个阶段去保障消息被发送时保持顺序消息被存储时保持和发送的顺序一致消息被消费时保持和存储的顺序一致发送时保持顺序意味着对于有顺序要求的消息用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。在这里插入图片描述2.4.1 消息生产者我们创建一个消息生产者OrderProducer,这里每次发消息都会发到同一个队列中代码如下在这里插入图片描述上图代码如下public class OrderProducer { //nameserver地址 private static String namesrvaddress192.168.211.143:9876;; public static void main(String[] args) throws UnsupportedEncodingException, InterruptedException, RemotingException, MQClientException, MQBrokerException { //创建DefaultMQProducer DefaultMQProducer producer new DefaultMQProducer(order_producer_group_name); //设置namesrv地址 producer.setNamesrvAddr(namesrvaddress); //启动Producer producer.start(); //创建消息 Message message new Message( Topic_Order_Demo, TagOrder, KeyOrder, hello order message!.getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送消息 SendResult result producer.send( message, //要发送的消息 new MessageQueueSelector() { Override public MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) { return mqs.get((Integer) arg); } }, 1);//设置存入第几个队列中,这里是下标从0开始 //关闭Producer producer.shutdown(); }}2.4.2 消息消费者创建一个消息消费者OrderConsumer,消息监听用MessageListenerOrderly来实现顺序消息代码如下在这里插入图片描述上图代码如下public class OrderConsumer { //nameserver地址 private static String namesrvaddress192.168.211.143:9876;; public static void main(String[] args) throws MQClientException { //创建消息消费对象DefaultMQConsumer DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_consumer_group_name); //设置nameserver地址 consumer.setNamesrvAddr(namesrvaddress); //设置消费顺序 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置消息拉取最大数 consumer.setConsumeMessageBatchMaxSize(5); //设置消费主题 consumer.subscribe(Topic_Order_Demo,TagOrder); //消息监听 consumer.setMessageListener(new MessageListenerOrderly() { Override public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) { try { for (MessageExt msg : msgs) { String topic msg.getTopic(); String tags msg.getTags(); String keys msg.getKeys(); String body new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println(topic:topic,tags:tags,keys:keys,body:body); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); //启动Consumer consumer.start(); }}我们打开控制台可以看到消息发送到了第2个队列中了。在这里插入图片描述提示大家在测试顺序消息的时候可以将上面消息生产者连续发送10个或者更多来测试顺序。2.5 RocketMQ事务消息在RocketMQ4.3.0版本后开放了事务消息这一特性对于分布式事务而言最常说的还是二阶段提交协议。2.5.1 RocketMQ事务消息流程RocketMQ的事务消息主要是通过消息的异步处理可以保证本地事务和消息发送同时成功执行或失败从而保证数据的最终一致性这里我们先看看一条事务消息从诞生到结束的整个时间线流程在这里插入图片描述事务消息的成功投递是需要经历三个Topic的分别是Half Topic用于记录所有的prepare消息Op Half Topic记录已经提交了状态的prepare消息Real Topic事务消息真正的Topic,在Commit后会才会将消息写入该Topic从而进行消息的投递2.5.2 事务消息生产者我们创建一个事务消息生产者TransactionProducer,事务消息发送消息对象是TransactionMQProducer为了实现本地事务操作和回查我们需要创建一个监听器监听器需要实现TransactionListener接口实现代码如下监听器TransactionListenerImpl代码如下在这里插入图片描述上图代码如下public class TransactionListenerImpl implements TransactionListener { //存储当前线程对应的事务状态 private ConcurrentHashMapString, Integer localTrans new ConcurrentHashMap(); /*** * 发送prepare消息成功后回调该方法用于执行本地事务 * param msg:回传的消息利用transactionId即可获取到该消息的唯一Id * param arg:调用send方法时传递的参数当send时候若有额外的参数可以传递到send方法中这里能获取到 * return */ Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //获取线程ID String transactionId msg.getTransactionId(); //初始状态为0 localTrans.put(transactionId,0); try { //此处执行本地事务操作 System.out.println(....执行本地事务); Thread.sleep(70000); System.out.println(....执行完成本地事务); } catch (InterruptedException e) { e.printStackTrace(); //发生异常则回滚消息 localTrans.put(transactionId,2); return LocalTransactionState.UNKNOW; } //修改状态 localTrans.put(transactionId,1); System.out.println(executeLocalTransaction------状态为1); //本地事务操作如果成功了则提交该消息让该消息可见 return LocalTransactionState.UNKNOW; } /*** * 消息回查 * param msg * return */ Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //获取事务id String transactionId msg.getTransactionId(); //通过事务id获取对应的本地事务执行状态 Integer status localTrans.get(transactionId); System.out.println(消息回查-----status); switch (status){ case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.UNKNOW; }}创建消息发送对象TransactionProducer,代码如下在这里插入图片描述上图代码如下public class TransactionProducer { //nameserver地址 private static String namesrvaddress192.168.211.143:9876;; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, InterruptedException { //创建事务消息发送对象 TransactionMQProducer producer new TransactionMQProducer(transaction_producer_group_name); //设置namesrv地址 producer.setNamesrvAddr(namesrvaddress); //创建监听器 TransactionListener transactionListener new TransactionListenerImpl(); //创建线程池 ExecutorService executorService new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueueRunnable( 2000), new ThreadFactory() { Override public Thread newThread(Runnable runnable) { Thread thread new Thread(runnable); thread.setName(client-transaction-msg-check-thread); return thread; } } ); //设置线程池 producer.setExecutorService(executorService); //设置监听器 producer.setTransactionListener(transactionListener); //启动producer producer.start(); //创建消息 Message message new Message( TopicTxt_Demo, TagTx, KeyTx1, hello.getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送事务消息,此时消息不可见 TransactionSendResult transactionSendResult producer.sendMessageInTransaction(message, 发送消息回传所需数据); System.out.println(transactionSendResult); //休眠 Thread.sleep(120000); //关闭 producer.shutdown(); }}2.5.3 事务消息事务消息的消费者和普通消费者一样这里我们就不做介绍了直接贴代码public class TransactionConsumer { //nameserver地址 private static String namesrvaddress192.168.211.143:9876;; public static void main(String[] args) throws MQClientException { //创建DefaultMQPushConsumer DefaultMQPushConsumer consumer new DefaultMQPushConsumer(transaction_consumer_group_name); //设置nameserver地址 consumer.setNamesrvAddr(namesrvaddress); //设置每次拉去的消息个数 consumer.setConsumeMessageBatchMaxSize(5); //设置消费顺序 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //设置监听的消息 consumer.subscribe(TopicTxt_Demo,TagTx); //消息监听 consumer.registerMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String topic msg.getTopic(); String tags msg.getTags(); String keys msg.getKeys(); String body new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println(topic:topic,tags:tags,keys:keys,body:body); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费 consumer.start(); }}事务消息参考地址http://rocketmq.apache.org/docs/transaction-example/2.5.4 RocketMQ实现分布式事务流程MQ事务消息解决分布式事务问题但第三方MQ支持事务消息的中间件不多比如RocketMQ他们支持事务消息的方式也是类似于采用的二阶段提交但是市面上一些主流的MQ都是不支持事务消息的比如 RabbitMQ 和 Kafka 都不支持。以阿里的 RocketMQ 中间件为例其思路大致为第一阶段Prepared消息会拿到消息的地址。第二阶段执行本地事务第三阶段通过第一阶段拿到的地址去访问消息并修改状态。也就是说在业务方法内要想消息队列提交两次请求一次发送消息和一次确认消息。如果确认消息发送失败了RocketMQ会定期扫描消息集群中的事务消息这时候发现了Prepared消息它会向消息发送者确认所以生产方需要实现一个check接口RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。在这里插入图片描述2.6 消息广播/批量发送上面发送消息我们测试的时候可以发现消息只有一个消费者能收到如果我们想实现消息广播让每个消费者都能收到消息也是可以实现的。而且上面发送消息的时候每次都是发送单条Message对象能否批量发送呢答案是可以的。2.6.1 消息生产者创建消息生产者BroadcastingProducer,代码如下在这里插入图片描述上图代码如下public class BroadcastingProducer { //nameserver地址 private static String namesrvaddress192.168.211.143:9876;; public static void main(String[] args) throws UnsupportedEncodingException, MQClientException, RemotingException, InterruptedException, MQBrokerException { //创建DefaultMQProducer DefaultMQProducer producer new DefaultMQProducer(broadcasting_producer_group); //指定nameserver地址 producer.setNamesrvAddr(namesrvaddress); //启动 producer.start(); //创建消息 ListMessage messages new ArrayListMessage(); for (int i 0; i 20 ; i) { Message message new Message( Topic_broadcasting, TagBroad, KeyBroadi, (i--hello brodcasting).getBytes(RemotingHelper.DEFAULT_CHARSET)); //将消息添加到集合中 messages.add(message); } //批量发送消息 producer.send(messages); //关闭 producer.shutdown(); }}2.6.2 消息消费者广播消费模式其实就是每个消费者都能读取到消息我们这里只需要将消费者的消费模式设置成广播模式即可。consumer.setMessageModel(MessageModel.BROADCASTING);,代码如下在这里插入图片描述上图代码如下public class BroadcastingConsumerDemo1 { //广播模式 private static String namesrvaddress192.168.211.143:9876;; public static void main(String[] args) throws MQClientException { //创建DefaultMQConsumer DefaultMQPushConsumer consumer new DefaultMQPushConsumer(broadcasting_consumer_group); //指定nameserver地址 consumer.setNamesrvAddr(namesrvaddress); //指定要消费的消息主体 consumer.subscribe(Topic_broadcasting,*); //指定消费顺序 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //指定一次拉取条数 consumer.setConsumeMessageBatchMaxSize(2); //指定消费模式 集群模式/广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //创建监听监听消息 consumer.setMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { String topic msg.getTopic(); String tags msg.getTags(); String keys msg.getKeys(); String body new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET); System.out.println(demo1 topic:topic,tags:tags,keys:keys,body:body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动 consumer.start(); }}运行测试可以发现每个节点都消费了20条消息。效果如下图在这里插入图片描述学习https://blog.csdn.net/Same_Liu/article/details/89517571?spm1001.2014.3001.5506
http://www.dnsts.com.cn/news/213700.html

相关文章:

  • 深圳自适应网站公司在哪里可以学习做网站
  • 网站 别名免费网上教学平台
  • 旅游网站源码下载影视广告公司网页设计
  • 广州网站公司建设移动端网站模板
  • 网站建设需要ui吗富阳市网站
  • 长沙 建站优化中信建设有限责任公司属于央企吗
  • 河南省住房和建设厅网站地产网站建设案例
  • 深圳规划建设局网站茂名网站开发公司推荐
  • asp源代码网站南阳做网站公司哪家好
  • 北京网站建设公司排行企业所得税只对企业征收吗
  • 曼斯特(北京)网站建设公司在线表白网页制作
  • 网站版面设计说明wordpress 禁止注册
  • 国外免费建站好看的学校网站首页
  • 望牛墩东莞网站建设域名访问升级紧急中拿笔记好
  • 做服务的网站吗百度的首页
  • 宝安有效的网站制作专做hip hop音乐的网站
  • 网站优化方案教程呼和浩特网站优化公司
  • 怎样做网站的子网南通手机建站模板
  • 网络游戏制作上海网站seo牛巨微
  • 六安网约车收入怎么样seo如何优化的
  • 站长工具是做什么的798艺术区
  • 商务网站推广技巧包括什么wordpress主动提交
  • ae模板下载网站wordpress分配管理员
  • 网站建设政务新媒体php 网站开发架构
  • 甘肃住房和城乡建设部网站网络公关公司危机公关
  • 网站优化成功案例上海百度关键词推广
  • 任丘网站优化WordPress指定IP访问
  • wordpress多站点是什么意思域名是什么样子
  • 邮箱网站架构域名注册方法
  • 带搜索网站建设视频教程封面上的网站怎么做的