当前位置: 首页 > news >正文

同城型网站开发网站建设与制作设计公司

同城型网站开发,网站建设与制作设计公司,公司网站欢迎语,三五互联网站管理登录网址文章目录 #x1f343;前言#x1f334;扫描线程的实现#x1f332;实现消费消息#x1f333;实现addConsumer()方法#x1f38b;VirtualHost类订阅消息的完善⭕总结 #x1f343;前言 本次开发目标 实现消费消息的核心逻辑 #x1f334;扫描线程的实现 我们先给Cons… 文章目录 前言扫描线程的实现实现消费消息实现addConsumer()方法VirtualHost类订阅消息的完善⭕总结 前言 本次开发目标 实现消费消息的核心逻辑 扫描线程的实现 我们先给ConsumerManager类注入一些基础的属性 我们定义一个构造方法里面就是我们的扫描线程确保我们程序启动时就可以一直对我们的阻塞队列进行扫描。 传入的对象为我们前面所构造的 虚拟机类的对象我们只需要在VirtualHost 类里面添加实例化并传参即可 扫描线程逻辑如下 阻塞队列里元素就取出取出来的元素我们称它为令牌根据该令牌我们可以找到需要被消费消息的队列若该队列存在我们便可以调用相应的消费方法进行消费 我们还需要将该线程设为后台线程 为了线程安全必要的地方我们进行加锁操作。 并让这个线程永远的扫描下去代码实现如下 public ConsumerManager(VirtualHost p) {parent p;scannerThread new Thread(() - {while (true) {try {// 1. 拿到令牌String queueName tokenQueue.take();// 2. 根据令牌, 找到队列MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 取令牌后发现, 该队列名不存在! queueName queueName);}// 3. 从这个队列中消费一个消息.synchronized (queue) {consumeMessage(queue);}} catch (InterruptedException | MqException e) {e.printStackTrace();}}});// 把线程设为后台线程.scannerThread.setDaemon(true);scannerThread.start(); }实现消费消息 扫描后就需要交给线程池进行负责执行回调函数消费消息 流程如下 按照轮询的方式找出消费者若消费者存在则从队列中取出一个消息若该队列中有消息我们则交给线程池来执行回调函数消费消息 在线程池内我们需要做的操作有 把消息放到待确认的集合中. 这个操作势必在执行回调之前.执行回调操作若出现问题则不会执行后续应答操作若正常执行我们则需要进行应答应答又分为自动应答与手动应答如果当前是 “自动应答” , 就可以直接把消息删除了。如果当前是 “手动应答” , 则先不处理, 交给后续消费者调用VirtualHost类 basicAck 方法来处理. 我们先来看一下自动应答的情况 删除消息时我们需要删除三处的存储硬盘、待确认集合、内存中心的消息 需要注意的是在删除硬盘上的数据时我们需要查看该消息是否要持久化若需要持久化则不要删除. 实现代码如下 private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 找个消费者出来.ConsumerEnv luckyDog queue.chooseConsumer();if (luckyDog null) {// 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说.return;}// 2. 从队列中取出一个消息Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if (message null) {// 当前队列中还没有消息, 也不需要消费.return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.workerPool.submit(() - {try {// 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前.parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(),message.getBody());// 3. 如果当前是 自动应答 , 就可以直接把消息删除了.// 如果当前是 手动应答 , 则先不处理, 交给后续消费者调用 basicAck 方法来处理.if (luckyDog.isAutoAck()) {// 1) 删除硬盘上的消息if (message.getDeliverMode() 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// 2) 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// 3) 删除内存中消息中心里的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager] 消息被成功消费! queueName queue.getName());}} catch (Exception e) {e.printStackTrace();}}); }关于手动应答的实现步骤我们分为以下四步 获取到消息和队列删除硬盘上的数据删除消息中心中的数据删除待确认的集合中的数据 代码实现如下 public boolean basicAck(String queueName, String messageId) {queueName virtualHostName queueName;try {// 1. 获取到消息和队列Message message memoryDataCenter.getMessage(messageId);if (message null) {throw new MqException([VirtualHost] 要确认的消息不存在! messageId messageId);}MSGQueue queue memoryDataCenter.getQueue(queueName);if (queue null) {throw new MqException([VirtualHost] 要确认的队列不存在! queueName queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() 2) {diskDataCenter.deleteMessage(queue, message);}// 3. 删除消息中心中的数据memoryDataCenter.removeMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName, messageId);System.out.println([VirtualHost] basicAck 成功! 消息被成功确认! queueName queueName , messageId messageId);return true;} catch (Exception e) {System.out.println([VirtualHost] basicAck 失败! 消息确认失败! queueName queueName , messageId messageId);e.printStackTrace();return false;} }实现addConsumer()方法 该方法是为了实现前面我们添加订阅者的方法 在这个方法里我们需要实现的是 根据队列名查找该队列是否存在若存在构造相应的消费者进行添加并且若该队列里存在未消费的消息我们就直接进行消费掉 代码实现如下 public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列.MSGQueue queue parent.getMemoryDataCenter().getQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 队列不存在! queueName queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag,queueName,autoAck,consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息了, 需要立即就消费掉.int n parent.getMemoryDataCenter().getMessageCount(queueName);for (int i 0; i n; i) {// 这个方法调用一次就消费一条消息.consumeMessage(queue);}} }VirtualHost类订阅消息的完善 直接调用即可代码实现如下 // 订阅消息.// 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者.// consumerTag: 消费者的身份标识// autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答.// consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作 lambda 样子了.public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);System.out.println([VirtualHost] basicConsume 成功! queueName queueName);return true;} catch (Exception e) {System.out.println([VirtualHost] basicConsume 失败! queueName queueName);e.printStackTrace();return false;}}⭕总结 关于《【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑》就讲解到这儿感谢大家的支持欢迎各位留言交流以及批评指正如果文章对您有帮助或者觉得作者写的还不错可以点一下关注点赞收藏支持一下
http://www.dnsts.com.cn/news/61198.html

相关文章:

  • 浙江住房和城乡建设网站万能浏览器手机版下载安装
  • 做PHP网站前端网站进不去塑胶网站建设
  • 以前在线做预算的网站东莞网站建设哪里找
  • 网站后台维护一般要怎么做网店美工素材
  • 有没有专门做采购的网站建网站多少钱建个网站需要怎么做
  • 电子商务网站开发技术和工具有哪些网站建设建议书
  • 建设银行新加坡招聘网站电商系统平台开发
  • 企业网站可以备案几个西安app开发
  • 做网站怎样做全页面珠海网站建设公
  • 深圳市龙岗区做网站的公司微信如何自己开发小程序
  • 上海建设银行官网网站首页百度竞价推广运营
  • 网站安全建设思考有道云笔记 wordpress
  • 我是做网站的 怎么才能提高业绩服装定制行业市场分析
  • 怎么判断一个网站做的好网络工程可以从事什么工作
  • h5免费模板网站深圳中国网站制作哪家公司好
  • 河南网站备案所需资料网站建设能挣钱吗
  • 做网站前端ps很重要吗湖南郴州最新消息
  • 网站做下cdn网站建设图片链接方法
  • 设计师的个人网站wordpress4.9.4中文版
  • 国外网站后台模板商城网站后台模板
  • 营销型网站规划抖音代运营合同文件
  • 随机图片网站旅游网站设计的目的与意义
  • 网站站外链接什么是网站的tdk
  • 企业网站的作用网站内部建设和程序
  • 如何制作网站教程视频讲解网站建设转正申请报告
  • 搜搜网站提交电子工程网站大全
  • 慈利网站开发asp 网站数据库连接错误
  • 网站做弹窗广告怎么建微信公众号
  • 怎么删除织梦做的网站网站建设亿码酷出名5
  • 网站后台信息怎么更新一建证挂出去一年多少钱