当前位置: 首页 > news >正文

北京建站公司球队排名榜实时排名

北京建站公司,球队排名榜实时排名,网站开发人员兼职,aspnet网站开发视频最近做RabbitMQ故障演练发现RabbitMQ服务器停止后#xff0c;基于springboot的消费端不可以自动的恢复#xff0c;队列的消费者消失#xff0c;消息一直积压到队列中#xff0c;这种情况肯定是不可接收的#xff1b;通过研究源代码找到了解决方案。 一、添加自动恢复配置a… 最近做RabbitMQ故障演练发现RabbitMQ服务器停止后基于springboot的消费端不可以自动的恢复队列的消费者消失消息一直积压到队列中这种情况肯定是不可接收的通过研究源代码找到了解决方案。 一、添加自动恢复配置automaticRecovery CachingConnectionFactory factory new CachingConnectionFactory(connectionFactory); cachingConnectionFactoryConfigurer.configure(factory);//设置TCP连接超时时间默认60000ms factory.getRabbitConnectionFactory().setConnectionTimeout(properties.getConnectionTimeout()); //启用或禁用连接自动恢复默认false factory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(properties.isAutomaticRecovery()); //设置连接恢复时间间隔默认5000ms factory.getRabbitConnectionFactory().setNetworkRecoveryInterval(properties.getNetworkRecoveryInterval()); //启用或禁用拓扑恢复默认true【拓扑恢复功能可以帮助消费者重新声明之前定义的队列、交换机和绑定等拓扑结构】 factory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(properties.isTopologyRecovery()); //替换默认异常处理DefaultExceptionHandler factory.getRabbitConnectionFactory().setExceptionHandler(new DefaultMqExceptionHandler()); //添加连接监听器 factory.addConnectionListener(new DefaultMqConnectionListener(factory));通过上述配置如果RabbitMQ服务器发生故障则会自动重启恢复连接及队列的消费者如果恢复失败则会间隔5000ms再次重试在这里提一个问题如果服务重试一直失败重试的上限是多少带着这个问题我们分析下源码。 二、RabbitMQ客户端实现连接的自动恢复功能 AutorecoveringConnection#beginAutomaticRecovery是在 RabbitMQ 客户端库层面实现的连接的自动恢复功能。当 RabbitMQ 连接出现故障时它会尝试重新建立连接以确保消息传递的可靠性。 private synchronized void beginAutomaticRecovery() throws InterruptedException {//获取故障恢复连接的间隔时间实际是设置的networkRecoveryIntervalfinal long delay this.params.getRecoveryDelayHandler().getDelay(0);if (delay 0) {//等待指定的间隔时间this.wait(delay);}//调用恢复通知监听器this.notifyRecoveryListenersStarted();//获取恢复建立的连接对象final RecoveryAwareAMQConnection newConn this.recoverConnection();//如果为null则直接返回if (newConn null) {return;}//连接已经恢复建立恢复监听器、channel等资源LOGGER.debug(Connection {} has recovered, newConn);this.addAutomaticRecoveryListener(newConn);this.recoverShutdownListeners(newConn);this.recoverBlockedListeners(newConn);this.recoverChannels(newConn);// dont assign new delegate connection until channel recovery is completethis.delegate newConn;//判断是否恢复拓扑结构如果开启则开启拓扑结构恢复if (this.params.isTopologyRecoveryEnabled()) {notifyTopologyRecoveryListenersStarted();recoverTopology(params.getTopologyRecoveryExecutor());}this.notifyRecoveryListenersComplete();}addAutomaticRecoveryListener自动恢复监听器 private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newConn) {final AutorecoveringConnection c this;// this listener will run after shutdown listeners,// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135RecoveryCanBeginListener starter cause - {try {if (shouldTriggerConnectionRecovery(cause)) {//开始自动回复c.beginAutomaticRecovery();}} catch (Exception e) {newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);}};synchronized (this) {newConn.addRecoveryCanBeginListener(starter);} }init初始化 public void init() throws IOException, TimeoutException {//建立连接否则抛出异常this.delegate this.cf.newConnection();//自动回复监听器this.addAutomaticRecoveryListener(delegate); }三、消费端实现消息的消费和处理 SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run是应用程序层面实现消息的消费和处理它负责从RabbitMQ中接收消息并进行相应的逻辑处理 Override // NOSONAR - complexity - many catch blocks public void run() { // NOSONAR - line count...try {//消费端初始化方法initialize();//当消费端是活跃状态或者队列非空或者消费端未被关闭则进入主循环while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {mainLoop();}}catch (InterruptedException e) {...} } 消费端initialize初始化方法 private void initialize() throws Throwable { // NOSONARtry {redeclareElementsIfNecessary();//启动消费端初始化this.consumer.start();this.start.countDown();}catch (QueuesNotAvailableException e) {if (isMissingQueuesFatal()) {throw e;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw e;}}catch (FatalListenerStartupException ex) {if (isPossibleAuthenticationFailureFatal()) {throw ex;}else {Throwable possibleAuthException ex.getCause().getCause();if (!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {throw ex;}else {this.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw possibleAuthException;}}}catch (Throwable t) { //NOSONARthis.start.countDown();//消费端启动异常等待处理handleStartupFailure(this.consumer.getBackOffExecution());throw t;}if (getTransactionManager() ! null) {/** Register the consumers channel so it will be used by the transaction manager* if its an instance of RabbitTransactionManager.*/ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());}}消费端异常等待处理处理 protected void handleStartupFailure(BackOffExecution backOffExecution) {//获取等待时间间隔参考FixedBackOff类实现long recoveryInterval backOffExecution.nextBackOff();if (BackOffExecution.STOP recoveryInterval) {synchronized (this) {if (isActive()) {logger.warn(stopping container - restart recovery attempts exhausted);stop();}}return;}try {if (logger.isDebugEnabled() isActive()) {logger.debug(Recovering consumer in recoveryInterval ms.);}//当前时间加上等待时间long timeout System.currentTimeMillis() recoveryInterval;//如果当前时间小于等待时间则休眠200毫秒再次尝试while (isActive() System.currentTimeMillis() timeout) {Thread.sleep(RECOVERY_LOOP_WAIT_TIME);}}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IllegalStateException(Irrecoverable interruption on consumer restart, e);}}FixedBackOff回退等待时间类实现 public class FixedBackOff implements BackOff {// 默认恢复重试间隔public static final long DEFAULT_INTERVAL 5000L;//最大重试次数可以认为无限大public static final long UNLIMITED_ATTEMPTS Long.MAX_VALUE;// 默认恢复重试间隔private long interval 5000L;//最大重试次数可以认为无限大private long maxAttempts Long.MAX_VALUE;public FixedBackOff() {}public FixedBackOff(long interval, long maxAttempts) {this.interval interval;this.maxAttempts maxAttempts;}public void setInterval(long interval) {this.interval interval;}public long getInterval() {return this.interval;}public void setMaxAttempts(long maxAttempts) {this.maxAttempts maxAttempts;}public long getMaxAttempts() {return this.maxAttempts;}public BackOffExecution start() {return new FixedBackOffExecution();}private class FixedBackOffExecution implements BackOffExecution {private long currentAttempts;private FixedBackOffExecution() {this.currentAttempts 0L;}//获取下一次尝试的时间间隔可以认为一直都是5000mspublic long nextBackOff() {this.currentAttempts;return this.currentAttempts FixedBackOff.this.getMaxAttempts() ? FixedBackOff.this.getInterval() : -1L;}public String toString() {String attemptValue FixedBackOff.this.maxAttempts Long.MAX_VALUE ? unlimited : String.valueOf(FixedBackOff.this.maxAttempts);return FixedBackOff{interval FixedBackOff.this.interval , currentAttempts this.currentAttempts , maxAttempts attemptValue };}} } 总结综上源码分析可知消费端故障恢复重试等待时间是5000ms,重试次数可以认为是无限制Long最大值 mainloop主循环逻辑 private void mainLoop() throws Exception { // NOSONAR Exceptiontry {if (SimpleMessageListenerContainer.this.stopNow.get()) {this.consumer.forceCloseAndClearQueue();return;}//接收客户端发送过来的消息至少获取一条boolean receivedOk receiveAndExecute(this.consumer); // At least one message receivedif (SimpleMessageListenerContainer.this.maxConcurrentConsumers ! null) {checkAdjust(receivedOk);}long idleEventInterval getIdleEventInterval();if (idleEventInterval 0) {if (receivedOk) {updateLastReceive();}else {long now System.currentTimeMillis();long lastAlertAt SimpleMessageListenerContainer.this.lastNoMessageAlert.get();long lastReceive getLastReceive();if (now lastReceive idleEventInterval now lastAlertAt idleEventInterval SimpleMessageListenerContainer.this.lastNoMessageAlert.compareAndSet(lastAlertAt, now)) {publishIdleContainerEvent(now - lastReceive);}}}}catch (ListenerExecutionFailedException ex) {// Continue to process, otherwise re-throwif (ex.getCause() instanceof NoSuchMethodException) {throw new FatalListenerExecutionException(Invalid listener, ex);}}catch (AmqpRejectAndDontRequeueException rejectEx) {/** These will normally be wrapped by an LEFE if thrown by the* listener, but we will also honor it if thrown by an* error handler.*/}}receiveAndExecute接收和处理消息 private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONARPlatformTransactionManager transactionManager getTransactionManager();if (transactionManager ! null) {try {if (this.transactionTemplate null) {this.transactionTemplate new TransactionTemplate(transactionManager, getTransactionAttribute());}return this.transactionTemplate.execute(status - { // NOSONAR null never returnedRabbitResourceHolder resourceHolder ConnectionFactoryUtils.bindResourceToTransaction(new RabbitResourceHolder(consumer.getChannel(), false),getConnectionFactory(), true);// unbound in ResourceHolderSynchronization.beforeCompletion()try {//接收处理消息return doReceiveAndExecute(consumer);}catch (RuntimeException e1) {prepareHolderForRollback(resourceHolder, e1);throw e1;}catch (Exception e2) {throw new WrappedTransactionException(e2);}});}catch (WrappedTransactionException e) { // NOSONAR exception flow controlthrow (Exception) e.getCause();}}//接收处理消息return doReceiveAndExecute(consumer);}调用具体的消息监听器消费消息 private void doExecuteListener(Channel channel, Object data) {if (data instanceof Message) {Message message (Message) data;if (this.afterReceivePostProcessors ! null) {for (MessagePostProcessor processor : this.afterReceivePostProcessors) {message processor.postProcessMessage(message);if (message null) {throw new ImmediateAcknowledgeAmqpException(Message Post Processor returned null, discarding message);}}}if (this.deBatchingEnabled this.batchingStrategy.canDebatch(message.getMessageProperties())) {this.batchingStrategy.deBatch(message, fragment - invokeListener(channel, fragment));}else {invokeListener(channel, message);}}else {invokeListener(channel, data);}}GitHub代码https://github.com/mingyang66/spring-parent
http://www.dnsts.com.cn/news/139787.html

相关文章:

  • 手机网站建设最新报价seo关键词排名报价
  • 做网站跟网站设计的区别厦门关键词seo排名网站
  • 石家庄工信部网站备案网站开发毕设结论
  • 国内wordpress主题网站网站域名建设
  • 怎么自己创建一个免费网站亚马逊没有网站怎么做seo
  • 电影采集网站怎么做seowordpress作者
  • 开发一个定制的网站网站公众平台建设方案
  • zend studio 网站开发济南哪个网络公司建网站好
  • 比较有名的网站建设平台重庆市建设工程信息
  • 平昌县住房和城乡建设局网站成功的软文推广
  • 白山市网站建设苏州公众号开发公司
  • 网站高端建设开发公司wordpress 悬浮栏
  • 苏州市建设厅网站app网站开发合同
  • 企业网站建设对网络营销的影响深圳做网站推广公司哪家好
  • 南京高端网站制作免费注册企业邮箱域名
  • 网站营销推广计划如何给网站配置域名
  • 番禺网站建设a2345如何招网站开发人员
  • 建设政务网站网页设计html如何换行
  • 电商网站建设课程设计实验报告做网站f12的用处
  • 网站帮助中心设计怎样做淘宝商品链接导航网站
  • 外包网站建设价格tamed wordpress插件
  • 网站建设前需求调研表邢台建设局网站
  • 建设网站涉及的技术常州做网站的公司有哪些
  • 做ps可以在哪些网站上找素材建设一个和聚享游差不多的网站
  • 福鼎建设局网站首页什么叫效果图
  • 网站建设结算系统宣传网站站点最有效的方式是
  • 网站建设哪家技术好宁波工业设计公司排名
  • 环保网站建设公司公司介绍页面设计
  • 嘉定南翔网站建设网站开发可选的方案有
  • 实训报告网站开发oppo商店官网入口