北京建设网站公司哪家好,电商平台都有哪些,网页设计图标素材,如何知道网站有没有备案Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者#xff0c;本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。
背景信息
Apache RocketMQ 面向不同的业务场景提供了不同消费者类型本文分别从使用方式、实现原理、可靠性重试和适用场景等方面为您介绍这三种类型的消费者。
背景信息
Apache RocketMQ 面向不同的业务场景提供了不同消费者类型每种消费者类型的集成方式和控制方式都不一样。了解如下问题可以帮助您选择更匹配业务场景的消费者类型。 如何实现并发消费消费者如何使用并发的多线程机制处理消息以此提高消息处理效率 如何实现同步、异步消息处理对于不同的集成场景消费者获取消息后可能会将消息异步分发到业务逻辑中处理此时消息异步化处理如何实现 如何实现消息可靠处理消费者处理消息时如何返回响应结果如何在消息异常情况进行重试保证消息的可靠处理
以上问题的具体答案请参考下文。
功能概述 如上图所示 Apache RocketMQ 的消费者处理消息时主要经过以下阶段消息获取---消息处理---消费状态提交。
针对以上几个阶段Apache RocketMQ 提供了不同的消费者类型 PushConsumer 、SimpleConsumer 和 PullConsumer。这几种类型的消费者通过不同的实现方式和接口可满足您在不同业务场景下的消费需求。具体差异如下 在实际使用场景中PullConsumer 仅推荐在流处理框架中集成使用大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。 若您的业务场景发生变更或您当前使用的消费者类型不适合当前业务您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。 危险 生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者否则会导致消息消费异常。 PushConsumer
PushConsumers是一种高度封装的消费者类型消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 Apache RocketMQ 的客户端SDK完成。
使用方式
PushConsumer的使用方式比较固定在消费者初始化时注册一个消费监听器并在消费监听器内部实现消息处理逻辑。由 Apache RocketMQ 的SDK在后台完成消息获取、触发监听器调用以及进行消息重试处理。
示例代码如下
// 消费示例使用PushConsumer消费普通消息。
ClientServiceProvider provider ClientServiceProvider.loadService();
String topic YourTopic;
FilterExpression filterExpression new FilterExpression(YourFilterTag, FilterExpressionType.TAG);
PushConsumer pushConsumer provider.newPushConsumerBuilder()// 设置消费者分组。.setConsumerGroup(YourConsumerGroup)// 设置接入点。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(YourEndpoint).build())// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置消费监听器。.setMessageListener(new MessageListener() {Overridepublic ConsumeResult consume(MessageView messageView) {// 消费消息并返回处理结果。return ConsumeResult.SUCCESS;}}).build();
PushConsumer的消费监听器执行结果分为以下三种情况 返回消费成功以Java SDK为例返回ConsumeResult.SUCCESS表示该消息处理成功服务端按照消费结果更新消费进度。 返回消费失败以Java SDK为例返回ConsumeResult.FAILURE表示该消息处理失败需要根据消费重试逻辑判断是否进行重试消费。 出现非预期失败例如抛异常等行为该结果按照消费失败处理需要根据消费重试逻辑判断是否进行重试消费。
PushConsumer 消费消息时若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功SDK会按照消费超时处理强制提交消费失败结果并按照消费重试逻辑进行处理。消息超时请参见PushConsumer消费重试策略。 出现消费超时情况时SDK虽然提交消费失败结果但是当前消费线程可能仍然无法响应中断还会继续处理消息。 内部原理
在PushConsumer类型中消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示SDK内置了一个长轮询线程先将消息异步拉取到SDK内置的缓存队列中再分别提交到消费线程中触发监听器执行本地消费逻辑。 可靠性重试
PushConsumer 消费者类型中客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功并做可靠性重试。所有消息必须以同步方式进行消费处理并在监听器接口结束时返回调用结果不允许再做异步化分发。消息重试具体信息请参见PushConsumer消费重试策略。
使用PushConsumer消费者消费时不允许使用以下方式处理消息否则 Apache RocketMQ 无法保证消息的可靠性。 错误方式一消息还未处理完成就提前返回消费成功结果。此时如果消息消费失败Apache RocketMQ 服务端是无法感知的因此不会进行消费重试。 错误方式二在消费监听器内将消息再次分发到自定义的其他线程消费监听器提前返回消费结果。此时如果消息消费失败Apache RocketMQ 服务端同样无法感知因此也不会进行消费重试。
顺序性保障
基于 Apache RocketMQ 顺序消息的定义如果消费者分组设置了顺序消费模式则PushConsumer在触发消费监听器时严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序。
适用场景
PushConsumer严格限制了消息同步处理及每条消息的处理超时时间适用于以下场景 消息处理时间可预估如果不确定消息处理耗时经常有预期之外的长时间耗时的消息PushConsumer的可靠性保证会频繁触发消息重试机制造成大量重复消息。 无异步化、高级定制场景PushConsumer限制了消费逻辑的线程模型由客户端SDK内部按最大吞吐量触发消息处理。该模型开发逻辑简单但是不允许使用异步化和自定义处理流程。
SimpleConsumer
SimpleConsumer 是一种接口原子型的消费者类型消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成
使用方式
SimpleConsumer 的使用涉及多个接口调用由业务逻辑按需调用接口获取消息然后分发给业务线程处理消息最后按照处理的结果调用提交接口返回服务端当前消息的处理结果。示例如下
// 消费示例使用 SimpleConsumer 消费普通消息主动获取消息处理并提交。
ClientServiceProvider provider ClientServiceProvider.loadService();
String topic YourTopic;
FilterExpression filterExpression new FilterExpression(YourFilterTag, FilterExpressionType.TAG);
SimpleConsumer simpleConsumer provider.newSimpleConsumerBuilder()// 设置消费者分组。.setConsumerGroup(YourConsumerGroup)// 设置接入点。.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints(YourEndpoint).build())// 设置预绑定的订阅关系。.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))// 设置从服务端接受消息的最大等待时间.setAwaitDuration(Duration.ofSeconds(1)).build();
try {// SimpleConsumer 需要主动获取消息并处理。ListMessageView messageViewList simpleConsumer.receive(10, Duration.ofSeconds(30));messageViewList.forEach(messageView - {System.out.println(messageView);// 消费处理完成后需要主动调用 ACK 提交消费结果。try {simpleConsumer.ack(messageView);} catch (ClientException e) {logger.error(Failed to ack message, messageId{}, messageView.getMessageId(), e);}});
} catch (ClientException e) {// 如果遇到系统流控等原因造成拉取失败需要重新发起获取消息请求。logger.error(Failed to receive message, e);
}
SimpleConsumer主要涉及以下几个接口行为 可靠性重试
SimpleConsumer消费者类型中客户端SDK和服务端通过ReceiveMessage和AckMessage接口通信。客户端SDK如果处理消息成功则调用AckMessage接口如果处理失败只需要不回复ACK响应即可在定义的消费不可见时间到达后触发消费重试流程。更多信息请参见SimpleConsumer消费重试策略。
顺序性保障
基于 Apache RocketMQ 顺序消息的定义SimpleConsumer在处理顺序消息时会按照消息存储的先后顺序获取消息。即需要保持顺序的一组消息中如果前面的消息未处理完成则无法获取到后面的消息。
适用场景
SimpleConsumer提供原子接口用于消息获取和提交消费结果相对于PushConsumer方式更加灵活。SimpleConsumer适用于以下场景 消息处理时长不可控如果消息处理时长无法预估经常有长时间耗时的消息处理情况。建议使用SimpleConsumer消费类型可以在消费时自定义消息的预估处理时长若实际业务中预估的消息处理时长不符合预期也可以通过接口提前修改。 需要异步化、批量消费等高级定制场景SimpleConsumer在SDK内部没有复杂的线程封装完全由业务逻辑自由定制可以实现异步分发、批量消费等高级定制场景。 需要自定义消费速率SimpleConsumer是由业务逻辑主动调用接口获取消息因此可以自由调整获取消息的频率自定义控制消费速率。
PullConsumer
使用建议
PushConsumer合理控制消费耗时避免无限阻塞
对于PushConsumer消费类型需要严格控制消息的消费耗时尽量避免出现消息处理超时导致消息重复。如果业务经常会出现一些预期外的长时间耗时的消息建议使用SimpleConsumer并设置好消费不可见时间。