越秀低价网站建设,dz网站恢复数据库,普集网站开发,景区协会官方网站建设在可靠的系统里使用生产者 即使我们尽可能把 broker 配置得很可靠#xff0c;但如果没有对生产者进行可靠性方面的配置#xff0c;整个系统仍然有可能出现突发性的数据丢失。 比如下面的两个例子#xff1a; #xff08;一#xff09;为 broker 配置了 3 个副本#xff0…在可靠的系统里使用生产者 即使我们尽可能把 broker 配置得很可靠但如果没有对生产者进行可靠性方面的配置整个系统仍然有可能出现突发性的数据丢失。 比如下面的两个例子 一为 broker 配置了 3 个副本并且禁用了不完全首领选举这样应该可以保证万无一失。我们把生产者发送消息的** acks 设为 1**只要首领接收到消息就可以认为消息写入成功。
生产者发送一个消息给首领首领成功写入但跟随者副本还没有接收到这个消息。首领向生产者发送了一个响应告诉它“消息写入成功”然后它崩溃了而此时消息还没有被其他副本复制过去。**此时另外两个副本此时仍然被认为是同步的**毕竟判定一个副本不同步需要一小段时间而且其中的一个副本成了新的首领。因为消息还没有被写入这个副本所以就丢失了但发送消息的客户端却认为消息已成功写入。因为消费者看不到丢失的消息所以此时的系统从消费者角度来看仍然是一致的因为副本没有收到这个消息所以消息不算已提交但从生产者角度来看它丢失了一个消息。
二为 broker 配置了 3 个副本并且禁用了不完全首领选举。把生产者的 acks 设为 all。
假设现在往 Kafka 发送消息分区的首领刚好崩溃新的首领正在选举当中Kafka 会向生产者返回“首领不可用”的响应。在这个时候如果生产者没能正确处理这个错误也没有重试发送消息直到发送成功那么消息也有可能丢失。 虽然这算不上是 broker 的可靠性问题因为 broker 并没有收到这个消息。这也不是一致性问题因为消费者并没有读到这个消息。 但如果生产者没能正确处理这些错误就将丢失掉这些消息。
那么我们该如何避免这些问题呢从上面两个例子可以看出每个使用 Kafka的开发人员都要注意两件事情。
根据可靠性需求配置恰当的 acks 值。在参数配置和代码里正确处理错误。
发送确认
生产者可以选择以下 3 种不同的确认模式。
acks0
意味着如果生产者能够通过网络把消息发送出去那么就认为消息已成功写入Kafka。 在这种情况下还是有可能发生错误比如发送的对象无法被序列化或者网卡发生故障但如果是分区离线或整个集群长时间不可用那就不会收到任何错误。 即使是在发生完全首领选举的情况下这种模式仍然会丢失消息因为在新首领选举过程中它并不知道首领已经不可用了。 在 acks0 模式下的运行速度是非常快的这就是为什么很多基准测试都是基于这个模式你可以得到惊人的吞吐量和带宽利用率不过如果选择了这种模式一定会丢失一些消息。
acks1 意味着首领在收到消息并把它写入到分区数据文件不一定同步到磁盘上时会返回确认或错误响应。 在这个模式下如果发生正常的首领选举生产者会在选举时收到一个 LeaderNotAvailableException 异常如果生产者能恰当地处理这个错误它会重试发送消息最终消息会安全到达新的首领那里。
不过在这个模式下仍然有可能丢失数据比如消息已经成功写入首领但在消息被复制到跟随者副本之前首领发生崩溃。
acksall 意味着首领在返回确认或错误响应之前会等待所有同步副本都收到消息。 如果和 min.insync.replicas 参数结合起来就可以决定在返回确认前至少有多少个副本能够收到消息。 这是最保险的做法——生产者会一直重试直到消息被成功提交。不过这也是最慢的做法生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。 可以通过使用异步模式和更大的批次来加快速度但这样做通常会降低吞吐量。
配置生产者的重试参数
生产者需要处理的错误包括两部分一部分是生产者可以自动处理的错误还有一部分是需要开发者手动处理的错误。 如果 broker 返回的错误可以通过重试来解决那么生产者会自动处理这些错误。 生产者向 broker 发送消息时broker 可以返回一个成功响应码或者一个错误响应码。 错误响应码可以 分为两种一种是在重试之后可以解决的还有一种是无法通过重试解决的。 例如如果 broker 返回的是 LEADER_NOT_AVAILABLE 错误生产者可以尝试重新发送消息。也许在这个时 候一个新的首领被选举出来了那么这次发送就会成功。也就是说LEADER_NOT_AVAILABLE 是一个可重试错误。 另一方面如果 broker 返回的是 INVALID_CONFIG 错误即使通过重试 也无法改变配置选项所以这样的重试是没有意义的。这种错误是不可重试错误。 一般情况下如果你的目标是不丢失任何消息那么最好让生产者在遇到可重试错误时能够保持重试。为什么要这样?因为像首领选举或网络连接这类问题都可以在几秒钟之内得到解决如果让生产者保持重试你就不需要额外去处理这些问题了。
经常会有人 问:“为生产者配置多少重试次数比较好?” 这个要看你在生产者放弃重试并抛出异常之后想做些什么。 如果你想抓住异常并再多重试几次那么就可以把重试次数设置得多一 点让生产者继续重试;如果你想直接丢弃消息多次重试造成的延迟已经失去发送消息1的意义; 如果你想把消息保存到某个地方然后回过头来再继续处理那就可以停止重试。 Kafka 的跨数据中心复制工具默认会进行无限制的重试(例如 retriesMAX_INT)。作为一个具有高可靠性的复制工具它决不会丢失消息。
要注意重试发送一个已经失败的消息会带来一些风险如果两个消息都写入成功会导致消息重复。 例如生产者因为网络问题没有收到 broker 的确认但实际上消息已经写入 成功生产者会认为网络出现了临时故障就重试发送该消息(因为它不知道消息已经写 入成功)。在这种情况下broker 会收到两个相同的消息。 重试和恰当的错误处理可以保 证每个消息“至少被保存一次”但0.10.0版本Kafka无法保证每个消息“只被保存一次”。 现实中的很多应用程序在消息里加入唯一标识符用于检测重复消息消费者在读取消息时可以对它们进行清理。还要一些应用程序可以做到消息的“幂等”。
额外的错误处理
使用生产者内置的重试机制可以在不造成消息丢失的情况下轻松地处理大部分错误不过对于开发人员来说仍然需要处理其他类型的错误包括:
不可重试的 broker 错误例如消息大小错误、认证错误等;在消息发送之前发生的错误例如序列化错误;在生产者达到重试次数上限时或者在消息占用的内存达到上限时发生的错误。
这些错误处理 器的代码逻辑与具体的应用程序及其目标有关。
丢弃“不合法的消息”?把错误记录下来?把这些消息保存在本地磁盘上?回调另一个应用程序?
具体使用哪一种逻辑要根据具体的架构来决定。只要记住如果错误处理只是为了重试发送消息那么最好还是使用生产者内置的重试机制。
参考这里的