青岛公司网站设计,电商营销策划方案,编程软件推荐,如何做淘宝店网站RabbitMQ入门到实战教程#xff0c;MQ消息中间件#xff0c;消息队列实战-CSDN博客 3.7.Topic交换机
3.7.1.说明
Topic类型的Exchange与Direct相比#xff0c;都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候…RabbitMQ入门到实战教程MQ消息中间件消息队列实战-CSDN博客 3.7.Topic交换机
3.7.1.说明
Topic类型的Exchange与Direct相比都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符
BindingKey 一般都是有一个或多个单词组成多个单词之间以.分割例如 item.insert
通配符规则
#匹配一个或多个词*匹配不多不少恰好1个词
举例
item.#能够匹配item.spu.insert 或者 item.spuitem.*只能匹配item.spu
图示 假如此时publisher发送的消息使用的RoutingKey共有四种
china.news 代表有中国的新闻消息china.weather 代表中国的天气消息japan.news 则代表日本新闻japan.weather 代表日本的天气消息
解释
topic.queue1绑定的是china.# 凡是以 china.开头的routing key 都会被匹配到包括 china.newschina.weathertopic.queue2绑定的是#.news 凡是以 .news结尾的 routing key 都会被匹配。包括: china.newsjapan.news
接下来我们就按照上图所示来演示一下Topic交换机的用法。
首先在控制台按照图示例子创建队列、交换机并利用通配符绑定队列和交换机。此处步骤略。最终结果如下 3.7.2.消息发送
在publisher服务的SpringAmqpTest类中添加测试方法
/*** topicExchange*/
Test
public void testSendTopicExchange() {// 交换机名称String exchangeName hmall.topic;// 消息String message 喜报孙悟空大战哥斯拉胜!;// 发送消息rabbitTemplate.convertAndSend(exchangeName, china.news, message);
}
3.7.3.消息接收
在consumer服务的SpringRabbitListener中添加方法
RabbitListener(queues topic.queue1)
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(queues topic.queue2)
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}
3.7.4.总结
描述下Direct交换机与Topic交换机的差异
Topic交换机接收的消息RoutingKey必须是多个单词以 **.** 分割Topic交换机与队列绑定时的bindingKey可以指定通配符#代表0个或多个词*代表1个词
3.8.声明队列和交换机
在之前我们都是基于RabbitMQ控制台来创建队列、交换机。但是在实际开发时队列和交换机是程序员定义的将来项目上线又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来交给运维。在这个过程中是很容易出现错误的。
因此推荐的做法是由程序启动时检查队列和交换机是否存在如果不存在自动创建。
3.8.1.基本API
SpringAMQP提供了一个Queue类用来创建队列 SpringAMQP还提供了一个Exchange接口来表示所有不同类型的交换机 我们可以自己创建队列和交换机不过SpringAMQP还提供了ExchangeBuilder来简化这个过程 而在绑定队列和交换机时则需要使用BindingBuilder来创建Binding对象 3.8.2.fanout示例
在consumer中创建一个类声明队列和交换机
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfig {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(hmall.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
3.8.2.direct示例
direct模式由于要绑定多个KEY会非常麻烦每一个Key都要编写一个binding
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class DirectConfig {/*** 声明交换机* return Direct类型交换机*/Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange(hmall.direct).build();}/*** 第1个队列*/Beanpublic Queue directQueue1(){return new Queue(direct.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with(blue);}/*** 第2个队列*/Beanpublic Queue directQueue2(){return new Queue(direct.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(red);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with(yellow);}
}
3.8.4.基于注解声明
基于Bean的方式声明队列和交换机比较麻烦Spring还提供了基于注解方式来声明。
例如我们同样声明Direct模式的交换机和队列
RabbitListener(bindings QueueBinding(value Queue(name direct.queue1),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, blue}
))
public void listenDirectQueue1(String msg){System.out.println(消费者1接收到direct.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name direct.queue2),exchange Exchange(name hmall.direct, type ExchangeTypes.DIRECT),key {red, yellow}
))
public void listenDirectQueue2(String msg){System.out.println(消费者2接收到direct.queue2的消息【 msg 】);
}
是不是简单多了。
再试试Topic模式
RabbitListener(bindings QueueBinding(value Queue(name topic.queue1),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key china.#
))
public void listenTopicQueue1(String msg){System.out.println(消费者1接收到topic.queue1的消息【 msg 】);
}RabbitListener(bindings QueueBinding(value Queue(name topic.queue2),exchange Exchange(name hmall.topic, type ExchangeTypes.TOPIC),key #.news
))
public void listenTopicQueue2(String msg){System.out.println(消费者2接收到topic.queue2的消息【 msg 】);
}
3.9.消息转换器
Spring的消息发送代码接收的消息体是一个Object 而在数据传输时它会把你发送的消息序列化为字节发送给MQ接收消息的时候还会把字节反序列化为Java对象。
只不过默认情况下Spring采用的序列化方式是JDK序列化。众所周知JDK序列化存在下列问题
数据体积过大有安全漏洞可读性差
我们来测试一下。
3.9.1.测试默认转换器
1创建测试队列
首先我们在consumer服务中声明一个新的配置类 利用Bean的方式创建一个队列
具体代码
package com.itheima.consumer.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class MessageConfig {Beanpublic Queue objectQueue() {return new Queue(object.queue);}
}
注意这里我们先不要给这个队列添加消费者我们要查看消息体的格式。
重启consumer服务以后该队列就会被自动创建出来了 2发送消息
我们在publisher模块的SpringAmqpTest中新增一个消息发送的代码发送一个Map对象
Test
public void testSendMap() throws InterruptedException {// 准备消息MapString,Object msg new HashMap();msg.put(name, 柳岩);msg.put(age, 21);// 发送消息rabbitTemplate.convertAndSend(object.queue, msg);
}
发送消息后查看控制台 可以看到消息格式非常不友好。
3.9.2.配置JSON转换器
显然JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高因此可以使用JSON方式来做序列化和反序列化。
在publisher和consumer两个服务中都引入依赖
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactIdversion2.9.10/version
/dependency
注意如果项目中引入了spring-boot-starter-web依赖则无需再次引入Jackson依赖。
配置消息转换器在publisher和consumer两个服务的启动类中添加一个Bean即可
Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter new Jackson2JsonMessageConverter();// 2.配置自动创建消息id用于识别不同消息也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
消息转换器中添加的messageId可以便于我们将来做幂等性判断。
此时我们到MQ控制台删除object.queue中的旧的消息。然后再次执行刚才的消息发送的代码到MQ的控制台查看消息结构 3.9.3.消费者接收Object
我们在consumer服务中定义一个新的消费者publisher是用Map发送那么消费者也一定要用Map接收格式如下
RabbitListener(queues object.queue)
public void listenSimpleQueueMessage(MapString, Object msg) throws InterruptedException {System.out.println(消费者接收到object.queue消息【 msg 】);
} 4.业务改造
案例需求改造余额支付功能将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用改为基于RabbitMQ的异步通知。
如图 说明我们只关注交易服务步骤如下
定义topic类型交换机命名为pay.topic定义消息队列命名为mark.order.pay.queue将mark.order.pay.queue与pay.topic绑定BindingKey为pay.success支付成功时不再调用交易服务更新订单状态的接口而是发送一条消息到pay.topic发送消息的RoutingKey 为pay.success消息内容是订单id交易服务监听mark.order.pay.queue队列接收到消息后更新订单状态为已支付
4.1.配置MQ
不管是生产者还是消费者都需要配置MQ的基本信息。分为两步
1添加依赖 !--消息发送--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency
2配置MQ地址
spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
4.1.接收消息
在trade-service服务中定义一个消息监听类 其代码如下
package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;RabbitListener(bindings QueueBinding(value Queue(name mark.order.pay.queue, durable true),exchange Exchange(name pay.topic, type ExchangeTypes.TOPIC),key pay.success))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}
}
4.2.发送消息
修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法
private final RabbitTemplate rabbitTemplate;Override
Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付状态异常throw new BizIllegalException(交易已支付或关闭);}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException(交易已支付或关闭);}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend(pay.topic, pay.success, po.getBizOrderNo());} catch (Exception e) {log.error(支付成功的消息发送失败支付单id{} 交易单id{}, po.getId(), po.getBizOrderNo(), e);}
}
5.练习
5.1.抽取共享的MQ配置
将MQ配置抽取到Nacos中管理微服务中直接使用共享配置。
5.2.改造下单功能
改造下单功能将基于OpenFeign的清理购物车同步调用改为基于RabbitMQ的异步通知
定义topic类型交换机命名为trade.topic定义消息队列命名为cart.clear.queue将cart.clear.queue与trade.topic绑定BindingKey为order.create下单成功时不再调用清理购物车接口而是发送一条消息到trade.topic发送消息的RoutingKey 为order.create消息内容是下单的具体商品、当前登录用户信息购物车服务监听cart.clear.queue队列接收到消息后清理指定用户的购物车中的指定商品
5.3.登录信息传递优化
某些业务中需要根据登录用户信息处理业务而基于MQ的异步调用并不会传递登录用户信息。前面我们的做法比较麻烦至少要做两件事
消息发送者在消息体中传递登录用户消费者获取消息体中的登录用户处理业务
这样做不仅麻烦而且编程体验也不统一毕竟我们之前都是使用UserContext来获取用户。
大家思考一下有没有更优雅的办法传输登录用户信息让使用MQ的人无感知依然采用UserContext来随时获取用户。
参考资料
Spring AMQP
5.4.改造项目一
思考一下项目一中的哪些业务可以由同步方式改为异步方式调用试着改造一下。
举例短信发送