申请关闭网站,黄骅市市长,江阴哪里有做网站推广,杭州有名的室内设计公司基于消息队列实现分布式事务#xff0c;实现消息最终一致性
如何基于消息队列实现分布式事务#xff1f;
通过消息队列实现分布式事务的话#xff0c;可以保证当前数据的最终一致性。实现思路#xff1a;将大的分布式事务#xff0c;进行拆分#xff0c;拆分成若干个小…基于消息队列实现分布式事务实现消息最终一致性
如何基于消息队列实现分布式事务
通过消息队列实现分布式事务的话可以保证当前数据的最终一致性。实现思路将大的分布式事务进行拆分拆分成若干个小的本地事务只要每一个小的本地事务是执行成功的那就代表当前的分布式事务是执行成功的。下面以用户下单增加积分来进行消息队列实现分布式事务的操作。
功能实现流程图如下 主要包含三个角色订单服务消息队列和用户服务。 步骤1-4订单服务 1.首先在订单服务中会生成订单相关数据并添加到数据库中。 2.接着会在订单数据库中创建任务表 会向任务表添加数据UsernameorderIDpoint积分
3.设置定时任务扫描任务表获取相关数据如每隔7S扫描一次 4.发送任务数据到MQ
步骤5-12用户服务 5.6.7.用户服务会接收消息保证消息的不可重复消费判断在Redis和 在数据库中是否存在。防止重复处理不要重复添加积分。 用户服务正在执行消息 如果不存在继续业务功能。 添加本地事务 12.通知订单服务
步骤13订单服务 13.删除任务表数据。
一、 准备工作
1.1 shangcheng_order库新增数据表
任务表
DROP TABLE IF EXISTS tb_task;
CREATE TABLE tb_task (id bigint(32) NOT NULL AUTO_INCREMENT COMMENT 任务id,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,delete_time datetime DEFAULT NULL,task_type varchar(32) DEFAULT NULL COMMENT 任务类型,mq_exchange varchar(64) DEFAULT NULL COMMENT 交换机名称,mq_routingkey varchar(64) DEFAULT NULL COMMENT routingkey,request_body varchar(512) DEFAULT NULL COMMENT 任务请求的内容,status varchar(32) DEFAULT NULL COMMENT 任务状态,errormsg varchar(512) DEFAULT NULL COMMENT 任务错误信息,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf8;历史任务表
DROP TABLE IF EXISTS tb_task_his;
CREATE TABLE tb_task_his (id bigint(32) NOT NULL AUTO_INCREMENT COMMENT 任务id,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,delete_time datetime DEFAULT NULL,task_type varchar(32) DEFAULT NULL COMMENT 任务类型,mq_exchange varchar(64) DEFAULT NULL COMMENT 交换机名称,mq_routingkey varchar(64) DEFAULT NULL COMMENT routingkey,request_body varchar(512) DEFAULT NULL COMMENT 任务请求的内容,status varchar(32) DEFAULT NULL COMMENT 任务状态,errormsg varchar(512) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB AUTO_INCREMENT9 DEFAULT CHARSETutf8;1.2 shangcheng_service_order_api添加相关实体类
Table(nametb_task)
public class Task {
Idprivate Long id;
Column(name create_time)private Date createTime;
Column(name update_time)private Date updateTime;
Column(name delete_time)private Date deleteTime;
Column(name task_type)private String taskType;
Column(name mq_exchange)private String mqExchange;
Column(name mq_routingkey)private String mqRoutingkey;
Column(name request_body)private String requestBody;
Column(name status)private String status;
Column(name errormsg)private String errormsg;//gettersetter略
}Table(nametb_task_his)
public class TaskHis {
Idprivate Long id;
Column(name create_time)private Date createTime;
Column(name update_time)private Date updateTime;
Column(name delete_time)private Date deleteTime;
Column(name task_type)private String taskType;
Column(name mq_exchange)private String mqExchange;
Column(name mq_routingkey)private String mqRoutingkey;
Column(name request_body)private String requestBody;
Column(name status)private String status;
Column(name errormsg)private String errormsg;
//gettersetter略
}1.3 shangcheng_user新增积分日志表
DROP TABLE IF EXISTS tb_point_log;
CREATE TABLE tb_point_log (order_id varchar(200) NOT NULL,user_id varchar(200) NOT NULL,point int(11) NOT NULL,PRIMARY KEY (order_id)
) ENGINEInnoDB DEFAULT CHARSETutf8;
1.4 shangcheng_service_user_api添加实体类 PointLog
Table(nametb_point_log)
public class PointLog {
private String orderId;private String userId;private Integer point;//gettersetter略
}1.5 shangcheng_service_order添加rabbitMQ配置类
Configuration
public class RabbitMQConfig {//添加积分任务交换机public static final String EX_BUYING_ADDPOINTUSER ex_buying_addpointuser;
//添加积分消息队列public static final String CG_BUYING_ADDPOINT cg_buying_addpoint;
//完成添加积分消息队列public static final String CG_BUYING_FINISHADDPOINT cg_buying_finishaddpoint;
//添加积分路由keypublic static final String CG_BUYING_ADDPOINT_KEY addpoint;
//完成添加积分路由keypublic static final String CG_BUYING_FINISHADDPOINT_KEY finishaddpoint;
/*** 交换机配置* return the exchange*/Bean(EX_BUYING_ADDPOINTUSER)public Exchange EX_BUYING_ADDPOINTUSER() {return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();}//声明队列Bean(CG_BUYING_FINISHADDPOINT)public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {Queue queue new Queue(CG_BUYING_FINISHADDPOINT);return queue;}//声明队列Bean(CG_BUYING_ADDPOINT)public Queue QUEUE_CG_BUYING_ADDPOINT() {Queue queue new Queue(CG_BUYING_ADDPOINT);return queue;}/*** 绑定队列到交换机 .* param queue the queue* param exchange the exchange* return the binding*/Beanpublic Binding BINDING_QUEUE_FINISHADDPOINT(Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();}Beanpublic Binding BINDING_QUEUE_ADDPOINT(Qualifier(CG_BUYING_ADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();}
}
二、 订单服务添加任务并发送
步骤1 修改添加订单方法 当添加订单的时候添加任务表中相关数据, 代码如下
//增加任务表记录
Task task new Task();
task.setCreateTime(new Date());
task.setUpdateTime(new Date());
task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE);
task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY);
Map map new HashMap();
map.put(userName,order.getUsername());
map.put(orderId,order.getId());
map.put(point,order.getPayMoney());
task.setRequestBody(JSON.toJSONString(map));
taskMapper.insertSelective(task);步骤2 定时扫描任务表最新数据 订单服务新增定时任务类获取小于系统当前时间的所有任务数据
1 修改订单服务启动类添加开启定时任务注解
EnableScheduling2 定义定时任务类 查询最新数据 更新taskMapper新增方法查询所有小于系统当前时间的数据
public interface TaskMapper extends MapperTask {
Select(SELECT * from tb_task WHERE update_time#{currentTime})Results({Result(column create_time,property createTime),Result(column update_time,property updateTime),Result(column delete_time,property deleteTime),Result(column task_type,property taskType),Result(column mq_exchange,property mqExchange),Result(column mq_routingkey,property mqRoutingkey),Result(column request_body,property requestBody),Result(column status,property status),Result(column errormsg,property errormsg)})ListTask findTaskLessTanCurrentTime(Date currentTime);
}任务类实现
Component
public class QueryPointTask {
Autowiredprivate RabbitTemplate rabbitTemplate;
Autowiredprivate TaskMapper taskMapper;
Scheduled(cron 0 0/2 * * * ?)public void queryTask(){
//1.获取小于系统当前时间数据ListTask taskList taskMapper.findTaskLessTanCurrentTime(new Date());
if (taskList!null taskList.size()0){//将任务数据发送到消息队列for (Task task : taskList) {rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(task));}}}
}三、 用户服务更改积分
步骤1 添加rabbitmq配置类(与订单服务相同)
Configuration
public class RabbitMQConfig {//添加积分任务交换机public static final String EX_BUYING_ADDPOINTURSE ex_buying_addpointurse;
//添加积分消息队列public static final String CG_BUYING_ADDPOINT cg_buying_addpoint;
//完成添加积分消息队列public static final String CG_BUYING_FINISHADDPOINT cg_buying_finishaddpoint;
//添加积分路由keypublic static final String CG_BUYING_ADDPOINT_KEY addpoint;
//完成添加积分路由keypublic static final String CG_BUYING_FINISHADDPOINT_KEY finishaddpoint;
/*** 交换机配置* return the exchange*/Bean(EX_BUYING_ADDPOINTURSE)public Exchange EX_DECLARE() {return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTURSE).durable(true).build();}//声明队列Bean(CG_BUYING_FINISHADDPOINT)public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {Queue queue new Queue(CG_BUYING_FINISHADDPOINT);return queue;}//声明队列Bean(CG_BUYING_ADDPOINT)public Queue QUEUE_CG_BUYING_ADDPOINT() {Queue queue new Queue(CG_BUYING_ADDPOINT);return queue;}/*** 绑定队列到交换机 .* param queue the queue* param exchange the exchange* return the binding*/Beanpublic Binding BINDING_QUEUE_FINISHADDPOINT(Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();}Beanpublic Binding BINDING_QUEUE_ADDPOINT(Qualifier(CG_BUYING_ADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();}
}步骤2 定义消息监听类
Component
public class AddPointListener {
Autowiredprivate UserService userService;
Autowiredprivate RedisTemplate redisTemplate;
Autowiredprivate RabbitTemplate rabbitTemplate;
RabbitListener(queues RabbitMQConfig.CG_BUYING_ADDPOINT)public void receiveMessage(String message){
Task task JSON.parseObject(message, Task.class);
if (task null || StringUtils.isEmpty(task.getRequestBody())){return;}
//判断redis中是否存在内容Object value redisTemplate.boundValueOps(task.getId()).get();if (value ! null){return;}
//更新用户积分int result userService.updateUserPoints(task);
if (result0){return;}
//返回通知
rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_FINISHADDPOINT_KEY,JSON.toJSONString(task));
}
}
步骤3 定义修改用户积分实现 实现思路
1判断当前订单是否操作过
2将任务存入redis
3修改用户积分
4添加积分日志表记录
5删除redis中记录
Autowired
private PointLogMapper pointLogMapper;
/*** 修改用户积分* param task* return*/OverrideTransactionalpublic int updateUserPoints(Task task) {Map info JSON.parseObject(task.getRequestBody(), Map.class);String userName info.get(userName).toString();String orderId info.get(orderId).toString();int point (int) info.get(point);
//判断当前订单是否操作过PointLog pointLog pointLogMapper.findLogInfoByOrderId(orderId);if (pointLog ! null){return 0;}
//将任务存入redisredisTemplate.boundValueOps(task.getId()).set(exist,1,TimeUnit.MINUTES);
//修改用户积分int result userMapper.updateUserPoint(userName, point);if (result0){return result;}
//添加积分日志表记录pointLog new PointLog();pointLog.setOrderId(orderId);pointLog.setPoint(point);pointLog.setUserId(userName);result pointLogMapper.insertSelective(pointLog);if (result0){return result;}
//删除redis中的记录redisTemplate.delete(task.getId());
return 1;}步骤4 定义根据订单id查询积分日志表 定义PointLogMapper实现根据订单id查询
public interface PointLogMapper extends MapperPointLog {
Select(select * from tb_point_log where order_id#{orderId})PointLog findLogInfoByOrderId(Param(orderId) String orderId);
}四、 订单服务删除原任务
步骤1 定义监听类 在订单服务中定义监听类用于监听队列如果队列中有消息则删除原任务防止消息重复发送并对任务信息进行记录
Component
public class DelTaskListener {
Autowiredprivate TaskService taskService;
RabbitListener(queues RabbitMQConfig.CG_BUYING_FINISHADDPOINT)public void receiveMessage(String message){
Task task JSON.parseObject(message, Task.class);
taskService.delTask(task);}
}步骤 定义任务service
public interface TaskService {
void delTask(Task task);
}Service
Transactional
public class TaskServiceImpl implements TaskService {
Autowiredprivate TaskMapper taskMapper;
Autowiredprivate TaskHisMapper taskHisMapper;
Overridepublic void delTask(Task task) {//1. 设置删除时间task.setDeleteTime(new Date());Long id task.getId();task.setId(null);
//bean复制TaskHis taskHis new TaskHis();BeanUtils.copyProperties(task,taskHis);
//记录任务信息taskHisMapper.insertSelective(taskHis);
//删除原任务task.setId(id);taskMapper.deleteByPrimaryKey(task);}
}