威海网站建设在哪,wordpress 多重排序,百度做广告费用,使用三剑客做网站需求分析
本案例是通过一个发送短信验证码的功能来实验MQ发送消息时实现分布式事务#xff0c;思路分析如下 消息生产者生产发送验证码的半消息 生产者执行本地事务#xff08;将验证码保存到数据库#xff09;#xff0c;并记录事务的ID#xff0c;如果整个过程不出现异…
需求分析
本案例是通过一个发送短信验证码的功能来实验MQ发送消息时实现分布式事务思路分析如下 消息生产者生产发送验证码的半消息 生产者执行本地事务将验证码保存到数据库并记录事务的ID如果整个过程不出现异常则提交事务消息成功投递否则进行事务的回滚操作 MQ二次确认消息是否成功投递如果没成功发生了异常则丢弃消息
需求实现
一、创建项目
创建一个主工程stream-mq-demo目的是维护项目的版本号、一些必要的类库、集成SpringCloudAlibaba子工程producer目的是生产发送验证码的消息及使用事务将验证码保存到数据库子工程consumer目的是消费消息
二、主工程
2.1、pom.xml
目的是维护项目的版本号、一些必要的类库、以及集成SpringCloudAlibaba
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.1.5.RELEASE/version/parentgroupIdorg.example/groupIdartifactIdstream-mq-demo/artifactIdpackagingpom/packagingversion1.0-SNAPSHOT/versionmodulesmoduleproducer/modulemoduleconsumer/module/modulespropertiesspring-cloud.versionGreenwich.SR1/spring-cloud.versionspring-cloud-alibaba.version0.9.0.RELEASE/spring-cloud-alibaba.versionjava.version1.8/java.versionlombok.version1.18.8/lombok.versionrocketmq.version2.0.3/rocketmq.versionmybatis.plus.version3.5.1/mybatis.plus.versionmysql.version8.0.32/mysql.version/propertiesdependencies!-- RocketMQ坐标 --dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion${rocketmq.version}/version/dependency!-- SpringCloudStream坐标 --dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId/dependency!-- SpringWeb坐标 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- lombok坐标 --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion${lombok.version}/version/dependency!-- test --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependency/dependenciesdependencyManagementdependencies!--整合spring cloud--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-dependencies/artifactIdversion${spring-cloud.version}/versiontypepom/typescopeimport/scope/dependency!--整合spring cloud alibaba--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-alibaba-dependencies/artifactIdversion${spring-cloud-alibaba.version}/versiontypepom/typescopeimport/scope/dependency/dependencies/dependencyManagement
/project三、Producer子工程
3.1、pom.xml
添加MyBatisPlus、MySQL、FastJSON类库
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdstream-mq-demo/artifactIdgroupIdorg.example/groupIdversion1.0-SNAPSHOT/version/parentmodelVersion4.0.0/modelVersionartifactIdproducer/artifactIddependencies!-- mybatis-plus --dependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion${mybatis.plus.version}/version/dependency!-- mysql-connector --dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion${mysql.version}/version/dependency!-- fastjson --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion2.0.12/version/dependency/dependencies
/project3.2、application.yml
配置数据源application_druid.yml配置端口号为8081配置MQ的name-server地址配置SpringCloudStream的消费者模式并开启事务配置MQ的topic
数据源application_druid.yml
spring:datasource:type: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.0.3:3306/mq_demo?useUnicodetruecharacterEncodingutf8zeroDateTimeBehaviorconvertToNulluseSSLfalseserverTimezoneGMT%2B8allowPublicKeyRetrievaltrueusername: rootpassword: Aa123123.jackson:date-format: yyyy-MM-dd HH:mm:ss
mybatis-plus:type-aliases-package: demo.entityconfiguration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplglobal-config:db-config:table-prefix: t_主配置文件application.yml
spring:profiles:include: druidcloud:stream:rocketmq:binder:name-server: 192.168.0.3:9876bindings:#消费者output:producer:#事务消息transactional: true#与AddBonusTransactionListener类中RocketMQTransactionListener一致group: tx-captcha-groupbindings:output:#用来指定topic要和content-center微服务的topic匹配destination: captcha-topic
server:port: 80813.3、启动类
使用EnableBinding(Source.class)定义消息的推送管道
Source.class源代码
public interface Source {String OUTPUT output;Output(output)MessageChannel output();
}application.yml中配置的output属性 启动类
package demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;SpringBootApplication
EnableBinding(Source.class)
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}3.4、必要的实体类
验证码类
package demo.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.Builder;
import lombok.Data;import java.util.Date;Data
Builder
public class Captcha {JsonSerialize(using ToStringSerializer.class)TableId(type IdType.AUTO)private Integer id;private String captcha;private String phone;private Date publishTime;
}事务日志类
package demo.entity;import lombok.Builder;
import lombok.Data;import java.util.Date;Data
Builder
public class TransactionLog {private String transactionId;private Date createTime;private String log;
}3.5、本地事务类
发送半消息保存验证码到数据库并记录日志
package demo.service;import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import demo.entity.Captcha;
import demo.entity.TransactionLog;
import demo.mapper.CaptchaMapper;
import demo.mapper.TransactionLogMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;Slf4j
Service
RequiredArgsConstructor(onConstructor __(Autowired))
public class SendCaptchaService {private final Source source;private final TransactionLogMapper transactionLogMapper;private final CaptchaMapper boundMapper;/** 发送半消息*/public void sendCaptchaMsg(Captcha captcha){// 发送半消息。。String transactionId UUID.randomUUID().toString();MapString, Object msg new HashMap();msg.put(phone, captcha.getPhone());msg.put(captcha, captcha.getCaptcha());this.source.output().send(MessageBuilder.withPayload(msg)// header也有妙用....setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader(dto, JSON.toJSONString(captcha)).build());}/**添加验证码到数据库并记录事务日志*/Transactional(rollbackFor Exception.class)public void addBoundWithRocketMqLog(Captcha captcha, String transactionId) {//执行本地事务this.addBound(captcha);//记录MQ事务日志transactionLogMapper.insert(TransactionLog.builder().transactionId(transactionId).createTime(new Date()).log(发送短信验证码).build());}/**将验证码保存到数据库*/Transactional(rollbackFor Exception.class)public void addBound(Captcha captcha){captcha.setPublishTime(new Date());boundMapper.insert(captcha);}
}3.6、MQ事务类
MQ事务类实现RocketMQLocalTransactionListener接口
重写用于执行本地事务的方法executeLocalTransaction在该方法中执行本地事务类的保存验证码到数据库并记录日志的方法addBoundWithRocketMqLog重写本地事务的检查接口检查本地事务是否执行成功即MQ没有收到执行本地事务后的二次确认checkLocalTransaction在该方法中去查询事务日志表t_transaction_log是否存在相同事务ID的日志如果不存在则将消息丢弃否则标记为成功投递
package demo.mq;import com.alibaba.fastjson.JSON;
import demo.entity.Captcha;
import demo.entity.TransactionLog;
import demo.mapper.TransactionLogMapper;
import demo.service.SendCaptchaService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;RocketMQTransactionListener(txProducerGroup tx-captcha-group)
RequiredArgsConstructor(onConstructor __(Autowired))
Slf4j
public class SendCaptchaTransactionListener implements RocketMQLocalTransactionListener {private final SendCaptchaService addBoundService;private final TransactionLogMapper transactionLogMapper;/** 用于执行本地事务的方法*/Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers msg.getHeaders();String transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);String dtoString (String) headers.get(dto);Captcha bound JSON.parseObject(dtoString, Captcha.class);//本地事务service层用Transaction标注的方法成功就提交本地事务失败就回滚try {//执行本地事务addBoundService.addBoundWithRocketMqLog(bound, transactionId);return RocketMQLocalTransactionState.COMMIT; //本地事务执行成功就提交MQ} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK; //本地事务执行失败就回滚MQ}}/** 本地事务的检查接口检查本地事务是否执行成功即MQ没有收到执行本地事务后的二次确认*/Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers msg.getHeaders();String transactionId (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(MQ二次事务检查transactionID{}, transactionId);// 从MQ事务日志表里查看看对应的事务ID是否存在记录如果存在则表示成功COMMIT)否则表示执行本地事务失败(ROLLBACK)TransactionLog transactionLog transactionLogMapper.selectById(transactionId);if (transactionLog ! null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}}
}3.7、测试
使用单元测试创建测试方法调用本地事务类发送半消息
package demo;import demo.entity.Captcha;
import demo.service.SendCaptchaService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;SpringBootTest(classes {ProducerApplication.class})
RunWith(SpringJUnit4ClassRunner.class)
public class TestSendCaptcha {Autowiredprivate SendCaptchaService sendCaptchaMsg;Testpublic void test(){//随机生成一个4位的验证码String code ;for(int i0; i4; i){code (int)(Math.random()*10);}//发送半消息sendCaptchaMsg.sendCaptchaMsg(Captcha.builder().captcha(code).phone(13843188848).build());}
}运行单元测试方法之后浏览器访问MQ-Dashboard可以看到topic已经被创建 在Message中可以看到刚刚发送的消息 消息详情 数据库验证码表t_captcha插入了数据 数据库事务日志表t_transaction_log插入了数据 Tip可以在本地事务中模拟一个运行时异常可以发现事务日志表中并无法插入日志在MQ事务二次确认消息的时候会讲消息丢弃 四、Consumer子工程
4.1、application.yml
配置端口号为8082配置MQ的name-server地址配置MQ的topic配置group如果使用的消息队列是RocketMQ则该属性务必配置内容可以是任意字符串
spring:cloud:stream:rocketmq:binder:name-server: 192.168.0.3:9876bindings:#消息消费者input:#用来指定topic要和消息生产者的的topic匹配destination: captcha-topic#一定要设置必填项如果用其他MQ该属性可以不设置group: test
server:port: 80824.2、启动类
使用EnableBinding(Sink.class)定义消息的推送管道
Sink.class源代码
public interface Sink {String INPUT input;Input(input)SubscribableChannel input();
}application.yml中配置的input属性 使用StreamListener(Sink.INPUT)注解监听消息使用StreamListener(“errorChannel”)统一处理MQ的异常
启动类
package demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;import java.util.HashMap;Slf4j
SpringBootApplication
EnableBinding(Sink.class)
public class ConsumerApplication {/*** 消费消息监听器** param message*/StreamListener(Sink.INPUT)public void receive(HashMapString, Object message) {log.info(消费消息{}, message);}/*** 全局异常处理** param message 发生异常的消息*/StreamListener(errorChannel)public void error(Message? message) {ErrorMessage errorMessage (ErrorMessage) message;log.warn(RocketMQ-SpringCloudStream发生异常errorMessage{}, errorMessage);}public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}4.3、测试
再次执行Producer子工程单元测试方法发送半消息发现在Consumer子工程中成功监听到了消息 4.4、消息过滤器
在StreamListener注解中可以使用condition属性来定义要匹配过滤的消息将消费者改造一下只接收手机号为13843188848的消息 Tip该方式只支持RoketMQ不支持Kafka/RabbitMQ /*** 消费消息监听器** condition的作用是消息过滤当前案例是匹配消息中header属性phone的值为13843188848的消息*/StreamListener(value Sink.INPUT, condition headers[phone]13843188848)public void receive(HashMapString, Object message) {log.info(消费消息{}, message);}如果不满足匹配条件将会有提示 但是消息已经成功发送