建设银行兴安支行网站,医疗网站开发ppt,阳江人社局官网招聘,如何提高用户和网站的互动性造成重复消费的原因#xff1a;
MQ向消费者推送message#xff0c;消费者向MQ返回ack#xff0c;告知所推送的消息消费成功。但是由于网络波动等原因#xff0c;可能造成消费者向MQ返回的ack丢失。MQ长时间#xff08;一分钟#xff09;收不到ack#xff0c;于是会向消…造成重复消费的原因
MQ向消费者推送message消费者向MQ返回ack告知所推送的消息消费成功。但是由于网络波动等原因可能造成消费者向MQ返回的ack丢失。MQ长时间一分钟收不到ack于是会向消费者再次推送该条message这样就造成了重复消费。
解决重复消费的办法 用存储redis或者mysql记录一下已经消费的message的id当message被消费前先去存储中查一下消费记录没有该条message的id则正常消费返回ack有该条message的id的话不用消费直接返回ack给MQ。 当然实际生产中的话选用redis是比较好的选择毕竟查mysql要进行磁盘IO效率要低得多而且绝大多数重复消费都是由于MQ没有收到消费者的ack于是造成MQ再次向消费者进行同一条message的投递。所以message的消费记录其实我们并不需要一直记录只需要保存一段时间当下次投递过来的时候消费者能查到消费记录然后准确返回ack给MQ就行。yml
#配置rabbitMq 服务器rabbitmq:host: xxxx#rabbitmq相关配置 15672是Web管理界面的端口5672是MQ访问的端口port: xxxxusername: xxxxpassword: xxxx#虚拟host 可以不设置,使用server默认hostvirtual-host: xxxxconnection-timeout: 0#确认消息已发送到队列(Queue)publisher-returns: true #确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated# 设置消费端手动 acklistener:simple:retry:# 开启消费者(程序出现异常的情况下)进行重试enabled: true#重试间隔时间max-interval: 1000# 最大重试次数max-attempts: 3#开启手动确认消息acknowledge-mode: manual监听类
package com.rabbitmqprovider.service;import com.rabbitmq.client.Channel;
import com.rabbitmqprovider.commons.CommonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;import java.io.IOException;/**** 防止重复消费*/
Slf4j
Service
public class TestBasicService {Autowiredprivate StringRedisTemplate redisTemplate;/*** RabbitListener 可以写在类、方法上* param channel* param message* throws IOException*///RabbitListener(queues {CommonUtils.queueStr})RabbitHandlerpublic void getMessage(Channel channel, Message message) throws IOException {try{String messageId message.getMessageProperties().getMessageId();String msg new String(message.getBody(),UTF-8);//判断messageId在redis中是否存在boolean flagestringRedisTemplate(messageId,msg);if(!flage){log.error(消息已重复处理失败,拒绝再次接收...);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息}else{//如果要防止 重复消费则需要将 id值存在 redis,每次 都要去redis中拿id比对是否存在存在则消费过-messageIdchannel.basicAck(message.getMessageProperties().getDeliveryTag(), false);log.info(接收到的消息{}-redisTemplate.opsForValue().get(messageId));}}catch (Exception e){if (message.getMessageProperties().getRedelivered()) {log.error(消息已重复处理失败,拒绝再次接收...);channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息} else {log.error(消息即将再次返回队列处理...);channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 判断Key是否存在* param messageId 唯一表示key* param msg value值* return*/private boolean stringRedisTemplate(String messageId,String msg){log.info(messageIdmessageId);//判断Key是否存在 有则返回true没有则返回falseif(redisTemplate.hasKey(messageId)){return false;}else{redisTemplate.opsForValue().setIfAbsent(messageId, msg);}return true;}
}------------------------------------------controller--------------------------------------------------
/*** 解决重复消费问题*/
GetMapping(/sendMessageTestOnly)
public void sendMessageTestOnly(){JSONObject jsonObject new JSONObject();jsonObject.put(message,世界很大);jsonObject.put(msg,你想去看看么);String json jsonObject.toJSONString();String messageIdUUID.randomUUID();Message message MessageBuilder.withBody(json.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding(UTF-8).setMessageId(messageId).build();rabbitTemplate.convertAndSend(CommonUtils.dirExchange,CommonUtils.routingKey,message,new CorrelationData(UUID.randomUUID().toString()));
}
---------------------------------回调------------------------------------------------------
package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 队列防止消息丢失*/
Slf4j
Component
public class QueueCallback implements RabbitTemplate.ReturnCallback{Overridepublic void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey) {log.info(消息 {} 经交换机 {} 通过routingKey{} 路由到队列失败失败code为{} 失败原因为{},new String(message.getBody()), exchange, routingKey, replyCode, replyText);}
}package com.rabbitmqprovider.callback;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 当消息由生产者发到交换机后会回调该接口中的confirm方法*/
Component
Slf4j
public class ExchangeCallback implements RabbitTemplate.ConfirmCallback{/* correlationData 内含消息内容* ack 交换机接受成功或者失败。 true表示交换机接受消息成功 false表示交换机接受失败* cause 表示失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack){log.info(交换机收到消息 消息内容为{}-, correlationData);}else {log.info(交换机未收到消息消息内容为{}, 原因为{}-, correlationData, cause);}}}-----------------------------------------------------------------------------------------------------------------
执行顺序时先发送消息然后在接收消息并判断消息是否重复如果不重复 则回复消息否则 拒绝回复最后回调。