营销网站制作流程,网站集约化建设较好的城市,抖音代运营公司排名前十,温岭新站seo1、发布订阅
订阅模式#xff0c;消息被路由投递给多个队列#xff0c;一个消息被多个消费者获取。 1#xff09; 可以有多个消费者 2#xff09; 每个消费者有自己的queue#xff08;队列#xff09; 3#xff09; 每个队列都要绑定到Exchange#xff08;交换机消息被路由投递给多个队列一个消息被多个消费者获取。 1 可以有多个消费者 2 每个消费者有自己的queue队列 3 每个队列都要绑定到Exchange交换机 4 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定。 5 交换机把消息发送给绑定过的所有队列 6 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
相关场景:邮件群发,群聊天,广播(广告)
2、Exchanges交换器
消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
有几种交换器类型可用direct, topic, headers 和 fanout。我们将集中讨论最后一个——fanout。
2.1 创建交换器
创建一个这种类型的交换器并给它起个名字叫logs
err ch.ExchangeDeclare(logs, // namefanout, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments
)
fanout扇出交换器非常简单。正如你可能从名称中猜测的那样它只是将接收到的所有消息广播到它知道的所有队列中。
2.2 临时队列
也是自动删除队列吗和普通队列在使用上没有什么区别唯一的区别是当消费者断开连接时队列将会被删除。自动删除队列允许的消费者没有限制也就是说当这个队列上最后一个消费者断开连接才会执行删除。
自动删除队列只需要在声明队列时设置属性auto-delete标识为true即可。系统声明的随机队列缺省就是自动删除的。
q, err : ch.QueueDeclare(, // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue, // 独占队列当前声明队列的连接关闭后即被删除false, // no-waitnil, // arguments
)
上述方法返回时生成的队列实例包含RabbitMQ生成的随机队列名称。例如它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2.3 交换器与队列绑定 交换器和队列之间的关系称为绑定。
err ch.QueueBind(q.Name, // queue name, // routing keylogs, // exchangefalse,nil,
)从现在开始logs交换器将会把消息添加到我们的队列中。
2.4 发布消息到交换机
例如发布到fanout交换器
body : bodyFrom(os.Args)
err ch.Publish(logs, // exchange, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: text/plain,Body: []byte(body),}) 3 完整代码 产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器而不是空的消息交换器。发送时我们需要提供一个routingKey但是对于fanout型交换器它的值可以被忽略传空字符串。下面是emit_log.go脚本的代码
package mainimport (logosstringsgithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()err ch.ExchangeDeclare(logs, // namefanout, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, Failed to declare an exchange)body : bodyFrom(os.Args)err ch.Publish(logs, // exchange, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: text/plain,Body: []byte(body),})failOnError(err, Failed to publish a message)log.Printf( [x] Sent %s, body)
}func bodyFrom(args []string) string {var s stringif (len(args) 2) || os.Args[1] {s hello} else {s strings.Join(args[1:], )}return s
}emit_logs.go源码
如你所见在建立连接之后我们声明了交换器。此步骤是必需的因为禁止发布到不存在的交换器。
如果没有队列绑定到交换器那么消息将丢失但这对我们来说是ok的。如果没有消费者在接收我们可以安全地丢弃该消息。
receive_logs.go的代码
package mainimport (loggithub.com/streadway/amqp
)func failOnError(err error, msg string) {if err ! nil {log.Fatalf(%s: %s, msg, err)}
}func main() {conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)failOnError(err, Failed to connect to RabbitMQ)defer conn.Close()ch, err : conn.Channel()failOnError(err, Failed to open a channel)defer ch.Close()err ch.ExchangeDeclare(logs, // namefanout, // typetrue, // durablefalse, // auto-deletedfalse, // internalfalse, // no-waitnil, // arguments)failOnError(err, Failed to declare an exchange)q, err : ch.QueueDeclare(, // namefalse, // durablefalse, // delete when unusedtrue, // exclusivefalse, // no-waitnil, // arguments)failOnError(err, Failed to declare a queue)err ch.QueueBind(q.Name, // queue name, // routing keylogs, // exchangefalse,nil,)failOnError(err, Failed to bind a queue)msgs, err : ch.Consume(q.Name, // queue, // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)failOnError(err, Failed to register a consumer)forever : make(chan bool)go func() {for d : range msgs {log.Printf( [x] %s, d.Body)}}()log.Printf( [*] Waiting for logs. To exit press CTRLC)-forever
}receive_logs.go源码
如果要将日志保存到文件只需打开控制台并输入
go run receive_logs.go logs_from_rabbit.log如果希望在屏幕上查看日志请切换到一个新的终端并运行
go run receive_logs.go当然要发出日志请输入
go run emit_log.go使用rabbitmqctl list_bindings命令你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后你应该看到类似以下内容
sudo rabbitmqctl list_bindings
# Listing bindings ...
# logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# ...done.对结果的解释很简单数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。