如何在360网站网页上做笔记,军事国际形势最新消息,强大的wordpress 二次开发,海外专用服务器一、实现思路
二、异常情况测试现象及解决 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机制 、消费确认机制 、消息的重新投递 、消费幂等性,
二、实现思路 1.简略介绍163邮箱授权码的获取 2.编写发送邮件工具类 3.编写RabbitMQ配置文件 4.生产者发起调用…一、实现思路
二、异常情况测试现象及解决 说明:本文涵盖了关于RabbitMQ很多方面的知识点, 如: 消息发送确认机制 、消费确认机制 、消息的重新投递 、消费幂等性,
二、实现思路 1.简略介绍163邮箱授权码的获取 2.编写发送邮件工具类 3.编写RabbitMQ配置文件 4.生产者发起调用 5.消费者发送邮件 6.定时任务定时拉取投递失败的消息, 重新投递 7.各种异常情况的测试验证 8.拓展: 使用动态代理实现消费端幂等性验证和消息确认(ack)
三、 代码实现
配置版本如下
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.6.3/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdcom.atguigu.gulimall/groupIdartifactIdprovider-and-consumer/artifactIdversion0.0.1-SNAPSHOT/versionnameprovider-and-consumer/namedescriptionDemo project for Spring Boot/descriptionurl/licenseslicense//licensesdevelopersdeveloper//developersscmconnection/developerConnection/tag/url//scmpropertiesjava.version1.8/java.version!-- spring-cloud.version2021.0.4/spring-cloud.version--spring-cloud.version2021.0.1/spring-cloud.version/propertiesdependencies!--joda time 这个还有些问题这个类库是做什么的--dependencygroupIdjoda-time/groupIdartifactIdjoda-time/artifactIdversion2.10/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-mail/artifactId/dependencydependencygroupIdcom.atguigu.gulimall/groupIdartifactIdgulimall-common/artifactIdversion0.0.1-SNAPSHOT/versionexclusionsexclusionartifactIdservlet-api/artifactIdgroupIdjavax.servlet/groupId/exclusion/exclusions/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-test/artifactIdscopetest/scope/dependency!--什么作用 --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-collections4/artifactIdversion4.2/version/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit/artifactIdversion2.4.2/versionscopecompile/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/pluginsresourcesresourcedirectorysrc/main/java/directory!--所在的目录--includes!--包括目录下的.properties,.xml文件都会扫描到--include**/*.properties/includeinclude**/*.xml/include/includesfilteringfalse/filtering/resource/resources/build/project
完整代码可以参考我的GitHub, https://gitee.com/zhai_jiahao/gulimall
代码实现 1.163邮箱授权码的获取, 如图: 每次启用授权码的时候就会出现一行字符串其实就是三方发送邮件的时候使用的密码该授权码就是配置文件spring.mail.password需要的密码
项目结构
1、rabbitmq、邮箱配置
server:port: 8023#数据源配置
spring:datasource:url: jdbc:mysql://192.168.56.10:3306/gulimall_umsusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driver#配置nacoscloud:nacos:discovery:server-addr: 127.0.0.1:8848#配置服务名称application:name: provider-and-consumer# 配置rabbitMq 服务器#spring.application.namerabbitmq-consumer-truerabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest#虚拟host 可以不设置,使用server默认hostvirtual-host: /publisher-returns: true #确认消息已发送到队列(Queue) 这个在生产者模块配置 这个后期再配置这会还用不到publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange) 这个在生产者模块配置 这个后期再配置这会还用不到listener: #这个在测试消费多个消息的时候不能有下面这些配置否则只能消费一个消息后就不继续消费了simple:acknowledge-mode: manual #指定MQ消费者的确认模式是手动确认模式 这个在消费者者模块配置 设置手动确认(ack)prefetch: 1 #一次只能消费一条消息 这个在消费者者模块配置#配置mailmail:host: smtp.163.comusername: 15131650119163.comfrom: 15131650119163.compassword: GTMCFUFBTNZERDJAdefault-encoding: UTF-8properties:mail:stmp:auth: truestarttls:enable: truerequired: true#配置日志输出级别
logging:level:com.atguigu.gulimall: debug #level 日志等级 指定命名空间的日志输出pattern:console: %d %-5level %logger : %msg%nfile: %d %-5level [%thread] %logger : %msg%nfile:name: d://spring/log
说明: password即授权码, username和from要一致
2、表结构
CREATE TABLE msg_log (msg_id varchar(255) NOT NULL DEFAULT COMMENT 消息唯一标识,msg text COMMENT 消息体, json格式化,exchange varchar(255) NOT NULL DEFAULT COMMENT 交换机,routing_key varchar(255) NOT NULL DEFAULT COMMENT 路由键,status int(11) NOT NULL DEFAULT 0 COMMENT 状态: 0投递中 1投递成功 2投递失败 3已消费,try_count int(11) NOT NULL DEFAULT 0 COMMENT 重试次数,next_try_time datetime DEFAULT NULL COMMENT 下一次重试时间,create_time datetime DEFAULT NULL COMMENT 创建时间,update_time datetime DEFAULT NULL COMMENT 更新时间,PRIMARY KEY (msg_id),UNIQUE KEY unq_msg_id (msg_id) USING BTREE
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT消息投递日志;select * from msg_log t order by t.create_time desc;
说明: exchange routing_key字段是在定时任务重新投递消息时需要用到的
后面会用到的sql(设置时区使用)
#查询需要定时任务处理的数据
select msg_id, msg, exchange, routing_key, status, try_count,
next_try_time, create_time, update_time,SYSDATE(), now() from msg_log where status 0 and next_try_time now() #设置时区
SELECT global.time_zone;
SET GLOBAL time_zone Asia/Shanghai;
3、启动类、服务接口、服务接口实现类
启动类ProviderAndConsumerApplication
package com.atguigu.gulimall.providerconsumer;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;/*** MQ消息发送邮件功能实战博客地址https://blog.csdn.net/onceing/article/details/126407845*/EnableScheduling //设置能使用定时任务
EnableDiscoveryClient
SpringBootApplication
MapperScan(com.atguigu.gulimall.providerconsumer.mapper)
public class ProviderAndConsumerApplication {public static void main(String[] args) {SpringApplication.run(ProviderAndConsumerApplication.class, args);}}
4、TestController 向队列中入消息的入口 package com.atguigu.gulimall.providerconsumer.controller;import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.Errors;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/**** 测试入库控制器类* author: jd* create: 2024-06-28*/RestController
RequestMapping(/test)
Slf4j
public class TestController {Autowiredprivate TestService testService;/*** 发送邮件* param mail 邮件对象* param errors JSR303验证结果错误对象 猜测是可以拿到验证的错误信息的用于返回校验的提示* return*/PostMapping(/send)public ServerResponse sendMail(RequestBody Validated Mail mail, Errors errors){if(errors.hasErrors()){String defaultMessage errors.getFieldError().getDefaultMessage();return ServerResponse.error(defaultMessage);}return testService.send(mail);}}
5、消息生产接口 TestService.java
package com.atguigu.gulimall.providerconsumer.service;import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;/*** 消息生产接口*/
public interface TestService {ServerResponse testIdempotence();ServerResponse accessLimit();ServerResponse send(Mail mail);}
TestServiceImpl.java
package com.atguigu.gulimall.providerconsumer.service.impl;import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;/*** 消息生产接口实现类* author: jd* create: 2024-06-27*/
Service
Slf4j
public class TestServiceImpl implements TestService {Autowiredprivate MsgLogMapper msgLogMapper;Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic ServerResponse testIdempotence() {return ServerResponse.success(testIdempotence: success);}Overridepublic ServerResponse accessLimit() {return ServerResponse.success(accessLimit: success);}Overridepublic ServerResponse send(Mail mail) {// 1. 生产唯一业务标识String msgId String.valueOf(UUID.randomUUID()); //业务的唯一标识mail.setMsgId(msgId);//2.记录日志MsgLog msgLog new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);msgLogMapper.insertMsgLog(msgLog);// 消息入库 先记录日志//3.真正发送消息到MQ中CorrelationData correlationData new CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,MessageHelper.objToMsg(mail), correlationData);// 发送消息log.info(消息已发送队列);//返回公共的响应结果return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());}
}
MsgLogMapper.java
package com.atguigu.gulimall.providerconsumer.mapper;import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import org.apache.ibatis.annotations.Mapper;import java.util.List;/*** 日志操作mapper接口*/
Mapper
public interface MsgLogMapper extends BatchProcessMapperMsgLog {/*** 记录消息日志* param msgLog*/void insertMsgLog(MsgLog msgLog);/*** 更新消息日志状态* param msgLog*/void updateStatus(MsgLog msgLog);/*** 查询超时消息* return*/ListMsgLog selectTimeoutMsg();/*** 更新尝试的次数* param msgLog*/void updateTryCount(MsgLog msgLog);/*** 通过主键筛选出消息日志对象* param msgId* return*/MsgLog selectByPrimaryKey(String msgId);}
MsgLogMapper.xml
?xml version1.0 encodingUTF-8?
!DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtd
mapper namespacecom.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper resultMap idBaseResultMap typecom.atguigu.gulimall.providerconsumer.pojo.MsgLog id columnmsg_id propertymsgId jdbcTypeVARCHAR /result columnmsg propertymsg jdbcTypeVARCHAR /result columnexchange propertyexchange jdbcTypeVARCHAR /result columnrouting_key propertyroutingKey jdbcTypeVARCHAR /result columnstatus propertystatus jdbcTypeINTEGER /result columntry_count propertytryCount jdbcTypeINTEGER /result columnnext_try_time propertynextTryTime jdbcTypeTIMESTAMP /result columncreate_time propertycreateTime jdbcTypeTIMESTAMP /result columnupdate_time propertyupdateTime jdbcTypeTIMESTAMP //resultMapsql idBase_Column_List msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time/sqlinsert idinsertMsgLog parameterTypecom.atguigu.gulimall.providerconsumer.pojo.MsgLogINSERT INTO msg_log(msg_id, msg, exchange, routing_key, status, try_count, next_try_time, create_time, update_time)VALUES (#{msgId}, #{msg}, #{exchange}, #{routingKey}, #{status}, #{tryCount}, #{nextTryTime}, #{createTime}, #{updateTime})/insertupdate idupdateStatus parameterTypecom.atguigu.gulimall.providerconsumer.pojo.MsgLogupdate msg_log set status #{status}, update_time now()where msg_id #{msgId}/updateselect idselectTimeoutMsg resultMapBaseResultMapselect include refidBase_Column_List/from msg_logwhere status 0and next_try_time lt; now()/selectupdate idupdateTryCountupdate msg_log set try_count try_count 1, next_try_time #{nextTryTime}, update_time now()where msg_id #{msgId}/updateselect idselectByPrimaryKey parameterTypejava.lang.String resultMapBaseResultMapselectinclude refidBase_Column_List /from msg_logwhere msg_id #{msgId,jdbcTypeVARCHAR}/select
/mapperMsgLogService.java
package com.atguigu.gulimall.providerconsumer.service;import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;import java.util.Date;
import java.util.List;/*** 日志记录接口类*/
public interface MsgLogService {void updateStatus(String msgId, Integer status);MsgLog selectByMsgId(String msgId);ListMsgLog selectTimeoutMsg();void updateTryCount(String msgId, Date tryTime);
}
MsgLogServiceImpl.java 消息日志操作实现类
package com.atguigu.gulimall.providerconsumer.service.impl;import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JodaTimeUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;
import java.util.List;/*** 消息日志操作实现类* author: jd* create: 2024-06-27*/
Service
public class MsgLogServiceImpl implements MsgLogService {Autowiredprivate MsgLogMapper msgLogMapper;Overridepublic void updateStatus(String msgId, Integer status) {MsgLog msgLog new MsgLog();msgLog.setMsgId(msgId);msgLog.setStatus(status);msgLog.setUpdateTime(new Date());msgLogMapper.updateStatus(msgLog);}Overridepublic MsgLog selectByMsgId(String msgId) {return msgLogMapper.selectByPrimaryKey(msgId);}Overridepublic ListMsgLog selectTimeoutMsg() {return msgLogMapper.selectTimeoutMsg();}Overridepublic void updateTryCount(String msgId, Date tryTime) {//获取下一次重发发送时间上一次发送时间 加一分钟Date nextTryTime JodaTimeUtil.plusMinutes(tryTime, 1);//构建消息对象MsgLog msgLog new MsgLog();msgLog.setMsgId(msgId);msgLog.setNextTryTime(nextTryTime); //设置下一次消息重发时间msgLogMapper.updateTryCount(msgLog);}
}
通用BatchProcessMapper.java 所有的mapper可以继承的
package com.atguigu.gulimall.providerconsumer.batch;import java.util.List;/*** 通用manpper接口* param T*/
public interface BatchProcessMapperT {void batchInsert(ListT list);void batchUpdate(ListT list);
}
通用manpper接口实现类 MapperProxy
package com.atguigu.gulimall.providerconsumer.batch.mapperproxy;import com.atguigu.gulimall.providerconsumer.batch.BatchProcessMapper;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;import java.util.List;import static com.atguigu.gulimall.providerconsumer.common.Constant.MAX_SIZE_PER_TIME;/*** 通用manpper接口实现类* author: jd* create: 2024-06-27*/
public class MapperProxyT implements BatchProcessMapperT {private BatchProcessMapper batchProcessMapper;public MapperProxy(BatchProcessMapper batchProcessMapper) {this.batchProcessMapper batchProcessMapper;}Overridepublic void batchInsert(ListT list) {if (CollectionUtils.isEmpty(list)) {return;}ListListT partition Lists.partition(list, MAX_SIZE_PER_TIME);for (ListT batchList : partition) {batchProcessMapper.batchInsert(batchList);}}Overridepublic void batchUpdate(ListT list) {if (CollectionUtils.isEmpty(list)) {return;}ListListT partition Lists.partition(list, MAX_SIZE_PER_TIME);for (ListT batchList : partition) {batchProcessMapper.batchUpdate(batchList);}}}
常量类 Constant.java
package com.atguigu.gulimall.providerconsumer.common;import java.util.Arrays;
import java.util.stream.Collectors;/*** 常量 、枚举类* author: jd* create: 2024-06-27*/
public class Constant {public static final int MAX_SIZE_PER_TIME 1000;public static final int INDEX_ZERO 0;public static final int INDEX_ONE 1;public static final int INDEX_TWO 2;public static final int INDEX_THREE 3;public static final int NUMBER_ZERO 0;public static final int NUMBER_ONE 1;public static final String COLON :;public static final String COMMA ,;public static final String DOUBLE_STRIGULA --;public static final String REPLACEMENT_TARGET -99999%;public static final String UNKNOWN_TYPE 未知类型;public interface Redis {String OK OK;// 过期时间, 60s, 一分钟Integer EXPIRE_TIME_MINUTE 60;// 过期时间, 一小时Integer EXPIRE_TIME_HOUR 60 * 60;// 过期时间, 一天Integer EXPIRE_TIME_DAY 60 * 60 * 24;String TOKEN_PREFIX token:;String MSG_CONSUMER_PREFIX consumer:;String ACCESS_LIMIT_PREFIX accessLimit:;String FUND_RANK fundRank;String FUND_LIST fundList;}public interface LogType {// 登录Integer LOGIN 1;// 登出Integer LOGOUT 2;}/*** 相较于生产者对消息的角度来设置的此项枚举值*/public interface MsgLogStatus {// 消息投递中Integer DELIVERING 0;// 投递成功Integer DELIVER_SUCCESS 1;// 投递失败Integer DELIVER_FAIL 2;// 已消费Integer CONSUMED_SUCCESS 3;}public enum CalculateTypeEnum {ADD(1, 加),SUBTRACT(2, 减),MULTIPLY(3, 乘),DIVIDE(4, 除);Integer type;String desc;CalculateTypeEnum(Integer type, String desc) {this.type type;this.desc desc;}public Integer getType() {return type;}public String getDesc() {return desc;}}public enum FundSortType {ASC(asc),DESC(desc),;private String type;FundSortType(String type) {this.type type;}public String getType() {return type;}}
}
公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】ServerResponse.java
package com.atguigu.gulimall.providerconsumer.common;import com.fasterxml.jackson.annotation.JsonIgnore;
import jdk.nashorn.internal.ir.annotations.Ignore;import java.io.Serializable;/*** 公共服务响应包装类【这个一般的项目中都会用到这个公共的封装】* author: jd* create: 2024-06-27*/
public class ServerResponse implements Serializable {private static final long serialVersionUID 7498483649536881777L;private Integer status;private String msg;private Object data;public ServerResponse() {}public ServerResponse(Integer status, String msg, Object data) {this.status status;this.msg msg;this.data data;}/*** JsonIgnore注解在Java中主要用于处理JSON序列化和反序列化过程其具体作用如下** 忽略属性当在Java对象的某个属性或方法上使用JsonIgnore注解时该属性或方法对应的属性在序列化为JSON字符串时会被忽略同样地在将JSON字符串反序列化为Java对象时该属性或方法对应的属性也不会被解析。* 当用在属性上时表示忽略该属性的序列化和反序列化。* 当用在方法上时表示忽略该方法对应的属性的序列化和反序列化。* 保护敏感信息在实际应用中JsonIgnore注解可以用于隐藏一些敏感信息比如密码、token等确保这些信息不会被发送到客户端或存储在不安全的地方。* 减少数据大小通过忽略一些不必要的属性可以减少序列化后的JSON数据大小提高数据传输效率。* 解决循环引用问题当对象之间存在循环引用时使用JsonIgnore注解可以避免在序列化过程中出现无限递归的情况。* 提高程序的可维护性和安全性通过精确控制哪些属性参与序列化和反序列化可以使得程序更加健壮减少潜在的安全风险。* 需要注意的是JsonIgnore注解是Jackson库提供的因此需要确保项目中引入了Jackson库的相关依赖。同时在使用JsonIgnore注解时要确保被标记的属性或方法确实不需要参与序列化和反序列化否则可能会导致意外的结果。** 总之JsonIgnore注解在Java对象和JSON之间的转换过程中起到了非常重要的作用能够帮助我们更灵活地控制序列化和反序列化的行为。* return*/JsonIgnorepublic boolean isSuccess() {return this.status ResponseCode.SUCCESS.getCode();}public static ServerResponse success() {return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, null);}public static ServerResponse success(String msg) {return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, null);}public static ServerResponse success(Object data) {return new ServerResponse(ResponseCode.SUCCESS.getCode(), null, data);}public static ServerResponse success(String msg, Object data) {return new ServerResponse(ResponseCode.SUCCESS.getCode(), msg, data);}public static ServerResponse error(String msg) {return new ServerResponse(ResponseCode.ERROR.getCode(), msg, null);}public static ServerResponse error(Object data) {return new ServerResponse(ResponseCode.ERROR.getCode(), null, data);}public static ServerResponse error(String msg, Object data) {return new ServerResponse(ResponseCode.ERROR.getCode(), msg, data);}public Integer getStatus() {return status;}public void setStatus(Integer status) {this.status status;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg msg;}public Object getData() {return data;}public void setData(Object data) {this.data data;}
}
服务响应状态码 大部分的服务中都会用到这个公共的状态码类 ResponseCode.java
package com.atguigu.gulimall.providerconsumer.common;/*** 服务响应状态码 大部分的服务中都会用到这个公共的状态码类*/
public enum ResponseCode {// 系统模块SUCCESS(0, 操作成功),ERROR(1, 操作失败),SERVER_ERROR(500, 服务器异常),// 通用模块 1xxxxILLEGAL_ARGUMENT(10000, 参数不合法),REPETITIVE_OPERATION(10001, 请勿重复操作),ACCESS_LIMIT(10002, 请求太频繁, 请稍后再试),MAIL_SEND_SUCCESS(10003, 邮件发送成功),// 用户模块 2xxxxNEED_LOGIN(20001, 登录失效),USERNAME_OR_PASSWORD_EMPTY(20002, 用户名或密码不能为空),USERNAME_OR_PASSWORD_WRONG(20003, 用户名或密码错误),USER_NOT_EXISTS(20004, 用户不存在),WRONG_PASSWORD(20005, 密码错误),;private Integer code;private String msg;ResponseCode(Integer code, String msg) {this.code code;this.msg msg;}public Integer getCode() {return code;}public void setCode(Integer code) {this.code code;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg msg;}
}
4、工具类
时间字符操作类 JodaTimeUtil.java
package com.atguigu.gulimall.providerconsumer.util;import com.alibaba.cloud.commons.lang.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import java.util.Date;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/*** 时间字符操作类 JodaTimeUtil* author: jd* create: 2024-06-27*/
Slf4j
public class JodaTimeUtil {private static final String STANDARD_FORMAT yyyy-MM-dd HH:mm:ss;/*** date类型 - string类型** param date* return*/public static String dateToStr(Date date) {return dateToStr(date, STANDARD_FORMAT);}/*** date类型 - string类型** param date* param format 自定义日期格式* return*/public static String dateToStr(Date date, String format) {if (date null) {return null;}format StringUtils.isBlank(format) ? STANDARD_FORMAT : format;DateTime dateTime new DateTime(date);return dateTime.toString(format);}/*** string类型 - date类型** param timeStr* return*/public static Date strToDate(String timeStr) {return strToDate(timeStr, STANDARD_FORMAT);}/*** string类型 - date类型** param timeStr* param format 自定义日期格式* return*/public static Date strToDate(String timeStr, String format) {if (StringUtils.isBlank(timeStr)) {return null;}format StringUtils.isBlank(format) ? STANDARD_FORMAT : format;DateTimeFormatter dateTimeFormatter DateTimeFormat.forPattern(format);DateTime dateTime;try {dateTime dateTimeFormatter.parseDateTime(timeStr);} catch (Exception e) {log.error(strToDate error: timeStr: {}, timeStr, e);return null;}return dateTime.toDate();}/*** 判断date日期是否过期(与当前时刻比较)** param date* return*/public static Boolean isTimeExpired(Date date) {String timeStr dateToStr(date);return isBeforeNow(timeStr);}/*** 判断date日期是否过期(与当前时刻比较)** param timeStr* return*/public static Boolean isTimeExpired(String timeStr) {if (StringUtils.isBlank(timeStr)) {return true;}return isBeforeNow(timeStr);}/*** 判断timeStr是否在当前时刻之前** param timeStr* return*/private static Boolean isBeforeNow(String timeStr) {DateTimeFormatter format DateTimeFormat.forPattern(STANDARD_FORMAT);DateTime dateTime;try {dateTime DateTime.parse(timeStr, format);} catch (Exception e) {log.error(isBeforeNow error: timeStr: {}, timeStr, e);return null;}return dateTime.isBeforeNow();}/*** 日期加天数** param date* param days* return*/public static Date plusDays(Date date, int days) {return plusOrMinusDays(date, days, 0);}/*** 日期减天数** param date* param days* return*/public static Date minusDays(Date date, int days) {return plusOrMinusDays(date, days, 1);}/*** 加减天数** param date* param days* param type 0:加天数 1:减天数* return*/private static Date plusOrMinusDays(Date date, int days, Integer type) {if (null date) {return null;}DateTime dateTime new DateTime(date);if (type 0) {dateTime dateTime.plusDays(days);} else {dateTime dateTime.minusDays(days);}return dateTime.toDate();}/*** 日期加分钟** param date* param minutes* return*/public static Date plusMinutes(Date date, int minutes) {return plusOrMinusMinutes(date, minutes, 0);}/*** 日期减分钟** param date* param minutes* return*/public static Date minusMinutes(Date date, int minutes) {return plusOrMinusMinutes(date, minutes, 1);}/*** 加减分钟** param date* param minutes* param type 0:加分钟 1:减分钟* return*/private static Date plusOrMinusMinutes(Date date, int minutes, Integer type) {if (null date) {return null;}DateTime dateTime new DateTime(date);if (type 0) {dateTime dateTime.plusMinutes(minutes);} else {dateTime dateTime.minusMinutes(minutes);}return dateTime.toDate();}/*** 日期加月份** param date* param months* return*/public static Date plusMonths(Date date, int months) {return plusOrMinusMonths(date, months, 0);}/*** 日期减月份** param date* param months* return*/public static Date minusMonths(Date date, int months) {return plusOrMinusMonths(date, months, 1);}/*** 加减月份** param date* param months* param type 0:加月份 1:减月份* return*/private static Date plusOrMinusMonths(Date date, int months, Integer type) {if (null date) {return null;}DateTime dateTime new DateTime(date);if (type 0) {dateTime dateTime.plusMonths(months);} else {dateTime dateTime.minusMonths(months);}return dateTime.toDate();}/*** 判断target是否在开始和结束时间之间** param target* param startTime* param endTime* return*/public static Boolean isBetweenStartAndEndTime(Date target, Date startTime, Date endTime) {if (null target || null startTime || null endTime) {return false;}DateTime dateTime new DateTime(target);return dateTime.isAfter(startTime.getTime()) dateTime.isBefore(endTime.getTime());}
}
Object 和String互转类 JsonUtil
package com.atguigu.gulimall.providerconsumer.util;import com.alibaba.cloud.commons.lang.StringUtils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import lombok.extern.slf4j.Slf4j;import java.text.SimpleDateFormat;/*** Object 和String互转类* author: jd* create: 2024-06-27*/
Slf4j
public class JsonUtil {private static ObjectMapper objectMapper new ObjectMapper();private static final String DATE_FORMAT yyyy-MM-dd HH:mm:ss;static {// 对象的所有字段全部列入objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);// 取消默认转换timestamps形式objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);// 忽略空bean转json的错误objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);// 统一日期格式objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));// 忽略在json字符串中存在, 但在java对象中不存在对应属性的情况, 防止错误objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);}/*** 将Object转化为String对象* param obj* param T* return*/public static T String objToStr(T obj) {if (null obj) {return null;}try {return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);} catch (Exception e) {log.warn(objToStr error: , e);return null;}}/*** 将字符串转化成Object对象* param str 待转的字符串* param clazz 类名* param T* return*/public static T T strToObj(String str, ClassT clazz) {if (StringUtils.isBlank(str) || null clazz) {return null;}try {return clazz.equals(String.class) ? (T) str : objectMapper.readValue(str, clazz);} catch (Exception e) {log.warn(strToObj error: , e);return null;}}public static T T strToObj(String str, TypeReferenceT typeReference) {if (StringUtils.isBlank(str) || null typeReference) {return null;}try {return (T) (typeReference.getType().equals(String.class) ? str : objectMapper.readValue(str, typeReference));} catch (Exception e) {log.error(strToObj error, e);return null;}}
}
发送邮件工具类 MailUtil.java
package com.atguigu.gulimall.providerconsumer.util;import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.MailException;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Component;import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;/**** 发送邮件工具类* author: jd* create: 2024-06-27*/Component
Slf4j
public class MailUtil {Value(${spring.mail.from}) //这里从application.xml中拿不到配置信息所以从这里直接写死了private String from 15131650119163.com;Autowiredprivate JavaMailSender mailSender;public boolean send(Mail mail) throws AddressException {//模拟消费成功但是业务实际没成功此时会重新入队列不会造成消息丢失
// if(true){
// return false;
// }String to mail.getTo();// 目标邮箱String title mail.getTitle();// 邮件标题String content mail.getContent();// 邮件正文SimpleMailMessage message new SimpleMailMessage();message.setFrom(String.valueOf(new InternetAddress(from))); //设置发送人message.setTo(to); //设置目标账户message.setSubject(title); //设置邮件标题message.setText(content); //设置邮件内容try {log.info(开始发送邮件);mailSender.send(message);log.info(邮件发送成功);return true;} catch (MailException e) {log.error(邮件发送失败, to: {}, title: {}, to, title, e);return false;}}}
SpringBeanUtil.java 获取BeanSpring容器类
package com.atguigu.gulimall.providerconsumer.util;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** author: jd* create: 2024-06-27*/
Component
public class SpringBeanUtil implements ApplicationContextAware {private static ApplicationContext applicationContext;Overridepublic void setApplicationContext(ApplicationContext applicationContext)throws BeansException {SpringBeanUtil.applicationContext applicationContext;}/*** 通过名称在spring容器中获取对象** param beanName* return*/public static Object getBean(String beanName) {System.out.println(applicationContext);return applicationContext.getBean(beanName);}}
5、RabbitMQ消费者、生产者配置类
A、MQ生产者
TestController.java
package com.atguigu.gulimall.providerconsumer.service.impl;import com.atguigu.gulimall.providerconsumer.common.ResponseCode;
import com.atguigu.gulimall.providerconsumer.common.ServerResponse;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mapper.MsgLogMapper;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;/*** 消息生产接口实现类* author: jd* create: 2024-06-27*/
Service
Slf4j
public class TestServiceImpl implements TestService {Autowiredprivate MsgLogMapper msgLogMapper;Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic ServerResponse testIdempotence() {return ServerResponse.success(testIdempotence: success);}Overridepublic ServerResponse accessLimit() {return ServerResponse.success(accessLimit: success);}Overridepublic ServerResponse send(Mail mail) {// 1. 生产唯一业务标识String msgId String.valueOf(UUID.randomUUID()); //业务的唯一标识mail.setMsgId(msgId);//2.记录日志MsgLog msgLog new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);msgLogMapper.insertMsgLog(msgLog);// 消息入库 先记录日志//3.真正发送消息到MQ中CorrelationData correlationData new CorrelationData(msgId);rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME,MessageHelper.objToMsg(mail), correlationData);// 发送消息log.info(消息已发送队列);//返回公共的响应结果return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());}
}
队列 交换机配置用于消息生产者:RabbitConfig.java
package com.atguigu.gulimall.providerconsumer.config;import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;/**** 队列 交换机配置用于消息生产者* author: jd* create: 2024-06-27*/Slf4j
Component
Configuration
public class RabbitConfig {Autowiredprivate MsgLogService msgLogService;// 发送邮件public static final String MAIL_QUEUE_NAME mail.queue;public static final String MAIL_EXCHANGE_NAME mail.exchange;public static final String MAIL_ROUTING_KEY_NAME mail.routing.key;Beanpublic Queue mailQueue() {return new Queue(MAIL_QUEUE_NAME, true);}Beanpublic DirectExchange mailExchange() {return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);}Beanpublic Binding mailBinding() {return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);}// Autowired
// private CachingConnectionFactory connectionFactory;// ConnectionFactory connectionFactory (ConnectionFactory) SpringBeanUtil.getBean(connectionFactory);/*** 设置生产者消息确认回调函数**/Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setMessageConverter(converter());rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info(消息成功发送到Exchange);String msgId correlationData.getId();msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);} else {log.info(消息发送到Exchange失败, {}, cause: {}, correlationData, cause);}System.out.println(ConfirmCallback回调: 相关数据correlationData);System.out.println(ConfirmCallback回调: 确认情况ack);System.out.println(ConfirmCallback回调: 原因cause);}});rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println(ReturnCallback回调: 消息returnedMessage.getMessage());System.out.println(ReturnCallback回调: 回应码returnedMessage.getReplyCode());System.out.println(ReturnCallback回调: 回应信息returnedMessage.getReplyText());System.out.println(ReturnCallback回调: 交换机returnedMessage.getExchange());System.out.println(ReturnCallback回调: 路由键returnedMessage.getRoutingKey());log.info(消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {},returnedMessage.getExchange(),returnedMessage.getRoutingKey(),returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage());}});return rabbitTemplate;}Beanpublic Jackson2JsonMessageConverter converter() {return new Jackson2JsonMessageConverter();}}
B、MQ 消费者 其实就完成了3件事: 1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack
package com.atguigu.gulimall.providerconsumer.mq.consumer;import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.Mail;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import com.atguigu.gulimall.providerconsumer.util.JsonUtil;
import com.atguigu.gulimall.providerconsumer.util.MailUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.mail.internet.AddressException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;/*** MQ 监听者操作业务发送邮件* 其实就完成了3件事:* 1.保证消费幂等性, 2.发送邮件, 3.更新消息状态, 手动ack* author: jd* create: 2024-06-27*/
Component
Slf4j
RabbitListener(queues RabbitConfig.MAIL_QUEUE_NAME) //指定监听队列
public class MailConsumer {Autowiredprivate MsgLogService msgLogService;Autowiredprivate MailUtil mailUtil;RabbitHandler(isDefault true) //指定监听后的处理动作public void consume(Message message, Channel channel) throws IOException, AddressException {//将Message中的业务数据转化成Mail对象Mail mail MessageHelper.msgToObj(message, Mail.class);log.info(消费者收到消息: {}, mail.toString());log.debug(测试debug和info有什么区别);//根据ID查询Msg对象String msgId mail.getMsgId();MsgLog msgLog msgLogService.selectByMsgId(msgId);// 消费幂等性if (null msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {log.info(消费者重复消费,此时不进行消费 ,msgId: {}, msgId);//直接终止程序运行程序返回return;}//拿到MQ中的每一条消息的唯一标识TagMessageProperties properties message.getMessageProperties();long tag properties.getDeliveryTag();//业务操作发送邮件log.info(准备发送邮件);boolean send mailUtil.send(mail);
//try {//如果发送邮件成功则修改消息状态为 已消费if(send){//发送成功后更新消息日志表的消息记录状态msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);//取得进程IDThread t Thread.currentThread();log.info(【消息队列】current request consumer success, request info: {}; thread info: {};, JsonUtil.objToStr(mail), t);// 消费确认设置反馈给MQchannel.basicAck(tag, false);}else {log.error(【消息队列】consumer failed,, msg info: {}, JsonUtil.objToStr(mail));channel.basicNack(tag, false, true); //这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失}} catch (Exception e) {//产生异常之后则不消费直接拒绝此消息不进行消费这样会导致这条失败的消息会一直存在队列里面然后定时任务过一会在数据库中扫到这个信息之后会再去MQ中拿这个消息进行消费e.printStackTrace();ByteArrayOutputStream bass new ByteArrayOutputStream();e.printStackTrace(new PrintStream(bass));log.error(【消息队列】consumer error, error info: {}, msg info: {}, bass, JsonUtil.objToStr(mail));channel.basicNack(tag, false, true);}}}
6、定时任务重发 ResendMsg.java (说明: 每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可)
package com.atguigu.gulimall.providerconsumer.task;import com.atguigu.gulimall.providerconsumer.common.Constant;
import com.atguigu.gulimall.providerconsumer.config.RabbitConfig;
import com.atguigu.gulimall.providerconsumer.mq.MessageHelper;
import com.atguigu.gulimall.providerconsumer.pojo.MsgLog;
import com.atguigu.gulimall.providerconsumer.service.MsgLogService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.util.List;/*** 消息重发定时任务* author: jd* create: 2024-06-28*/
Component
Slf4j
public class ResendMsg {Autowiredprivate RabbitTemplate rabbitTemplate;// 最大投递次数。第四次投递失败private static final int MAX_TRY_COUNT 3;Autowiredprivate MsgLogService msgLogService;/*** 每30s拉取投递失败的消息, 重新投递*/Scheduled(cron 0/30 * * * * ?)public void reSend(){log.info(开始执行定时任务(重新投递消息));ListMsgLog msgLogs msgLogService.selectTimeoutMsg(); //查询还在投递中的消息msgLogs.forEach(msgLog-{String msgId msgLog.getMsgId();//超过投递次数则不会重新投递中的消息是否需要投递if(msgLog.getTryCount()MAX_TRY_COUNT){//不需要重新投递msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);log.info(消息ID {}超过最大的投递次数 {} 次投递失败需要人工查看,msgId,MAX_TRY_COUNT);}else {//拿到消息在表中的本次重试时间去获取下一次重试时间 同时 投递次数1msgLogService.updateTryCount(msgId,msgLog.getNextTryTime());CorrelationData correlationData new CorrelationData(msgId);//携带业务信息作为业务的唯一标识//重新发送消息到MQ让MQ去重新尝试消费这一条之前没有发送到MQ的消息因为我们现在查的消息的状态是status 0 的代表是消息还是投递中的没有变成投递成功的消息肯定是投递有问题rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME, //每一条消息都和exchange routingKey绑定, 所有消息重投共用这一个定时任务即可MessageHelper.objToMsg(msgLog),correlationData);log.info(第 (msgLog.getTryCount() 1) 次重新投递消息);}});log.info(定时任务执行结束(重新投递消息)); //}}
四、基本测试
OK, 目前为止, 代码准备就绪, 现在进行正常流程的测试 1.发送请求: 后台日志: 3.库消息记录: 状态为3, 表明已消费, 消息重试次数为0, 表明一次投递就成功了此时就可以到目标邮箱中去查看是否接收到了这个邮件
五、异常情况测试
1.验证消息发送到Exchange失败情况下的回调, 对应上图P - X
如何验证? 可以随便指定一个不存在的交换机名称, 请求接口, 看是否会触发回调 发送失败, 原因: reply-code404, reply-textNOT_FOUND - no exchange ‘mail.exchangeabcd’ in vhost ‘/’, 该回调能够保证消息正确发送到Exchange, 测试完成
2.验证消息从Exchange路由到Queue失败情况下的回调, 对应上图X - Q 同理, 修改一下路由键为不存在的即可, 路由失败, 触发回调 发送失败, 原因: route: mail.routing.keyabcd, replyCode: 312, replyText: NO_ROUTE
3.验证在手动ack模式下, 消费端必须进行手动确认(ack), 否则消息会一直保存在队列中, 直到被消费, 对应上图Q - C 将消费端代码channel.basicAck(tag, false);// 消费确认注释掉, 查看控制台和rabbitmq管控台 可以看到, 虽然消息确实被消费了, 但是由于是手动确认模式, 而最后又没手动确认, 所以, 消息仍被rabbitmq保存, 所以, 手动ack能够保证消息一定被消费, 但一定要记得basicAck
4.验证消费端幂等性 接着上一步, 去掉注释, 重启服务器, 由于有一条未被ack的消息, 所以重启后监听到消息, 进行消费, 但是由于消费前会判断该消息的状态是否未被消费, 发现status3, 即已消费, 所以, 直接return, 这样就保证了消费端的幂等性, 即使由于网络等原因投递成功而未触发回调, 从而多次投递, 也不会重复消费进而发生业务异常
5.验证消费端发生异常消息也不会丢失 很显然, 消费端代码可能发生异常, 如果不做处理, 业务没正确执行, 消息却不见了, 给我们感觉就是消息丢失了, 由于我们消费端代码做了异常捕获, 业务异常时, 会触发: channel.basicNack(tag, false, true);, 这样会告诉rabbitmq该消息消费失败, 需要重新入队, 可以重新投递到其他正常的消费端进行消费, 从而保证消息不被丢失 测试: send方法直接返回false即可(这里跟抛出异常一个意思)因为我们向MQ插入了消息但是实际业务消费了但是发送邮件返回了false这样会从新投递到MQ队列中再进行消费一直重复。 代码修改 结果
可以看到, 由于channel.basicNack(tag, false, true), 未被ack的消息(unacked)会重新入队并被消费, 这样就保证了消息不会走丢
6.验证定时任务的消息重投 实际应用场景中, 可能由于网络原因, 或者消息未被持久化MQ就宕机了, 使得投递确认的回调方法ConfirmCallback没有被执行, 从而导致数据库该消息状态一直是投递中的状态, 此时就需要进行消息重投, 即使也许消息已经被消费了 定时任务只是保证消息100%投递成功, 而多次投递的消费幂等性需要消费端自己保证 我们可以将回调和消费成功后更新消息状态的代码注释掉, 开启定时任务, 查看是否重投
这是没有异常信息的情况下定时任务每次都不会做实际的业务 当我们对一条消息进行了实际的业务处理而且也业务处理成功了只是没有把状态修改成成功这样定时任务会扫重新入队列但是有幂等性校验所以一直发送到队列将这条信息直到3次后消息会被更新为发送失败 发送邮件其实很简单, 但深究起来其实有很多需要注意和完善的点, 一个看似很小的知识点, 也可以引申出很多问题, 甚至涉及到方方面面, 这些都需要自己踩坑, 当然我这代码肯定还有很多不完善和需要优化的点, 希望小伙伴多多提意见和建议 我的代码都是经过自测验证过的, 图也都是一点一点自己画的或认真截的, 希望小伙伴能学到一点东西, 路过的点个赞或点个关注呗, 谢谢
部分参考springboot rabbitmq发送邮件实战(保证消息100%投递成功并被消费)