ftp 网站管理,淘宝做关键词的网站,大米包装设计,东莞住房城乡建设部官网背景
这里后补直接上代码
最佳实践
主要从两个方面保证消息不丢失
RabbitMQ方面
创建队列时开启持久化创建交换器时开启持久化创建镜像队列#xff08;可选#xff09;开启延迟队列#xff08;可选#xff09;
代码层面
开启生产者到交换器回调参数开启交换器到队列…背景
这里后补直接上代码
最佳实践
主要从两个方面保证消息不丢失
RabbitMQ方面
创建队列时开启持久化创建交换器时开启持久化创建镜像队列可选开启延迟队列可选
代码层面
开启生产者到交换器回调参数开启交换器到队列回调参数开启消费者手动ack注意消费端打印日志考虑手动补偿可选
实现代码
RabbitMQ配置文件
package com.jndj.core.config;import com.rabbitmq.client.ConnectionFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;Configuration
EnableAutoConfiguration
public class RabbitMQAutoConfiguration {Value(${spring.rabbitmq.host})public String rmHost;Value(${spring.rabbitmq.port})public int rmPort;Value(${spring.rabbitmq.username})public String rmUsername;Value(${spring.rabbitmq.password})public String rmPassword;Value(${spring.rabbitmq.virtual-host})public String virtualHost;Beanpublic CachingConnectionFactory connectionFactory() throws NoSuchAlgorithmException, KeyManagementException {ConnectionFactory rabbitConnectionFactory new ConnectionFactory();rabbitConnectionFactory.setHost(rmHost);rabbitConnectionFactory.setPort(rmPort);rabbitConnectionFactory.setUsername(rmUsername);rabbitConnectionFactory.setPassword(rmPassword);rabbitConnectionFactory.setVirtualHost(virtualHost);rabbitConnectionFactory.useSslProtocol();CachingConnectionFactory cachingConnectionFactory new CachingConnectionFactory(rabbitConnectionFactory);cachingConnectionFactory.setPublisherReturns(true);cachingConnectionFactory.setPublisherConfirms(true);return cachingConnectionFactory;}Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(ConfirmCallback: 相关数据 correlationData);System.out.println(ConfirmCallback: 确认情况 ack);System.out.println(ConfirmCallback: 原因 cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(ReturnCallback: 消息 message);System.out.println(ReturnCallback: 回应码 replyCode);System.out.println(ReturnCallback: 回应信息 replyText);System.out.println(ReturnCallback: 交换机 exchange);System.out.println(ReturnCallback: 路由键 routingKey);}});rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}}生产者 public AjaxResult insertStuResults(RequestBody MyoResults model) {try {rabbitTemplate.convertAndSend(RabbitMQConstant.EX_STU_RESULTS_MANUAL, RabbitMQConstant.Q_STU_RESULTS_MANUAL, model);return AjaxResult.success(上送成功);} catch (Exception e) {return AjaxResult.error(e.getMessage());}}消费者 RabbitListener(queues RabbitMQConstant.Q_STU_RESULTS_MANUAL)RabbitHandlerpublic void getManualMsg(Message message, Channel channel) throws IOException {String jsonString new String(message.getBody(), StandardCharsets.UTF_8);ObjectMapper objectMapper new ObjectMapper();MyoResults model objectMapper.readValue(jsonString, MyoResults.class);iStuResultsService.insertMongodb(model);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}