网站建设竞争对手分析,哪些网站百度不收录,百度给做网站收费多少,分类用wordpress1.1 什么是MQ? 消息队列#xff08;Message Queue#xff09;#xff0c;是基础数据结构中 “先进先出” 的一种数据结构。 一般用来解决应用解耦、异步消息、流量削峰等问题#xff0c;实现高性能、高可用、可伸缩和最终一致性架构。
RabbitMQ可以理解为一个邮箱#x…1.1 什么是MQ? 消息队列Message Queue是基础数据结构中 “先进先出” 的一种数据结构。 一般用来解决应用解耦、异步消息、流量削峰等问题实现高性能、高可用、可伸缩和最终一致性架构。
RabbitMQ可以理解为一个邮箱或者一个邮局或者是一个邮递员保证 “张三” 的信件最终传递给 “李四”。
RabbitMQ与上述所描述的邮局邮箱、邮递员的主要区别在于它不处理纸张而是接受、存储和转发二进制数据块消息。
1.2.MQ的应用场景 跨系统间的调用 比如说发送短息的功能信息在自己应用中处理完成之后调用第三方发送短信接口假如第三方接口有问题或者延迟比较大就会影响自己接口的响应时间用户的体验就会很差并且第三方也是不可控的这时候就可以把发送短信的操作放到队列中执行如果第一次发送消息失败还可以重复执行 这些操作对用户都是无感知的避免因为第三方系统出现的问题导致自己系统出现问题系统内的异步调用 比如发送评论后 异步进行更新排行榜的功能 比如批量发送消息的功能加入给100w用户发送消息时间肯定会很长改为异步在后台慢慢消费用户就会无感知并且很快的通知发送消息方已经发送完成了消息驱动的场景 比如当满足一个条件以后触发后面的一系列操作这个时候用程序实现起来比较麻烦这个时候使用消息队列来实现就会相当的简单跨语言之间的调用 因为消息队列是和语言无关的也不是函数之间的调用而且消息队列也不要求生产端和消费端同时在线所以很轻松的实现跨语言间的调用
1.3 MQ是怎么实现消息传递的 生产者产生消息并把传输的数据消息放在队列中用队列机制来实现消息传递。消费者可以到指定的队列拉取消息或者订阅相应的队列由MQ服务端给其推送消息。
1.4 MQ的几个主要特性 解耦一个业务需要多个模块共同实现或一条消息有多个系统对应处理只需要在主业务完成以后发送一条MQ其余模块消费MQ消息即可实现业务降低模块之间的耦合。异步主业务执行结束后从属业务通过MQ异步处理减少业务的响应时间提高用户体验。削峰高并发情况下业务异步处理提供高峰期业务处理能力避免系统瘫痪。
1.5.MQ的缺点 系统可用性降低。依赖服务越多服务越容易挂掉。需要考虑MQ瘫痪的情况。系统的复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。业务一致性。主业务和从属业务一致性的处理。
1.6RabbitMQ的相关术语术语 Producer生产者用来发送消息到队列Consumer消费者从队列中取出消息消费掉Queue存储消息的容器它就是队列它就是消息的载体每个消息都会通过队列让消费者来消费Channel消息通道可以和rabbitmq建立一个链接一个链接中也可以有多个的通道Exchange交换机决定交换机以什么规则发送到队列中Routing Key路由的关键字交换机就是通过它来决定发送到哪个队列中就是通过它来路由的
1.7RabbitMQ的工作模式5种
1.7.1简单工作模式 这里不需要交换机应用场景1对1的聊天
1.7.2 work工作模式 左边是一个生产者发送消息到队列中多右边是多个消费者竞争消费消息在高并发场景下容易出现同一个消息被多个消费者消费的问题需要注意可以在业务上增加唯一的键值来避免
应用场景红包
1.7.3 订阅模式每个队列的消息都是一样的 左边是一个生产者把消息发送给交换机交换机分别把消息发送到多个队列中最后由消费者来消费
1.7.4 路由模式根据routing key发送到不同的消息队列中(使用的是定向类型的交换机) 左边是一个生产者发送了一条消息交换机根据发送的路由key,发送到相匹配的队列中由消费者来消费
应用场景日志不同等级的日志有不同的方法所以会放到不同的队列中error走上吗的队列info走下面的队列
1.7.5 主题模式根据routing key分类发送到不同的消息队列中 主题模式使用的topic类型的交换机和上面的路由模式类似主要通过通配符和#来判断消息发送到哪个队列中发送到哪个队列中是通过和#来通过routing key 来决定的其中#匹配一个或多个单词*匹配一个单词
2. go使用RabbitMQ
安装RabbitMQ扩展包 go get github.com/streadway/amqp 首先编写demo,简单模式和工作模式 RabbitMq其本质是tcp链接并且是基于内部通道进行的通信所以一个完整的连接分为连接与创建通道两部分。
连接地址的格式是这种形式 amqp://admin:123456127.0.0.1:5672/ services创建mq.go实现封装 生产者流程
在 Golang 中创建 rabbitmq 生产者基本步骤是 连接 Connection 创建 Channel 创建或连接一个交换器 创建或连接一个队列 交换器绑定队列 投递消息 关闭 Channel 关闭 Connection
创建连接
// connection
connection, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)创建通道
// channel
channel, err : connection.Channel()创建交换器
err channel.ExchangeDeclare(e1, direct, true, false, false, true, nil)参数依次说明 name 交换机名称 kind 交换机类型 durable 持久化标识 autoDelete 是否自动删除 internal 是否是内置交换机 noWait 是否等待服务器确认 args 其它配置
参数说明要点
autoDelete :
自动删除功能必须要在交换器曾经绑定过队列或者交换器的情况下处于不再使用的时候才会自动删除如果是刚刚创建的尚未绑定队列或者交换器的交换器或者早已创建只是未进行队列或者交换器绑定的交换器是不会自动删除的。
internal :
内置交换器是一种特殊的交换器这种交换器不能直接接收生产者发送的消息只能作为类似于队列的方式绑定到另一个交换器来接收这个交换器中路由的消息内置交换器同样可以绑定队列和路由消息只是其接收消息的来源与普通交换器不同。
noWait
当 noWait 为 true 时声明时无需等待服务器的确认。
该通道可能由于错误而关闭。 添加一个 NotifyClose 侦听器应对任何异常。创建交换器还有一个差不多的方法( ExchangeDeclarePassive )他主要是假定交换已存在并尝试连接到不存在的交换将导致 RabbitMQ 引发异常可用于检测交换器的存在。
创建队列
q, err : channel.QueueDeclare(q1, true, false, false, true, nil)参数说明 name 队列名称 durable 持久化 autoDelete 自动删除 exclusive 排他 noWait 是否等待服务器确认 args Table 其它配置
参数说明要点
exclusive 排他 排他队列只对首次创建它的连接可见排他队列是基于连接 Connection 可见的并且该连接内的所有信道 Channel都可以访问这个排他队列在这个连接断开之后该队列自动删除由此可见这个队列可以说是绑到连接上的对同一服务器的其他连接不可见。
同一连接中不允许建立同名的排他队列的这种排他优先于持久化即使设置了队列持久化在连接断开后该队列也会自动删除。
非排他队列不依附于连接而存在同一服务器上的多个连接都可以访问这个队列。
autoDelete 设置是否自动删除。为 true 则设置队列为自动删除。
自动删除的前提是:至少有一个消费者连接到这个队列之后所有与这个队列连接的消费者都断开时才会自动删除。
不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时这个队列自动删除”因为生产者客户端创建这个队列或者没有消费者客户端与这个队列连接时都不会自动删除这个队列。
创建队列还有一个差不多的方法( QueueDeclarePassive )他主要是假定队列已存在并尝试连接到不存在的队列将导致 RabbitMQ 引发异常可用于检测队列的存在。
绑定交换器和队列
err channel.QueueBind(q1, q1Key, e1, true, nil)参数解析 name 队列名称 key BindingKey 根据交换机类型来设定 exchange 交换机名称 noWait 是否等待服务器确认 args Table 其它配置
绑定交换器可选
err channel.ExchangeBind(dest, q1Key, src, false, nil)参数解析 destination 目的交换器 key RoutingKey 路由键 source 源交换器 noWait 是否等待服务器确认 args Table 其它参数
生产者发送消息至交换器 source 中交换器 source 根据路由键找到与其匹配的另一个交换器 destination 井把消息转发到 destination 中进而存储在 destination 绑定的队列 queue 中某种程度上来说 destination 交换器可以看作一个队列。
投递消息
err channel.Publish(e1, q1Key, true, false, amqp.Publishing{Timestamp: time.Now(),DeliveryMode: amqp.Persistent, //Msg set as persistentContentType: text/plain,Body: []byte(Hello Golang and AMQP(Rabbitmq)!),
})参数解析 exchange 交换器名称 key RouterKey 路由键 mandatory 是否为无法路由的消息进行返回处理 immediate 是否对路由到无消费者队列的消息进行返回处理 RabbitMQ 3.0 废弃 msg 消息体
参数说明要点
mandatory
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式设置为 true 表示将消息返回到生产者否则直接丢弃消息。
immediate
参数告诉服务器至少将该消息路由到一个队列中否则将消息返回给生产者。 imrnediate 参数告诉服务器如果该消息关联的队列上有消费者则立刻投递如果所有匹配的队列上都没有消费者则直接将消息返还给生产者不用将消息存入队列而等待消费者了。
RabbitMQ 3.0版本开始去掉了对 imrnediate 参数的支持。
其中 amqp.Publishing 的 DeliveryMode 如果设为 amqp.Persistent 则消息会持久化。需要注意的是如果需要消息持久化 Queue 也是需要设定为持久化才有效。
消费者流程
在 Golang 中创建 rabbitmq 消费者基本步骤是 连接 Connection 创建 Channel 创建或连接一个交换器 创建或连接一个队列 交换器绑定队列 消费消息 关闭 Channel 关闭 Connection
消费者的步骤和生产者流程基本类似只是将生产者流程中的投递消息变为消费消息。