工信部网站备案登陆,仿站工具哪个好最好,net framework可以用来做网站吗,建设云个人网站RabbitMQ 消息自动重试机制#xff1a; 让我们消费者处理我们业务代码的时候#xff0c;如果抛出异常的情况下#xff0c;在这时候 MQ 会自动触发重试机制#xff0c;默认的情况下 RabbitMQ 时无限次数的重试需要认为指定重试次数限制问题 在什么情况下消费者实现重试策略… RabbitMQ 消息自动重试机制 让我们消费者处理我们业务代码的时候如果抛出异常的情况下在这时候 MQ 会自动触发重试机制默认的情况下 RabbitMQ 时无限次数的重试需要认为指定重试次数限制问题 在什么情况下消费者实现重试策略 消费者调用第三方接口但是调用第三方接口失败后需要实现重试策略网络延迟只是暂时调不通重试多次有可能会调通消费者获取代码后因为代码问题抛出数据异常此时不需要实现重试策略 我们需要将日志存放起来后期通过定时任务或者人工补偿形式如果是重试多次还是失败消息需要重新发布消费者版本实现消费可以使用死信队列 MQ 在重试的过程中可能会引发消费者重复消费的问题MQ 消费者需要解决幂等性问题 幂等性保证数据唯一 解决幂等性问题 生产者在投递消息的时候生成一个唯一 id 放在我们消息中消费者获取到该消息可以根据全局唯一 id 实现去重全局唯一 id 根据业务来定的订单号码作为全局的 id 实际上还是需要在 DB 层面解决数据防重复业务逻辑是在做 insert 操作使用唯一主键约束业务逻辑是在做 update 操作使用乐观锁 当消费者业务逻辑代码中抛出异常自动实现重试(默认是无数次重试)应该对 RabbitMQ 重试次数实现限制比如最多重试 5 次每次间隔 30 秒重试多次还是失败的情况下存放到死信队列或者存放到数据库表中记录后期人工补偿 如何选择消息重试 消费者获取消息后调用第三方接口但是调用第三方接口失败后是否要重试消费者获取消息后如果代码问题抛出数据异常是否需要重试总结 如果消费者处理消息时因为代码原因抛出异常是需要重新发布版本才能解决就不要重试存放到死信队列或者是数据库记录、后期人工实现补偿 实现 yml 文件 spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: boyatopVirtualHostlistener:simple:retry:#开启消费者进行重试(程序异常的情况)enabled: true#最大重试次数max-attempts: 5#重试间隔时间initial-interval: 3000#手动确认机制acknowledge-mode: manualdatasource:url: jdbc:mysql://localhost:3306/test?useUnicodetruecharacterEncodingUTF-8username: rootpassword: rootdriver-class-name: com.mysql.jdbc.Driverboyatop:#备胎交换机dlx:exchange: boyatop_dlx_exchangequeue: boyatop_dlx_queueroutingKey: dlx#普通交换机order:exchange: boyatop_order_exchangequeue: boyatop_order_queueroutingKey: order 配置类 Component
public class IdempotentExchangeConfig {//交换机Value(${boyatop.order.exchange})private String order_exchange;//普通队列Value(${boyatop.order.queue})private String order_queue;//普通队列的 keyValue(${boyatop.order.routingKey})private String order_rotingKey;//死信交换机Value(${boyatop.dlx.exchange})private String dlx_exchange;//死信队列Value(${boyatop.dlx.queue})private String dlx_queue;//死信队列的 keyValue(${boyatop.dlx.routingKey})private String dlx_routingKey;//定义死信交换机Beanpublic DirectExchange dlxExchange(){return new DirectExchange(dlx_exchange);}//定义死信队列Beanpublic Queue dlxQueue(){return new Queue(dlx_queue);}//定义普通交换机Beanpublic DirectExchange orderExchange(){return new DirectExchange(order_exchange);}//定义普通队列Beanpublic Queue orderQueue(){//订单队列绑定死信交换机MapString,Object arguments new HashMap(2);arguments.put(x-dead-letter-exchange,dlx_exchange);arguments.put(x-dead-letter-routing-key,dlx_routingKey);return new Queue(order_queue,true,false,false,arguments);
// return QueueBuilder.durable(order_queue).withArguments(arguments).build();}//订单队列绑定交换机Beanpublic Binding bindingOrderExchange(DirectExchange orderExchange, Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderExchange).with(order_rotingKey);}//死信队列绑定交换机Beanpublic Binding bindingDlxExchange(DirectExchange dlxExchange, Queue dlxQueue){return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlx_routingKey);}} 实体类 Data
NoArgsConstructor
public class OrderEntity implements Serializable {private Integer id;private String orderName;private String orderId;public OrderEntity(String orderName, String orderId) {this.orderName orderName;this.orderId orderId;}
} Mapper public interface OrderMapper {Insert(INSERT into order_entity value (null,#{orderName},#{orderId}))int addOrder(OrderEntity orderEntity);Select(select * from order_entity where order_id #{orderId} )OrderEntity getOrder(String orderId);
} 生产者 Component
Slf4j
public class OrderProducer {Autowiredprivate RabbitTemplate rabbitTemplate;Value(${boyatop.order.exchange})private String order_exchange;//普通队列的 keyValue(${boyatop.order.routingKey})private String order_rotingKey;public void sendMsg(String orderName,String orderId){OrderEntity orderEntity new OrderEntity(orderName,orderId);rabbitTemplate.convertAndSend(order_exchange,order_rotingKey,orderEntity,message - {message.getMessageProperties().setExpiration(5000);return message;});}
} 消费者 Component
Slf4j
RabbitListener(queues boyatop_order_queue)
public class OrderConsumer {Autowiredprivate OrderMapper orderMapper;RabbitHandlerpublic void process(OrderEntity orderEntity, Message message, Channel channel){try{String orderId orderEntity.getOrderId();if(StringUtils.isEmpty(orderId)){return;}OrderEntity dbOrderEntity orderMapper.getOrder(orderId);if(dbOrderEntity ! null){//出现异常,消息拒收,进入死信队列人为处理channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);}int result orderMapper.addOrder(orderEntity);//出现异常int i 1 / 0;channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);System.out.println(监听内容: orderEntity);}catch (Exception e){// 记录该消息日志形式 存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿//将该消息存放到死信队列中单独写一个死信消费者实现消费。}}
}