广州档案馆建设网站,东营人事考试信息网,企业宣传片,临沂教育平台网站建设订单服务完成支付后将支付结果发给每一个与订单服务对接的微服务#xff0c;订单服务将消息发给交换机#xff0c;由交换机广播消息#xff0c;每个订阅消息的微服务都可以接收到支付结果.
微服务收到支付结果根据订单的类型去更新自己的业务数据。
相关技术方案
使用消息…
订单服务完成支付后将支付结果发给每一个与订单服务对接的微服务订单服务将消息发给交换机由交换机广播消息每个订阅消息的微服务都可以接收到支付结果.
微服务收到支付结果根据订单的类型去更新自己的业务数据。
相关技术方案
使用消息队列进行异步通知需要保证消息的可靠性即生产端将消息成功通知到消费端。
消息从生产端发送到消费端经历了如下过程
1、消息发送到交换机
2、消息由交换机发送到队列
3、消息者收到消息进行处理
保证消息的可靠性需要保证以上过程的可靠性本项目使用RabbitMQ可以通过如下方面保证消息的可靠性。
1、生产者确认机制
发送消息前使用数据库事务将消息保证到数据库表中
成功发送到交换机将消息从数据库中删除
2、mq持久化
mq收到消息进行持久化当mq重启即使消息没有消费完也不会丢失。
需要配置交换机持久化、队列持久化、发送消息时设置持久化。
3、消费者确认机制
消费者消费成功自动发送ack否则重试消费。
订单服务通过消息队列将支付结果发给学习中心服务消息队列采用发布订阅模式。
1、订单服务创建支付结果通知交换机。
2、学习中心服务绑定队列到交换机。
首先需要在学习中心服务和订单服务工程配置连接消息队列。
1、首先在订单服务添加消息队列依赖 XMLdependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId/dependency
2、在nacos配置rabbitmq-dev.yaml为通用配置文件 YAML spring: rabbitmq: host: 192.168.101.65 port: 5672 username: guest password: guest virtual-host: / publisher-confirm-type: correlated #correlated 异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallback publisher-returns: false #开启publish-return功能同样是基于callback机制需要定义ReturnCallback template: mandatory: false #定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息 listener: simple: acknowledge-mode: none #出现异常时返回unack消息回滚到mq没有异常返回ack ,manual:手动控制,none:丢弃消息不回滚到mq retry: enabled: true #开启消费者失败重试 initial-interval: 1000ms #初识的失败等待时长为1秒 multiplier: 1 #失败的等待时长倍数下次等待时长 multiplier * last-interval max-attempts: 3 #最大重试次数 stateless: true #true无状态false有状态。如果业务中包含事务这里改为false
3、在订单服务接口工程引入rabbitmq-dev.yaml配置文件 YAML shared-configs: - data-id: rabbitmq-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true
4、在订单服务service工程编写MQ配置类配置交换机 Java package com.xuecheng.orders.config; import com.alibaba.fastjson.JSON; import com.xuecheng.messagesdk.model.po.MqMessage; import com.xuecheng.messagesdk.service.MqMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * author Mr.M * version 1.0 * description TODO * date 2023/2/23 16:59 */ Slf4j Configuration public class PayNotifyConfig implements ApplicationContextAware { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT paynotify_exchange_fanout; //支付结果通知消息类型 public static final String MESSAGE_TYPE payresult_notify; //支付通知队列 public static final String PAYNOTIFY_QUEUE paynotify_queue; //声明交换机且持久化 Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 Bean public Binding binding_course_publish_queue(Qualifier(PAYNOTIFY_QUEUE) Queue queue, Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate applicationContext.getBean(RabbitTemplate.class); //消息处理service MqMessageService mqMessageService applicationContext.getBean(MqMessageService.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) - { // 投递失败记录日志 log.info(消息发送失败应答码{}原因{}交换机{}路由键{},消息{}, replyCode, replyText, exchange, routingKey, message.toString()); MqMessage mqMessage JSON.parseObject(message.toString(), MqMessage.class); //将消息再添加到消息表 mqMessageService.addMessage(mqMessage.getMessageType(),mqMessage.getBusinessKey1(),mqMessage.getBusinessKey2(),mqMessage.getBusinessKey3()); }); } }
重启订单服务登录rabbitmq查看交换机自动创建成功
查看队列自动成功
4.3.2 发送支付结果
在OrderService中定义接口 Java /** * 发送通知结果 * param message */ public void notifyPayResult(MqMessage message);
编写接口实现方法 Java Override public void notifyPayResult(MqMessage message) { //1、消息体转json String msg JSON.toJSONString(message); //设置消息持久化 Message msgObj MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); // 2.全局唯一的消息ID需要封装到CorrelationData中 CorrelationData correlationData new CorrelationData(message.getId().toString()); // 3.添加callback correlationData.getFuture().addCallback( result - { if(result.isAck()){ // 3.1.ack消息成功 log.debug(通知支付结果消息发送成功, ID:{}, correlationData.getId()); //删除消息表中的记录 mqMessageService.completed(message.getId()); }else{ // 3.2.nack消息失败 log.error(通知支付结果消息发送失败, ID:{}, 原因{},correlationData.getId(), result.getReason()); } }, ex - log.error(消息发送异常, ID:{}, 原因{},correlationData.getId(),ex.getMessage()) ); // 发送消息 rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, , msgObj,correlationData); }
订单服务收到第三方平台的支付结果时在saveAliPayStatus方法中添加代码向数据库消息表添加消息并进行发送消息如下所示 Java Transactional Override public void saveAliPayStatus(PayStatusDto payStatusDto) { ....... //保存消息记录,参数1支付结果通知类型2: 业务id3:业务类型 MqMessage mqMessage mqMessageService.addMessage(payresult_notify, orders.getOutBusinessId(), orders.getOrderType(), null); //通知消息 notifyPayResult(mqMessage); } } 配置交换机和队列
在order-service工程配置 消息发送方法 Java /** * 发送通知结果 * param message */ public void notifyPayResult(MqMessage message); 4.4 接收支付结果
4.4.1 学习中心服务集成MQ
1、在学习中心服务添加消息队列依赖 XMLdependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId/dependency
2、在学习中心服务接口工程引入rabbitmq-dev.yaml配置文件 YAML shared-configs: - data-id: rabbitmq-${spring.profiles.active}.yaml group: xuecheng-plus-common refresh: true
3、添加配置类 Java package com.xuecheng.learning.config; import com.alibaba.fastjson.JSON; import com.xuecheng.messagesdk.model.po.MqMessage; import com.xuecheng.messagesdk.service.MqMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * author Mr.M * version 1.0 * description TODO * date 2023/2/23 16:59 */ Slf4j Configuration public class PayNotifyConfig { //交换机 public static final String PAYNOTIFY_EXCHANGE_FANOUT paynotify_exchange_fanout; //支付结果通知消息类型 public static final String MESSAGE_TYPE payresult_notify; //支付通知队列 public static final String PAYNOTIFY_QUEUE paynotify_queue; //声明交换机且持久化 Bean(PAYNOTIFY_EXCHANGE_FANOUT) public FanoutExchange paynotify_exchange_fanout() { // 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false); } //支付通知队列,且持久化 Bean(PAYNOTIFY_QUEUE) public Queue course_publish_queue() { return QueueBuilder.durable(PAYNOTIFY_QUEUE).build(); } //交换机和支付通知队列绑定 Bean public Binding binding_course_publish_queue(Qualifier(PAYNOTIFY_QUEUE) Queue queue, Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }
4.4.2 接收支付结果
监听MQ接收支付结果定义ReceivePayNotifyService类如下 Java package com.xuecheng.learning.service.impl; import com.alibaba.fastjson.JSON; import com.rabbitmq.client.Channel; import com.xuecheng.base.exception.XueChengPlusException; import com.xuecheng.learning.config.PayNotifyConfig; import com.xuecheng.learning.service.MyCourseTablesService; import com.xuecheng.messagesdk.model.po.MqMessage; import com.xuecheng.messagesdk.service.MqMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; /** * author Mr.M * version 1.0 * description 接收支付结果 * date 2023/2/23 19:04 */ Slf4j Service public class ReceivePayNotifyService { Autowired private RabbitTemplate rabbitTemplate; Autowired MqMessageService mqMessageService; Autowired MyCourseTablesService myCourseTablesService; //监听消息队列接收支付结果通知 RabbitListener(queues PayNotifyConfig.PAYNOTIFY_QUEUE) public void receive(Message message, Channel channel) { try { Thread.sleep(5000); } catch (InterruptedException e) { throw new RuntimeException(e); } //获取消息 MqMessage mqMessage JSON.parseObject(message.getBody(), MqMessage.class); log.debug(学习中心服务接收支付结果:{}, mqMessage); //消息类型 String messageType mqMessage.getMessageType(); //订单类型,60201表示购买课程 String businessKey2 mqMessage.getBusinessKey2(); //这里只处理支付结果通知 if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType) 60201.equals(businessKey2)) { //选课记录id String choosecourseId mqMessage.getBusinessKey1(); //添加选课 boolean b myCourseTablesService.saveChooseCourseStauts(choosecourseId); if(!b){ //添加选课失败抛出异常消息重回队列 XueChengPlusException.cast(收到支付结果添加选课失败); } } } }