当前位置: 首页 > news >正文

网站 改域名上海企业一户式查询

网站 改域名,上海企业一户式查询,大学生服装网站建设策划书,离石商城网站建设系统目录 一. 作用#xff1a; 二. RabbitMQ的5中队列模式#xff1a; 1. 简单模式 2. Work模式 3. 发布/订阅模式 4. 路由模式 5. 主题模式 三. 消息持久化#xff1a; 消息过期时间 ACK应答 四. 同步接收和异步接收#xff1a; 应用场景 五. 基本使用 #xff…目录 一. 作用 二. RabbitMQ的5中队列模式 1. 简单模式 2. Work模式 3. 发布/订阅模式 4. 路由模式 5. 主题模式 三. 消息持久化 消息过期时间 ACK应答  四. 同步接收和异步接收 应用场景 五. 基本使用 引入依赖库 配置文件RabbitMQConfig  创建消息任务类  解析 一. 作用 RabbitMQ主要用于消息队列的实现。 二. RabbitMQ的5中队列模式 1. 简单模式 一个生产者发送方对应一个消费者接收方 2. Work模式 一个生产者对应多个消费者但是只能有一个消费者获得消息排他 3. 发布/订阅模式 一个消费者将消息首先发送到fanout交换器交换器绑定到多个队列然后与之对应的所有消费者都能接收到消息不排他 4. 路由模式 生产者将消息发送到direct交换器交换器按照关键字Key把消息路由到某个队列 5. 主题模式 生产者将消息发送到Topic交换器交换器按照复杂的规则把消息路由到某个队列 三. 消息持久化 消息的可靠性是RabbitMQ的一大特色那么RabbitMQ是如何保证消息可靠性的呢答案就是消息持久化。持久化可以防止在异常情况下丢失数据。除了消息持久化之外甚至交换器和队列都能持久化。也就是说rabbitmq的消息会被存储在磁盘上只有当消费收到消息rabbitmq确认消费者收到消息Acknowledgments--简称ACK后才会将消息从队列中删除。   消息过期时间 如果消费者一直不接收消息消息会一直保存在消息队列当中短期内可能不会有什么影响但是如果经过长时间的积累后消息会变得很多很多 浪费大量的资源内存。 为了应对这种情况就可以对rabbitmq设置消息的过期时间在规定时间内消息没有被接收就会删除掉该消息。 ACK应答  消费者接收到消息后为了让RabbitMQ 知道就需要返回一个ACK应答告诉RabbitMQ消费者已经收到了消息如果收到消息后我们需要删除该消息只需要在ACK应答中加上deliveryTag标志位。 四. 同步接收和异步接收 同步接收指消费者调用方法时会阻塞来等待消息直到消息被成功消费或者队列为空。没有消息等待消息再接着处理。 异步接收 指消费者不会在接收消息时阻塞而是通过回调函数处理消息。消费者在等待消息的同时不会停下可以处理其他任务。当有消息时才来处理消息。 应用场景 同步接收 当消息的处理顺序对业务逻辑非常重要就使用同步接收消费者一次只处理一个消息确保了每条消息的处理顺序。 异步接收当处理消息的时间比较长或者系统的并发量大时采用异步接收会更好。 RabbitMQ还有一个杀手锏——同时使用异步收发和同步收发。 五. 基本使用 引入依赖库 dependency     groupIdcom.rabbitmq/groupId     artifactIdamqp-client/artifactId     version5.9.0/version /dependency dependency     groupIdorg.springframework.boot/groupId     artifactIdspring-boot-starter-amqp/artifactId /dependency  配置文件RabbitMQConfig  import com.rabbitmq.client.ConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;Configuration public class RabbitMQConfig {Value(${rabbitmq.factoryHost})private String host;Beanpublic ConnectionFactory connectionFactory() {ConnectionFactory factory new ConnectionFactory();factory.setHost(host);factory.setPort(5672);return factory;} } host配置我将rabbitMQ放在虚拟机上的所有ip是虚拟机的地址 创建消息任务类  Slf4j Component public class MessageTask {Autowiredprivate ConnectionFactory factory;Autowiredprivate MessageService messageService;/** 同步发送消息* */public void send(String topic, MessageEntity entity) {//向MongoDB保存消息数据返回消息IDString id messageService.insertMessage(entity);//向RabbitMQ发送消息try(Connection connection factory.newConnection();Channel channel connection.createChannel()){//连接到某个topicchannel.queueDeclare(topic, true, false, false, null);HashMap header new HashMap();header.put(messageId,id);//创建AMQP协议参与对象添加附加属性AMQP.BasicProperties properties new AMQP.BasicProperties().builder().headers(header).build();channel.basicPublish(,topic,properties,entity.getMsg().getBytes());log.debug(消息发送成功);} catch (Exception e){log.error(e.getMessage());throw new EmosException(向MQ发送消息失败);}}/** 异步发送消息* */Async(AsyncTaskExecutor)public void sendAsync(String topic, MessageEntity entity) {send(topic, entity);}/** 同步接收消息* */public int receive(String topic) {int i 0;try (//接收消息数据Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 从队列中获取消息不自动确认channel.queueDeclare(topic, true, false, false, null);//Topic中有多少条数据未知所以使用死循环接收数据直到接收不到消息退出死循环while (true) {//创建响应接收数据禁止自动发送Ack应答GetResponse response channel.basicGet(topic, false);if (response ! null) {AMQP.BasicProperties properties response.getProps();MapString, Object header properties.getHeaders(); //获取附加属性对象String messageId header.get(messageId).toString();byte[] body response.getBody();//获取消息正文String message new String(body);log.debug(从RabbitMQ接收的消息 message);MessageRefEntity entity new MessageRefEntity();entity.setMessageId(messageId);entity.setReceiverId(Integer.parseInt(topic));entity.setReadFlag(false);entity.setLastFlag(true);messageService.insertRef(entity); //把消息存储在MongoDB中//数据保存到MongoDB后才发送Ack应答让Topic删除这条消息long deliveryTag response.getEnvelope().getDeliveryTag();channel.basicAck(deliveryTag, false);i;} else {break; //接收不到消息则退出死循环}}} catch (Exception e) {log.error(执行异常, e);}return i;}/** 异步接收消息* */Asyncpublic int receiveAsync(String topic) {return receive(topic);}/** 同步删除消息* */public void deleteQueue(String topic) {try(//接收消息数据Connection connection factory.newConnection();Channel channel connection.createChannel()){channel.queueDelete(topic);log.debug(成功删除消息队列:topic);} catch (Exception e){log.error(删除消息队列失败:,e);throw new EmosException(删除消息队列失败);}}/** 异步删除消息* */Asyncpublic void deleteAsync(String topic) {deleteQueue(topic);} } 解析 channel.queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments);queueName:队列的名称用于标识消息的存储位置。durable true表示队列是持久化的。 false表示队列是非持久化的。 exclusive true队列仅供当前连接使用连接断开时队列会自动删除。 false队列可供多个连接共享。 autoDelete true当队列不再被任何消费者订阅时队列会自动删除。 false队列即使没有消费者订阅也会一直存在直到手动删除。 arguments额外的参数null表示没有额外参数 MapString, Object arguments new HashMap(); arguments.put(x-message-ttl, 60000); // 设置队列中消息的过期时间为 60 秒60000 毫秒 channel.queueDeclare(myQueue, true, false, false, arguments);
http://www.dnsts.com.cn/news/78953.html

相关文章:

  • 天津做艺术品的网站wordpress 源码出售
  • 手机怎么创网站免费下载郑州比较好的外贸公司
  • 南水北调建设管理局网站网站开发前准备
  • 杭州做宠物网站的公司哪家好wordpress 热门搜索
  • 长沙专业做网站电影网站的建设
  • 网站托管费建设评标专家在哪个网站
  • 网站建设套餐报在局网站 作风建设
  • 做文字的网站宣传文案模板
  • 导购分享网站模板经营网站 备案信息
  • 月子中心网站建设需求网站申请
  • 雄安网站制作多少钱青岛做网站哪个公司好
  • 网站建设与设计ppt模板宣传海报怎么制作
  • jsp网站开发的使用表格闲聊app是哪个公司开发
  • 专注番禺网站优化上海这边敲墙拆旧做啥网站的比较多
  • 中国旅游网站的建设网站备案会检查空间
  • 福州网站建设方案优化九江 网站建设公司
  • seo体系网站的建设及优化微信公众平台开发源代码
  • 铁岭 开原网站建设wordpress优化访问速度
  • 崇文网站建设个人网站做跳转怎么弄
  • 陕西省建设网站wordpress古文主题
  • 南宁自助建站模板做创意ppt网站有哪些
  • 云南建设厅和网站河南app定制开发
  • 云南省建设工程档案馆网站wordpress不用邮件验证注册
  • 网站在百度上做推广怎样做镇江微淘软件开发
  • 茂名公司网站设计团队网站制作合肥
  • 网站换域名做301宣传画册设计
  • 如何创造网站html访问wordpress
  • 做网站采集内容太原建站公司有哪些
  • 创建网站的app上网行为管理
  • 江苏建设类高级工程师在那个网站公示信誉好的龙岗网站制作