当前位置: 首页 > news >正文

申请关闭网站黄骅市市长

申请关闭网站,黄骅市市长,江阴哪里有做网站推广,杭州有名的室内设计公司基于消息队列实现分布式事务#xff0c;实现消息最终一致性 如何基于消息队列实现分布式事务#xff1f; 通过消息队列实现分布式事务的话#xff0c;可以保证当前数据的最终一致性。实现思路#xff1a;将大的分布式事务#xff0c;进行拆分#xff0c;拆分成若干个小…基于消息队列实现分布式事务实现消息最终一致性 如何基于消息队列实现分布式事务 通过消息队列实现分布式事务的话可以保证当前数据的最终一致性。实现思路将大的分布式事务进行拆分拆分成若干个小的本地事务只要每一个小的本地事务是执行成功的那就代表当前的分布式事务是执行成功的。下面以用户下单增加积分来进行消息队列实现分布式事务的操作。 功能实现流程图如下 主要包含三个角色订单服务消息队列和用户服务。 步骤1-4订单服务 1.首先在订单服务中会生成订单相关数据并添加到数据库中。 2.接着会在订单数据库中创建任务表 会向任务表添加数据UsernameorderIDpoint积分 3.设置定时任务扫描任务表获取相关数据如每隔7S扫描一次 4.发送任务数据到MQ 步骤5-12用户服务 5.6.7.用户服务会接收消息保证消息的不可重复消费判断在Redis和 在数据库中是否存在。防止重复处理不要重复添加积分。 用户服务正在执行消息 如果不存在继续业务功能。 添加本地事务 12.通知订单服务 步骤13订单服务 13.删除任务表数据。 一、 准备工作 1.1 shangcheng_order库新增数据表 任务表 DROP TABLE IF EXISTS tb_task; CREATE TABLE tb_task (id bigint(32) NOT NULL AUTO_INCREMENT COMMENT 任务id,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,delete_time datetime DEFAULT NULL,task_type varchar(32) DEFAULT NULL COMMENT 任务类型,mq_exchange varchar(64) DEFAULT NULL COMMENT 交换机名称,mq_routingkey varchar(64) DEFAULT NULL COMMENT routingkey,request_body varchar(512) DEFAULT NULL COMMENT 任务请求的内容,status varchar(32) DEFAULT NULL COMMENT 任务状态,errormsg varchar(512) DEFAULT NULL COMMENT 任务错误信息,PRIMARY KEY (id) ) ENGINEInnoDB DEFAULT CHARSETutf8;历史任务表 DROP TABLE IF EXISTS tb_task_his; CREATE TABLE tb_task_his (id bigint(32) NOT NULL AUTO_INCREMENT COMMENT 任务id,create_time datetime DEFAULT NULL,update_time datetime DEFAULT NULL,delete_time datetime DEFAULT NULL,task_type varchar(32) DEFAULT NULL COMMENT 任务类型,mq_exchange varchar(64) DEFAULT NULL COMMENT 交换机名称,mq_routingkey varchar(64) DEFAULT NULL COMMENT routingkey,request_body varchar(512) DEFAULT NULL COMMENT 任务请求的内容,status varchar(32) DEFAULT NULL COMMENT 任务状态,errormsg varchar(512) DEFAULT NULL,PRIMARY KEY (id) ) ENGINEInnoDB AUTO_INCREMENT9 DEFAULT CHARSETutf8;1.2 shangcheng_service_order_api添加相关实体类 Table(nametb_task) public class Task { ​Idprivate Long id; ​Column(name create_time)private Date createTime; ​Column(name update_time)private Date updateTime; ​Column(name delete_time)private Date deleteTime; ​Column(name task_type)private String taskType; ​Column(name mq_exchange)private String mqExchange; ​Column(name mq_routingkey)private String mqRoutingkey; ​Column(name request_body)private String requestBody; ​Column(name status)private String status; ​Column(name errormsg)private String errormsg;//gettersetter略 }Table(nametb_task_his) public class TaskHis { ​Idprivate Long id; ​Column(name create_time)private Date createTime; ​Column(name update_time)private Date updateTime; ​Column(name delete_time)private Date deleteTime; ​Column(name task_type)private String taskType; ​Column(name mq_exchange)private String mqExchange; ​Column(name mq_routingkey)private String mqRoutingkey; ​Column(name request_body)private String requestBody; ​Column(name status)private String status; ​Column(name errormsg)private String errormsg; ​//gettersetter略 }1.3 shangcheng_user新增积分日志表 DROP TABLE IF EXISTS tb_point_log; CREATE TABLE tb_point_log (order_id varchar(200) NOT NULL,user_id varchar(200) NOT NULL,point int(11) NOT NULL,PRIMARY KEY (order_id) ) ENGINEInnoDB DEFAULT CHARSETutf8; 1.4 shangcheng_service_user_api添加实体类 PointLog Table(nametb_point_log) public class PointLog { ​private String orderId;private String userId;private Integer point;//gettersetter略 }1.5 shangcheng_service_order添加rabbitMQ配置类 Configuration public class RabbitMQConfig {//添加积分任务交换机public static final String EX_BUYING_ADDPOINTUSER ex_buying_addpointuser; ​//添加积分消息队列public static final String CG_BUYING_ADDPOINT cg_buying_addpoint; ​//完成添加积分消息队列public static final String CG_BUYING_FINISHADDPOINT cg_buying_finishaddpoint; ​//添加积分路由keypublic static final String CG_BUYING_ADDPOINT_KEY addpoint; ​//完成添加积分路由keypublic static final String CG_BUYING_FINISHADDPOINT_KEY finishaddpoint; ​/*** 交换机配置* return the exchange*/Bean(EX_BUYING_ADDPOINTUSER)public Exchange EX_BUYING_ADDPOINTUSER() {return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTUSER).durable(true).build();}//声明队列Bean(CG_BUYING_FINISHADDPOINT)public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {Queue queue new Queue(CG_BUYING_FINISHADDPOINT);return queue;}//声明队列Bean(CG_BUYING_ADDPOINT)public Queue QUEUE_CG_BUYING_ADDPOINT() {Queue queue new Queue(CG_BUYING_ADDPOINT);return queue;}/*** 绑定队列到交换机 .* param queue the queue* param exchange the exchange* return the binding*/Beanpublic Binding BINDING_QUEUE_FINISHADDPOINT(Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();}Beanpublic Binding BINDING_QUEUE_ADDPOINT(Qualifier(CG_BUYING_ADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTUSER) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();} } 二、 订单服务添加任务并发送 步骤1 修改添加订单方法 当添加订单的时候添加任务表中相关数据, 代码如下 //增加任务表记录 Task task new Task(); task.setCreateTime(new Date()); task.setUpdateTime(new Date()); task.setMqExchange(RabbitMQConfig.EX_BUYING_ADDPOINTURSE); task.setMqRoutingkey(RabbitMQConfig.CG_BUYING_ADDPOINT_KEY); ​ Map map new HashMap(); map.put(userName,order.getUsername()); map.put(orderId,order.getId()); map.put(point,order.getPayMoney()); task.setRequestBody(JSON.toJSONString(map)); taskMapper.insertSelective(task);步骤2 定时扫描任务表最新数据 订单服务新增定时任务类获取小于系统当前时间的所有任务数据 1 修改订单服务启动类添加开启定时任务注解 EnableScheduling2 定义定时任务类 查询最新数据 更新taskMapper新增方法查询所有小于系统当前时间的数据 public interface TaskMapper extends MapperTask { ​Select(SELECT * from tb_task WHERE update_time#{currentTime})Results({Result(column create_time,property createTime),Result(column update_time,property updateTime),Result(column delete_time,property deleteTime),Result(column task_type,property taskType),Result(column mq_exchange,property mqExchange),Result(column mq_routingkey,property mqRoutingkey),Result(column request_body,property requestBody),Result(column status,property status),Result(column errormsg,property errormsg)})ListTask findTaskLessTanCurrentTime(Date currentTime); }任务类实现 Component public class QueryPointTask { ​Autowiredprivate RabbitTemplate rabbitTemplate; ​Autowiredprivate TaskMapper taskMapper; ​Scheduled(cron 0 0/2 * * * ?)public void queryTask(){ ​//1.获取小于系统当前时间数据ListTask taskList taskMapper.findTaskLessTanCurrentTime(new Date()); ​if (taskList!null taskList.size()0){//将任务数据发送到消息队列for (Task task : taskList) {rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_ADDPOINT_KEY, JSON.toJSONString(task));}}} }三、 用户服务更改积分 步骤1 添加rabbitmq配置类(与订单服务相同) Configuration public class RabbitMQConfig {//添加积分任务交换机public static final String EX_BUYING_ADDPOINTURSE ex_buying_addpointurse; ​//添加积分消息队列public static final String CG_BUYING_ADDPOINT cg_buying_addpoint; ​//完成添加积分消息队列public static final String CG_BUYING_FINISHADDPOINT cg_buying_finishaddpoint; ​//添加积分路由keypublic static final String CG_BUYING_ADDPOINT_KEY addpoint; ​//完成添加积分路由keypublic static final String CG_BUYING_FINISHADDPOINT_KEY finishaddpoint; ​/*** 交换机配置* return the exchange*/Bean(EX_BUYING_ADDPOINTURSE)public Exchange EX_DECLARE() {return ExchangeBuilder.directExchange(EX_BUYING_ADDPOINTURSE).durable(true).build();}//声明队列Bean(CG_BUYING_FINISHADDPOINT)public Queue QUEUE_CG_BUYING_FINISHADDPOINT() {Queue queue new Queue(CG_BUYING_FINISHADDPOINT);return queue;}//声明队列Bean(CG_BUYING_ADDPOINT)public Queue QUEUE_CG_BUYING_ADDPOINT() {Queue queue new Queue(CG_BUYING_ADDPOINT);return queue;}/*** 绑定队列到交换机 .* param queue the queue* param exchange the exchange* return the binding*/Beanpublic Binding BINDING_QUEUE_FINISHADDPOINT(Qualifier(CG_BUYING_FINISHADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_FINISHADDPOINT_KEY).noargs();}Beanpublic Binding BINDING_QUEUE_ADDPOINT(Qualifier(CG_BUYING_ADDPOINT) Queue queue, Qualifier(EX_BUYING_ADDPOINTURSE) Exchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(CG_BUYING_ADDPOINT_KEY).noargs();} }步骤2 定义消息监听类 Component public class AddPointListener { ​Autowiredprivate UserService userService; ​Autowiredprivate RedisTemplate redisTemplate; ​Autowiredprivate RabbitTemplate rabbitTemplate; ​RabbitListener(queues RabbitMQConfig.CG_BUYING_ADDPOINT)public void receiveMessage(String message){ ​Task task JSON.parseObject(message, Task.class); ​if (task null || StringUtils.isEmpty(task.getRequestBody())){return;} ​//判断redis中是否存在内容Object value redisTemplate.boundValueOps(task.getId()).get();if (value ! null){return;} ​//更新用户积分int result userService.updateUserPoints(task); ​if (result0){return;} ​//返回通知 rabbitTemplate.convertAndSend(RabbitMQConfig.EX_BUYING_ADDPOINTURSE,RabbitMQConfig.CG_BUYING_FINISHADDPOINT_KEY,JSON.toJSONString(task)); ​} } 步骤3 定义修改用户积分实现 实现思路 1判断当前订单是否操作过 2将任务存入redis 3修改用户积分 4添加积分日志表记录 5删除redis中记录 Autowired private PointLogMapper pointLogMapper; ​/*** 修改用户积分* param task* return*/OverrideTransactionalpublic int updateUserPoints(Task task) {Map info JSON.parseObject(task.getRequestBody(), Map.class);String userName info.get(userName).toString();String orderId info.get(orderId).toString();int point (int) info.get(point); ​//判断当前订单是否操作过PointLog pointLog pointLogMapper.findLogInfoByOrderId(orderId);if (pointLog ! null){return 0;} ​//将任务存入redisredisTemplate.boundValueOps(task.getId()).set(exist,1,TimeUnit.MINUTES); ​//修改用户积分int result userMapper.updateUserPoint(userName, point);if (result0){return result;} ​//添加积分日志表记录pointLog new PointLog();pointLog.setOrderId(orderId);pointLog.setPoint(point);pointLog.setUserId(userName);result pointLogMapper.insertSelective(pointLog);if (result0){return result;} ​//删除redis中的记录redisTemplate.delete(task.getId()); ​return 1;}步骤4 定义根据订单id查询积分日志表 定义PointLogMapper实现根据订单id查询 public interface PointLogMapper extends MapperPointLog { ​Select(select * from tb_point_log where order_id#{orderId})PointLog findLogInfoByOrderId(Param(orderId) String orderId); }四、 订单服务删除原任务 步骤1 定义监听类 在订单服务中定义监听类用于监听队列如果队列中有消息则删除原任务防止消息重复发送并对任务信息进行记录 Component public class DelTaskListener { ​Autowiredprivate TaskService taskService; ​RabbitListener(queues RabbitMQConfig.CG_BUYING_FINISHADDPOINT)public void receiveMessage(String message){ ​Task task JSON.parseObject(message, Task.class); ​taskService.delTask(task);} }步骤 定义任务service public interface TaskService { ​void delTask(Task task); }Service Transactional public class TaskServiceImpl implements TaskService { ​Autowiredprivate TaskMapper taskMapper; ​Autowiredprivate TaskHisMapper taskHisMapper; ​ ​Overridepublic void delTask(Task task) {//1. 设置删除时间task.setDeleteTime(new Date());Long id task.getId();task.setId(null); ​//bean复制TaskHis taskHis new TaskHis();BeanUtils.copyProperties(task,taskHis); ​//记录任务信息taskHisMapper.insertSelective(taskHis); ​//删除原任务task.setId(id);taskMapper.deleteByPrimaryKey(task);} }
http://www.dnsts.com.cn/news/242663.html

相关文章:

  • 网站 seo免费域名注册流程
  • 网站建设内容3000字wordpress如何设置头像
  • 建设装修公司网站谷歌seo搜索优化
  • 牟平做网站怎样用ps设计网站模板
  • 济南行知网网站建设北京中兴时代网站建设
  • 企业的网站内容管理系统设计专业哪个学校好
  • 南昌手机网站建设asp.net网站开发是什么
  • 系统网站设计订餐网站怎么做
  • 福田网站设计哪家好wordpress 折800模板
  • 个人资料展示网站哪种网站
  • 网站为什么没有被收录网站开发实训内容
  • 大沥网站开发公司官网网址
  • 网站建设及推广培训信息流推广渠道
  • 怎么修改网站图片深圳开发软件公司
  • wordpress娱乐网模板海口百度seo公司
  • 中美网站建设中国建筑装饰网设计师联盟
  • 中国建设信号工证网站wordpress 暖岛 主题
  • 微网站是手机网站吗wordpress vip付费插件
  • 网站修改图片怎么做如何查看网站外链
  • 做网站销售挣钱吗海口企业网站建设
  • 青海省公路建设服务网站天津做网站.都找津坤科技
  • 做旅游网站的原因长沙做网站团队
  • 淘客做网站有必要吗页面设计尺寸规范
  • 河南手机网站建设公司哪家好扫码点餐小程序怎么制作
  • 做app网站有哪些广州越秀公司网站建设
  • html创建站点的步骤wordpress付费播放
  • 餐饮品牌设计网站网站备案被注销的原因
  • 有哪些做副业的网站优秀的门户网站
  • 花都做网站公司wordpress博客字体
  • 用vs做网站在安装时要勾选霸州做网站1766534168