中山响应式网站,阿里云网站用什么做的,虾米音乐怎么连接到wordpress,现在清算组备案在哪个网站做RabbitMQ 知识详解#xff08;Java版#xff09;
RabbitMQ 是一个开源的消息代理#xff0c;实现了高级消息队列协议#xff08;AMQP#xff09;。它用于在分布式系统中实现应用解耦、异步通信和流量削峰。 核心概念
生产者(Producer)#xff1a;发送消息的应用消费者(…RabbitMQ 知识详解Java版
RabbitMQ 是一个开源的消息代理实现了高级消息队列协议AMQP。它用于在分布式系统中实现应用解耦、异步通信和流量削峰。 核心概念
生产者(Producer)发送消息的应用消费者(Consumer)接收消息的应用队列(Queue)消息存储的缓冲区交换机(Exchange)接收消息并路由到队列绑定(Binding)连接交换机和队列的规则路由键(Routing Key)消息的路由标识 交换机类型
类型路由规则典型用途Direct精确匹配Routing Key点对点通信Topic模式匹配支持通配符多条件路由Fanout广播到所有绑定队列发布/订阅Headers消息头键值对匹配复杂路由 Java 示例使用官方客户端
依赖
dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.9.0/version
/dependency
dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-nop/artifactIdversion1.7.30/version
/dependency示例1基本发送/接收点对点
// 生产者
public class Producer {private final static String QUEUE_NAME hello;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 创建队列持久化/非持久化channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message Hello RabbitMQ!;channel.basicPublish(, QUEUE_NAME, null, message.getBytes());System.out.println( [x] Sent message );}}
}// 消费者
public class Consumer {private final static String QUEUE_NAME hello;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages...);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}示例2发布/订阅模式Fanout交换机
// 发布者
public class Publisher {private static final String EXCHANGE_NAME logs;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明fanout类型交换机channel.exchangeDeclare(EXCHANGE_NAME, fanout);String message Broadcast message!;channel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [x] Sent message );}}
}// 订阅者
public class Subscriber {private static final String EXCHANGE_NAME logs;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, fanout);// 创建临时队列String queueName channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, );System.out.println( [*] Waiting for messages...);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}示例3主题路由Topic交换机
// 生产者主题发布
public class TopicProducer {private static final String EXCHANGE_NAME topic_logs;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明topic类型交换机channel.exchangeDeclare(EXCHANGE_NAME, topic);String routingKey order.error;String message Order processing error;channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println( [x] Sent routingKey : message );}}
}// 消费者主题订阅
public class TopicConsumer {private static final String EXCHANGE_NAME topic_logs;public static void main(String[] args) throws Exception {ConnectionFactory factory new ConnectionFactory();factory.setHost(localhost);Connection connection factory.newConnection();Channel channel connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, topic);String queueName channel.queueDeclare().getQueue();// 绑定多个路由键使用通配符channel.queueBind(queueName, EXCHANGE_NAME, *.error);channel.queueBind(queueName, EXCHANGE_NAME, order.*);System.out.println( [*] Waiting for messages...);DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);String routingKey delivery.getEnvelope().getRoutingKey();System.out.println( [x] Received routingKey : message );};channel.basicConsume(queueName, true, deliverCallback, consumerTag - { });}
}关键特性Java实现
1. 消息持久化
// 声明持久化队列
boolean durable true;
channel.queueDeclare(task_queue, durable, false, false, null);// 发送持久化消息
channel.basicPublish(, task_queue, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());2. 公平分发Prefetch
// 每次只分发一条消息
int prefetchCount 1;
channel.basicQos(prefetchCount);3. 消息确认ACK
// 消费者关闭自动ACK
boolean autoAck false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag - {});// 处理完成后手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);4. 持久化消费者
// 重启后自动恢复的消费者
MapString, Object args new HashMap();
args.put(x-queue-type, quorum);
channel.queueDeclare(persistent_queue, true, false, false, args);使用场景
服务解耦订单系统与库存系统分离异步处理耗时操作如邮件发送流量削峰突发请求缓冲秒杀系统分布式事务最终一致性实现日志收集多系统日志聚合 最佳实践
连接管理使用连接池如Spring AMQP的CachingConnectionFactory异常处理实现Consumer和Connection的监听器死信队列处理失败消息集群部署保证高可用性监控管理使用RabbitMQ Management Plugin 提示生产环境推荐使用Spring AMQP简化开发它提供了RabbitTemplate和RabbitListener等便捷工具。