创建网站目录结构应遵循的方法,广州建设总承包集团,武穴建设网站,网站挖掘工具0 大纲
[Apache Flink]2017年12月发布的1.4.0版本开始#xff0c;为流计算引入里程碑特性#xff1a;TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑#xff0c;使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持#xff1a;
数据源#…0 大纲
[Apache Flink]2017年12月发布的1.4.0版本开始为流计算引入里程碑特性TwoPhaseCommitSinkFunction。它提取了两阶段提交协议的通用逻辑使得通过Flink来构建端到端的Exactly-Once程序成为可能。同时支持
数据源source和输出端sink
包括Apache Kafka 0.11及更高版本。它提供抽象层用户只需实现少数方法就能实现端到端Exactly-Once语义。
新功能及Flink实现逻辑
描述Flink checkpoint机制如何保证Flink程序结果的Exactly-Once的显示Flink如何通过两阶段提交协议与数据源和数据输出端交互以提供端到端的Exactly-Once保证通过一个简单的示例了解如何使用TwoPhaseCommitSinkFunction实现Exactly-Once的文件输出
1 Flink应用中的Exactly-Once语义
Exactly-Once指每个输入的事件只影响最终结果一次。即使机器或软件故障既没有重复数据也不会丢数据。
Flink很久就提供Exactly-Oncecheckpoint机制是Flink有能力提供Exactly-Once语义的核心。
一次checkpoint是以下内容的一致性快照
应用程序的当前状态输入流的位置
Flink可配置一个固定时间点定期产生checkpoint将checkpoint的数据写入持久存储系统如S3或HDFS。将checkpoint数据写入持久存储是异步即Flink应用程序在checkpoint过程中可以继续处理数据。
如果发生机器或软件故障重新启动后Flink应用程序将从最新的checkpoint点恢复处理 Flink会恢复应用程序状态将输入流回滚到上次checkpoint保存的位置然后重新开始运行。这意味着Flink可以像从未发生过故障一样计算结果。
Flink 1.4.0前Exactly-Once语义仅限Flink应用程序内部没有扩展到Flink数据处理完后发送的大多数外部系统。Flink应用程序与各种数据输出端进行交互开发人员自己维护组件上下文保证Exactly-Once语义。
为提供端到端的Exactly-Once语义 – 即除了Flink应用程序内部Flink写入的外部系统也需要能满足Exactly-Once语义 – 这些外部系统必须提供提交或回滚的方法然后通过Flink的checkpoint机制协调。
分布式系统中协调提交和回滚的常用方法是2pc协议。讨论Flink的TwoPhaseCommitSinkFunction如何利用2pc提供端到端的Exactly-Once语义。
2 Flink应用程序端到端的Exactly-Once语义
Kafka经常与Flink使用。Kafka 0.11版本添加事务支持。这意味着现在通过Flink读写Kafaka并提供端到端的Exactly-Once语义有了必要支持。
Flink对端到端的Exactly-Once语义的支持不仅局限Kafka可将它与任何一个提供必要的协调机制的源/输出端一起使用。如Pravega来自DELL/EMC的开源流媒体存储系统通过Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once语义。 示例程序有
从Kafka读取的数据源Flink内置的KafkaConsumer窗口聚合将数据写回Kafka的数据输出端Flink内置的KafkaProducer
要使数据输出端提供Exactly-Once保证须将所有数据通过一个事务提交给Kafka。提交捆绑了两个checkpoint之间的所有要写数据。这确保在故障时能回滚写入的数据。但分布式系统中通常有多个并发运行的写入任务所有组件须在提交或回滚时“一致”才能确保一致结果。Flink使用2PC及预提交阶段解决这问题。
pre-commit
checkpoint开始时即2PC的“预提交”阶段。当checkpoint开始时Flink的JobManager会将checkpoint barrier将数据流中的记录分为进入当前checkpoint与进入下一个checkpoint注入数据流。
brarrier在operator之间传递。对每个operator它触发operator的状态快照写入state backend。 数据源保存了消费Kafka的偏移量(offset)之后将checkpoint barrier传递给下一operator。
这种方式仅适用于operator具有『内部』状态。
内部状态
指Flink state backend保存和管理的。如第二个operator中window聚合算出来的sum值。当一个进程有它的内部状态时除了在checkpoint前需将数据变更写入state backend无需在pre-commit阶段执行其他操作。
Flink负责在checkpoint成功时正确提交这些写入或故障时中止这些写入。 3 Flink应用启动pre-commit阶段
当进程具有『外部』状态需额外处理。外部状态通常以写入外部系统如Kafka的形式出现。此时为提供Exactly-Once保证外部系统须【支持事务】才能和两阶段提交协议集成。
示例数据需写入Kafka因此数据输出端Data Sink有外部状态。此时在预提交阶段
除了将其状态写入state backend数据输出端还必须预先提交其外部事务 当checkpoint barrier在所有operator都传递了一遍并且触发的checkpoint回调成功完成时预提交阶段结束。所有触发的状态快照都被视为该checkpoint的一部分。checkpoint是整个应用程序状态的快照包括预先提交的外部状态。若故障可回滚到上次成功完成快照的时间点。
下一步是通知所有operatorcheckpoint已经成功了。这是2PC的提交阶段JobManager为应用程序中的每个operator发出checkpoint已完成的回调。
数据源和 widnow operator没有外部状态因此在提交阶段这些operator不必执行任何操作。但是数据输出端Data Sink拥有外部状态此时应该提交外部事务。 总结
一旦所有operator完成预提交就提交一个commit。如果至少有一个预提交失败则所有其他提交都将中止我们将回滚到上一个成功完成的checkpoint。在预提交成功之后提交的commit需要保证最终成功 – operator和外部系统都需要保障这点。如果commit失败例如由于间歇性网络问题整个Flink应用程序将失败应用程序将根据用户的重启策略重新启动还会尝试再提交。这个过程至关重要因为如果commit最终没有成功将会导致数据丢失。
因此我们可以确定所有operator都同意checkpoint的最终结果所有operator都同意数据已提交或提交被中止并回滚。
4 在Flink中实现两阶段提交Operator
完整的实现两阶段提交协议可能有点复杂这就是为什么Flink将它的通用逻辑提取到抽象类TwoPhaseCommitSinkFunction中的原因。
接下来基于输出到文件的简单示例说明如何使用TwoPhaseCommitSinkFunction。用户只需要实现四个函数就能为数据输出端实现Exactly-Once语义
beginTransaction – 在事务开始前我们在目标文件系统的临时目录中创建一个临时文件。随后我们可以在处理数据时将数据写入此文件。preCommit – 在预提交阶段我们刷新文件到存储关闭文件不再重新写入。我们还将为属于下一个checkpoint的任何后续文件写入启动一个新的事务。commit – 在提交阶段我们将预提交阶段的文件原子地移动到真正的目标目录。需要注意的是这会增加输出数据可见性的延迟。abort – 在中止阶段我们删除临时文件。
我们知道如果发生任何故障Flink会将应用程序的状态恢复到最新的一次checkpoint点。一种极端的情况是预提交成功了但在这次commit的通知到达operator之前发生了故障。在这种情况下Flink会将operator的状态恢复到已经预提交但尚未真正提交的状态。
我们需要在预提交阶段保存足够多的信息到checkpoint状态中以便在重启后能正确的中止或提交事务。在这个例子中这些信息是临时文件和目标目录的路径。
TwoPhaseCommitSinkFunction已经把这种情况考虑在内了并且在从checkpoint点恢复状态时会优先发出一个commit。我们需要以幂等方式实现提交一般来说这并不难。在这个示例中我们可以识别出这样的情况临时文件不在临时目录中但已经移动到目标目录了。
在TwoPhaseCommitSinkFunction中还有一些其他边界情况也会考虑在内请参考Flink文档了解更多信息。
FAQ
flink sink在如果过来一个checkpoint barrier会去存储state这个动作会和普通的write并行吗还是串行
在Flink的checkpoint机制中,当一个Checkpoint Barrier过来时,sink会触发对状态的snapshot,这个snapshot动作默认是和普通的write操作并行进行的。
具体来说:
Flink的checkpoint机制是通过在datastream中注入Checkpoint Barrier来实现的。 当source接收到Checkpoint Barrier时,会将其传递给下游的transformation和sink。 当sink接收到Checkpoint Barrier时,会启动一个新的线程来执行state snapshot(状态保存)。 这个状态snapshot线程会从状态后端Snapshot State,并存储检查点。 而sink的主线程在接收到Checkpoint Barrier时,会继续处理正常的write。 这样,状态snapshot和正常的write操作就是并行进行的。
但是也可以通过Sink的配置来设置snapshot和write的执行策略,主要有两种模式:
并行模式(默认):snapshot和write同时进行 串行模式:snapshot完成后再进行write
综上,Flink sink在默认的并行checkpoint模式下,状态snapshot和普通的write操作是并行执行的。可以通过配置来改变其行为。这样可以根据实际需要进行平衡。
总结
Flink的checkpoint机制是支持两阶段提交协议并提供端到端的Exactly-Once语义的基础。这个方案的优点是: Flink不像其他一些系统那样通过网络传输存储数据 – 不需要像大多数批处理程序那样将计算的每个阶段写入磁盘。Flink的TwoPhaseCommitSinkFunction提取了两阶段提交协议的通用逻辑基于此将Flink和支持事务的外部系统结合构建端到端的Exactly-Once成为可能。从Flink 1.4.0开始Pravega和Kafka 0.11 producer都提供了Exactly-Once语义Kafka在0.11版本首次引入了事务为在Flink程序中使用Kafka producer提供Exactly-Once语义提供了可能性。Kafaka 0.11 producer的事务是在TwoPhaseCommitSinkFunction基础上实现的和at-least-once producer相比只增加了非常低的开销。 本文由博客一文多发平台 OpenWrite 发布