企业网站源码怎么用,wordpress防爆破插件,邯郸信息港人才招聘,网站怎么做市场分析事务消息介绍
在一些对数据一致性有强需求的场景#xff0c;可以用 Apache RocketMQ 事务消息来解决#xff0c;从而保证上下游数据的一致性。
以电商交易场景为例#xff0c;用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的…事务消息介绍
在一些对数据一致性有强需求的场景可以用 Apache RocketMQ 事务消息来解决从而保证上下游数据的一致性。
以电商交易场景为例用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括
主分支订单系统状态更新由未支付变更为支付成功。物流系统状态新增新增待发货物流记录创建订单物流记录。积分系统状态变更变更用户积分更新用户积分表。购物车系统状态变更清空购物车更新用户购物车记录。
当主分支订单系统状态更新失败后物流、积分、购物车系统都不应该接收到消息
事务消息的发送流程
使用普通消息是做不到的因为他会直接将消息发送到topic中
而事务消息参考了两阶段提交的原理
先把消息发送broker中当消息发送成功后会执行本地事务通过本地事务的执行情况返回一个状态状态对应三种情况 LocalTransactionState.UNKNOW需要broker调用发送端的回查机制LocalTransactionState.COMMIT_MESSAGEbroker将消息发送到指定的topic此时消费端可以接收到消息LocalTransactionState.ROLLBACK_MESSAGEbroker丢弃消息不发送到指定的topic消费端接收不到消息
整个事务消息的详细交互流程如下图所示
Test
public void sendTrans() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {// 创建事务消息发送客户端TransactionMQProducer transProducer new TransactionMQProducer(test-trans-producer);transProducer.setNamesrvAddr(RocketMQConfig.NAME_SERVER_ADDR);// 指定回查事务消息时的线程池ExecutorService executorService new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r);thread.setName(client-transaction-msg-check-thread);return thread;}});transProducer.setExecutorService(executorService);// 设置事务监听器transProducer.setTransactionListener(new TransactionListener() {// 执行本地事务Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {System.out.println(Thread.currentThread().getName() 执行本地事务);// 触发回查机制return LocalTransactionState.UNKNOW;}// 回查本地事务如果执行本地事务返回UNKNOW状态或者生产者应用退出导致本地事务未提交任何状态Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println(Thread.currentThread().getName() 触发事务回查);// 提交事务return LocalTransactionState.COMMIT_MESSAGE;}});transProducer.start();Message message new Message(RocketMQConfig.TEST_TOPIC, hello world.getBytes());// 发送事务消息SendResult send transProducer.sendMessageInTransaction(message,null);System.out.println(send.getSendStatus());Thread.sleep(Integer.MAX_VALUE);
}注需要注意的是事务消息的生产组名称 ProducerGroupName不能随意设置。事务消息有回查机制回查时Broker端如果发现原始生产者已经崩溃则会联系同一生产者组的其他生产者实例回查本地事务执行情况以Commit或Rollback半事务消息。