当前位置: 首页 > news >正文

保康网站建设永州高端网站建设

保康网站建设,永州高端网站建设,哈尔滨制作网站多少钱,个人网站设计文字内容模板项目地址#xff1a;https://github.com/GTyingzi/BigDATA 该项目是自己在学习大数据过程中整理、总结下来的一份面试小抄。涵盖Hadoop、Spark、Flink、Hive、HBae、Kafka、ES、Zookeeper等。 开源给大家#xff0c;若感觉不错欢迎star~ 摘取Flink部分如下文章目录FlinkFli…项目地址https://github.com/GTyingzi/BigDATA 该项目是自己在学习大数据过程中整理、总结下来的一份面试小抄。涵盖Hadoop、Spark、Flink、Hive、HBae、Kafka、ES、Zookeeper等。 开源给大家若感觉不错欢迎star~ 摘取Flink部分如下 文章目录FlinkFlink介绍Flink架构(重点)作业提交流程高层级视角独立模式YARN集群Flink的水位线(重点)Flink的窗口(重点)窗口分类窗口函数窗口其他APIFlink的Checkpoint(重点)checkpoint保存checkpoint恢复checkpoint算法checkpoint配置SavepointExactly-One(重点)概念输入端保证输出端保证Flink的CEP(重点)概念应用场景模式API模式的检测处理Flink处理背压Flink SQL解析过程Flink Flink介绍 流式大数据处理引擎 内存执行速度 - 速度快任意规模 - 可扩展性强 Flink区别与传统数据处理框架特性如下 高吞吐、低延迟每秒处理数百万个事件毫秒级延迟结果的准确性提供事件事件、处理时间语义。对于乱序事件流仍然能提供一致且准确的结果exactle-once状态一致性保证高可用本身高可用的设置加上与K8s、YARN、Mesos的紧密集成再加上从故障中快速恢复、动态扩展任务的能力Flink能做到以极少的停机事件 7 * 24 全体候运行能够更新应用程序代码将作业迁移到不同的Flink集群而不会丢失应用程序状态 Flink架构(重点) 架构组成作业管理器JobManager任务管理器TaskManagers 启动方式 作为独立集群的进程直接在机器上启动在容器中启动由资源管理平台调度启动如YARN、K8S JobManager Flink集群中任务管理和调度的核心是控制应用执行的主进程 包含三个组件JobMastser、ResourceManager、Dispatcher 1、JobMaster 是JobManager最核心的组件负责所有需要中央协调的操作也负责处理单独的Job(一一对应) 在Job提交时JobMaster会先接收要执行的应用(应用一般指客户端提交来的Jar包、数据流图、作业图) JobMaster会把JobGraph转化成一个物理层面的数据流图包含了所有可并发执行的任务 JobMaster会向ResourceManager发出请求申请执行任务必要的资源当获取足够的资源后就将数据流图发往TaskManager2、ResourceManager 负责资源的分配和管理资源指TaskManager的任务槽(task slots).任务槽是集群中资源调配单元执行计算的一组CPU和内存资源每一个Task都需要分配到一个slot上执行当有新的作业申请资源RM会将空闲槽位的TaskManager分配给JobMaster。若RM没有足够的任务槽它还可以向资源提供平台发起会话请求提供启动TaskManager进程的容器。此外RM还负责停掉空闲的TaskManager释放计算资源Flink内置的RM和其他资源管理管理平台(如YARN)的RM不同针对不同的环境和资源管理管理平台(Standalone、YARN)有不同的具体实现-Standalone部署时TaskManager单独启动没有Per-Job模式RM只能分发TaskManager的任务槽,不能单独启动新的TaskManager3、Dispatcher 提供一个REST接口用来提交应用 为每一个新提交的作业启动一个新的JobMaster组件 启动Web UI,展示、监控作业执行的信息TaskManager Flink中的工作进程数据流的具体计算者被称为Worker Flink集群中至少有一个TaskManager分布式计算会有多个 每个TaskManager都包含一定数量的任务槽(task slots)slot的数量现在TaskManager能够并行处理的任务数量启动之后TaskManager会向RM注册它的slots收到RM指令后TaskManager会将一个或者多个槽位提供给JobMaster调用JobMaster分配任务来执行 在执行过程中TaskManager可以缓存数据还可以和运行于同一应用的其他TaskManager交换数据作业提交流程 高层级视角 1、客户端(APP)通过分发器提供的REST接口将作业提交给JobManager 2、由分发器启动JobMaster并将作业(含JobGraph)提交给JobMaster 3、JobMaster将JobGraph解析为可执行的ExxecutionGraph得到所需的资源数量然后向RM请求资源(slots) 4、RM判断当前是否有足够的可用资源如果没有就启动新的TaskManager 5、TaskManager启动之后向RM注册自己的可用任务槽 6、RM通知TaskManager为新的作业提交slots 7、TaskManager连接到对应的JobMaster提供slots 8、JobMaster将需要执行的任务分发给TaskManager 9、TaskManager执行任务互相之间可以交换数据独立模式 在独立模式下只有会话模式和应用模式两者整体来看流程十分相似TaskManager都需要手动启动当RM收到JobMaster的请求时会要求TaskManager提供资源。JobMaster启动时间点会话模式预先启动应用模式则在作业提交时启动YARN集群 有三类模式会话模式(Session)、单作业模式(Per-Job)、应用模式(Application) 1、会话模式 在会话模式下需要先启动一个YARN session。这个会话会创建一个Flink集群集群只启动JobManager而TaskManager根据需要动态启动。在JobManager内部由于还没有提交作业只有RM、Dispatcher在运行1、客户端通过REST接口将作业提交给分发器 2、分发器启动JobMaster并将作业(含JobGraph)提交给JobMaster 3、JobMaster向资源管理器请求资源slots 4、资源管理器向YARN的资源管理器请求container资源 5、YARN启动新的TaskManager容器 6、TaskManager启动之后Flink的资源管理器注册自己的可用任务槽 7、资源管理器通知TaskManager为新的作业提供slots 8、TaskManager连接到对应的JobMaster提供slots 9、JobMaster将需要执行的任务分发给TaskManager执行任务2、单作业 1、客户端将作业提交给YARN的RM同时将Flink的Jar包和配置上传到HDFS以便后续启动Flink相关组件的容器 2、YARN的RM分配Container资源启动Flink JobManager并将作业提交给JobMaster这里省略了Dispatcher组件 3、JobMaster向RM请求资源(slots) 4、RM向YARN的RM请求container资源 5、YARN启动新的TaskManager容器 6、TaskManager启动之后向Flink的RM注册自己的可用任务槽 7、RM通知TaskManager为新的作业提供slots 8、TaskManager连接到对应的JobMaster提供slots 9、JobMaster将需要执行的任务分发给TaskManager执行任务3、应用模式 与作业模式的提交流程非常相似只是初始提交给YARN的RM不再是具体作业而是整个应用。每个应用可能包含多个作业Flink的水位线(重点) 先引申Flink中的时间语义处理时间、事件时间 处理时间执行处理操作的机器的的系统时间事件时间数据生成的时间自带一个时间戳(timestamp) 水位线定义在事件时间的语义下不依赖于系统时间基于数据自带的时间戳去定义一个时钟表示当前时间的进展 水位线的特性 基于数据的时间戳生成插入数据流中的一个标记通过设置延迟保证正确处理乱序数据水位线t代表数据流中事件时间已经到达t时间戳小 t的数据均到齐了 水位线的生成策略DataStream API中单独生成水位线的方法assignTimestampsAndWatermarks()传入watermarkStrategy参数 public SingleOutputStreamOperatorT assignTimestampsAndWatermarks(WatermarkStrategyT watermarkStrategy)public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); }TimestampAssigner负责从流中数据元素的某个字段提取时间戳并分配给元素 WatermarkGenerator按照既定方式基于时间戳生成水位线。接口有两个方法onEventonPeriodcEmit onEvent每个数据到来时都会调用的方法它的参数有当前数据、时间戳、WatermarkOutput onPeriodcEmit周期性调用的方法由WatermarkOutput发出水位线。周期时间为处理时间可调用环境配置的setAutoWatermarkInterva()方法来设置默认为200ms public interface WatermarkGeneratorT {void onEvent(T event, long eventTimestamp, WatermarkOutput output);void onPeriodicEmit(WatermarkOutput output); }内置水位线生成器有序 - forMonotonousTimestamps()、无序 - forBoundedOutOfOrderness() 这两个等价 WatermarkStrategy.forMonotonousTimestamps() WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))注意乱序流中生成的水位线真正的时间戳最大时间戳 - 延迟时间 - 1单位为毫秒 public void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); }水位线的传递实际应用中上下游都有多个并行子任务为了统一推进事件时间的进展要求上游任务处理完水位线、时钟改变之后将当前水位线广播给所有下游任务。上游并行子任务发来不同的水位线当前任务会为每一个分区设置一个分区水位线当前任务时钟取最小水位线 水位线的总结 在事件时间的世界里面承担了时钟的角色是唯一的时间尺度水位线的默认计算公式水位线 观察到的最大事件时间 - 最大延迟时间 - 1 ms数据流开始时插入一个负无穷大的水位线水位线结束时插入一个正无穷大的水位线以保证所有窗口合闭、所有定时器被触发对于离线数据集Flink将其作为流读入。Flink只会插入两次水位线最开始插入负无穷大结束处插入正无穷大 Flink的窗口(重点) 将无限数据切割成有限大数据块处理无界流的核心 窗口如桶将每个数据发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理 动态创建当有落在这个窗口区间范围的数据到达时才会创建对应的窗口窗口关闭到达窗口结束时间时窗口就触发计算并关闭 窗口分类 按照驱动型分类时间窗口、计数窗口 1、时间窗口 窗口大小 结束时间 - 开始时间 Flink有一个专门的TimeWindow类表示时间窗口该类只有两个私有属性private final long start;private final long end; 可以调用公有getStart()和getEnd()方法直接获取这两个时间戳。另外提供了maxTimeStamp()获取窗口中包含的最大时间戳 public long maxTimestamp(){return end - 1; } 左闭右开[start,end)2、计数窗口 基于元素的个数来截取数据到达固定个数时就触发计算并关闭窗口 底层通过全局窗口(Global Window)实现按照窗口分配数据的规则分类滚动窗口(Tumbling)、滑动窗口(Sliding)、会话窗口(Session)、全局窗口(Global) 滚动窗口 滚动窗口有固定的大小是一种对数据进行均匀切片的划分方式首尾相接每个数据都会被分配且只属于一个窗口 滚动窗口可以基于时间、个数定义滑动窗口 滑动窗口大小固定但窗口之间并不是首尾相接有部分重合 滑动窗口可以基于时间、个数定义 定义滑动窗口的两个参数窗口大小、滑动步长。数据分配到多个窗口的个数 窗口大小 / 滑动步长会话窗口 会话窗口长度不固定、起始和结束时间不确定各个分区窗口之间无关联会话窗口之间不会重叠 会话窗口只能基于时间定义终止的标志是隔一段时间没有数据到来 size两个会话窗口之间的最小距离。可设置静态固定的size也可以自定义提取器动态提取最小间隔gap的值 在Flink底层对会话窗口有特殊的处理每来一个新数据都会创建一个新会话窗口判断已有窗口之间的距离。如果小于给定的size就将它们合并全局窗口 无界流的数据永无止境窗口没有结束的时候默认不做触发计算若希望对数据进行计算处理需自定义触发器(Trigger) 相同key的所有数据都会分配到同一个窗口中窗口函数 定义窗口分配决定数据属于哪个窗口定义窗口函数如何进行计算 由处理方式分为两类增量聚合函数、全窗口函数 增量聚合函数 窗口对无限流的切分可看作得到一个有界数据集。对每来一条数据立即计算中间保持一个简单的聚合状态等到窗口结束时拿出之前聚合的状态直接输出 典型的两个增量函数归约函数(ReduceFunction)、聚合函数(AggregateFunction) 归约函数将窗口收集到的数据两两进行归约实现增量式大聚合 public interface ReduceFunctionT extends Function, Serializable {T reduce(T value1, T value2) throws Exception;//注意输入数据类型、聚合状态类型、输出类型一致 }聚合函数更加灵活的进行窗口聚合操作,AggregateFunction可看作是ReduceFunction的通用版本。输入类型(IN)、累加器类型(ACC)、输出类型(OUT) createAccumulator创建累加器为聚合创建初始状态add将输入元素添加到累加器中返回一个新的累加器值getResult从累加器中提取聚合输出的结果merge合并两个累加器并将合并的状态作为累加器返回。该方法只在合并窗口场景下调用一般是会话窗口 public interface AggregateFunctionIN, ACC, OUT extends Function, Serializable{ACC createAccumulator();ACC add(IN value, ACC accumulator);OUT getResult(ACC accumulator);ACC merge(ACC a, ACC b); }全窗口函数 全窗口需要先收集窗口中的数据并在内部缓存起来等到窗口要输出结果的时候再取出数据进行计算 典型的批处理思路——攒数据等一批到齐后再正式启动处理流程 全窗口函数有两种窗口函数(WindowFunction)、处理窗口(ProcessWindowFunctio) 窗口函数当窗口到达结束时间时需要触发计算调用apply()。从input集合中取出窗口收集的数据结合key和windowx信息通过收集器输出结果。目前WindowFunction的作用已被ProcessWindowFunction全覆盖 public interface WindowFunctionIN, OUT, KEY, W extends Window extends Function,Serializable {void apply(KEY key, W window, IterableIN input, CollectorOUT out) throws Exception; }处理窗口函数Window API中最底层的通用窗口函数接口可以获取到一个上下文对象 public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Windowextends AbstractRichFunction {public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract X void output(OutputTagX outputTag, X value);} }增量聚合和全窗口的总结 增量聚合的优点高效、输出更加实时 全窗口的优点提供更多的信息更加通用的窗口操作实际应用中可以兼具这两者的优点结合使用 第一个参数(增量聚合函数)处理窗口数据每来一个数据做一次聚合 第二个参数(全窗口函数)等到窗口需要触发计算时进行逻辑输出此时全窗口函数不再缓存所有数据而将增量聚合的几个当做Iterable输出窗口其他API 对于窗口算子窗口分配器和窗口函数是必须的。选用其他一些API能更加灵活控制窗口行为触发器、移除器、允许延迟、侧输出流 触发器 Trigger是窗口算子的内部属性每个窗口分配器都会有一个对应的默认触发器Flink内置窗口类型的触发器都已经实现 所有事件时间窗口默认的触发器都是EventTimeTrigger类似还有ProcessingTimeTrigger、CountTrigger public abstract class TriggerT, W extends Window implements Serializable {public abstract TriggerResult onElement(T element,long timestamp,W window,TriggerContext ctx) throws Exception;public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;public abstract void clear(W window, TriggerContext ctx) throws Exception;public boolean canMerge() {return false;}public void onMerge(W window, OnMergeContext ctx) throws Exception {throw new UnsupportedOperationException(This trigger does not support merging.);}public interface TriggerContext {long getCurrentProcessingTime();MetricGroup getMetricGroup();long getCurrentWatermark();void registerProcessingTimeTimer(long time);void registerEventTimeTimer(long time);void deleteProcessingTimeTimer(long time);void deleteEventTimeTimer(long time);S extends State S getPartitionedState(StateDescriptorS, ? stateDescriptor);DeprecatedS extends Serializable ValueStateS getKeyValueState( String name, ClassS stateType, S defaultState);DeprecatedS extends Serializable ValueStateS getKeyValueState(String name, TypeInformationS stateType, S defaultState);}public interface OnMergeContext extends TriggerContext {S extends MergingState?, ? void mergePartitionedState(StateDescriptorS, ? stateDescriptor);} onElement窗口中每来一个元素调用该方法onEventTime注册的事件时间定时触发时调用该方法onProcessingTime注册的处理时间定时器触发时调用该方法clear窗口关闭销毁时调用该方法用来清除自定义的状态 上面的前三个方法返回类型都是TriggerResult这是一个枚举类型其中定义了对窗口进行操作的四种类型 CONTINUE(继续)什么都不做FIRE(触发)触发计算输出结果PURGE(清除)清空窗口中所有数据销毁窗口FIRE_AND_PURGE(触发并清除)触发计算输出结果并清除窗口 移除器 用来定义移除某些数据的逻辑默认情况下预实现的移除器都是在执行窗口函数之前移除数据的 public interface EvictorT, W extends Window extends Serializable {void evictBefore(IterableTimestampedValueT elements,int size,W window,EvictorContext evictorContext);void evictAfter(IterableTimestampedValueT elements,int size,W window,EvictorContext evictorContext);interface EvictorContext {long getCurrentProcessingTime();MetricGroup getMetricGroup();long getCurrentWatermark();}evictBefore执行窗口函数之前的移除数据操作evictAfter执行窗口函数之后的移除数据操作 允许延迟 事件时间语义下乱序流中可能出现数据迟到的情况水位线并不一定能保证时间戳更早的所有数据不会再来为窗口设置一个运行的最大延迟。水位线 窗口结束时间 延迟时间 public WindowedStreamT, K, W allowedLateness(Time lateness) {builder.allowedLateness(lateness);return this;}侧输出流 将未收入窗口的迟到数据放入侧输出流(side output)进行另外的处理。 sideOutputLateData() 方法传入一个输出标签用来标记的迟到数据流 public WindowedStreamT, K, W sideOutputLateData(OutputTagT outputTag) {outputTag input.getExecutionEnvironment().clean(outputTag);builder.sideOutputLateData(outputTag);return this;}getSideOutput()方法传入对应的输出标签获取迟到数据所在的流 public X DataStreamX getSideOutput(OutputTagX sideOutputTag) {sideOutputTag clean(requireNonNull(sideOutputTag));sideOutputTag new OutputTagX(sideOutputTag.getId(), sideOutputTag.getTypeInfo());TypeInformation? type requestedSideOutputs.get(sideOutputTag);if (type ! null !type.equals(sideOutputTag.getTypeInfo())) {throw new UnsupportedOperationException(A side output with a matching id was already requested with a different type. This is not allowed, side output ids need to be unique.);}requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());SideOutputTransformationX sideOutputTransformation new SideOutputTransformation(this.getTransformation(), sideOutputTag);return new DataStream(this.getExecutionEnvironment(), sideOutputTransformation);}Flink的Checkpoint(重点) Flink容错机制的核心。检查是针对故障恢复的结果而言在有状态的流处理中任务继续处理新数据不需要之前的计算结果而是需要任务之前的状态。故障恢复之后应该于发生故障前完全一致需要检查结果的正确性因此Checkpoint又称为一致性检查点 checkpoint保存 检查点保存周期性地触发保存 保存的时间点当所有任务都处理完同一条输入数据时对任务状态做快照保存下来。 检查点存储将快照写入外部存储由状态后端的检查点存储(CheckpointStorage)来决定有两种选择 作业管理器的堆内存(JobManagerCheckpointStorage)、文件系统(FileSystemCheckpointStorage) checkpoint恢复 在允许流处理程序是Flink周期性地保存检查点当发生故障时找到最近一次成功保存的检查点来恢复状态 已处理完三个数据后保存了一个检查点之后又正常处理一个数据flink在处理第五个数据hello时发生故障。这里Source任务处理完毕偏移量为5Map任务已经完成在Sum任务处理中发生故障此时状态并未保存需要从检查点来恢复状态 (1)重启应用 遇到故障后第一步重启将所有任务的状态清空 (2)读取检查点重置状态 找到最后一次保存的检查点从中读取每个算子任务状态的快照填充到对应的状态中。此时恢复到第三个数据处理完毕key为flink的数据还没有来 (3)重放数据 从保存检查点后开始重新读取数据通过Source任务向外部数据源重新提交偏移量 (4)继续处理数据 接下来正常处理数据即可当处理到5个数据时就已经追上发生故障时的系统状态了 checkpoint算法 检查点分界线(Barrier)借鉴水位线设计在数据流中插入一个特殊的数据结构专门用来表示触发检查点保存的时间点 JobManager有一个检查点协调器(checkpointcoordinator),专门用来协调处理检查点的相关工作定期向JobManager发出指令要求保存检查点TaskManager会让所有Source任务把偏移量保存起来并将带有检查点ID的分界线插入到当前数据流每个算子任务只要处理到该分界线就会将当前状态进行快照 分布式快照算法Flink使用了Chandy-Lamport算法的一种变体被称为异步分界线快照算法(asynchronous barrier snapshotting) 当上游任务向多个并行下游任务发送barrier时需要广播出去当多个上游任务向同一个下游任务发生barrier时需要在下游任务执行分界线对齐操作等所有并行分区的barrier都到齐才开始状态保存 为了详细介绍检查点算法原理以下对WordCount程序进行扩展考虑所有算子并行度为2的场景 检查点算法保存的具体过程如下 (1)JobManager发送指令触发检查点的保存在Source任务中插入分界线将偏移量保存 JobManager会周期性地向每个TaskManager发送一条带有新检查点ID的消息通过这种方式启动检查点收到指令后TaskManager会在所有Source任务中插入分界线并将偏移量保存到远程的持久化存储中 (2)状态快照保存分界线向下游传递 偏移量存入持久化存储后会返回通知Sourcer任务Source任务就会向JobManager确认检查点完成随后将barrier向下游传递 (3)向下游多个并行子任务广播分界线执行分界线对齐 Map任务没有状态故直接将barrier继续向下游传递这时由于进行keyBy分区故需要将barrier广播到下游并行的两个Sum任务。同时Sum任务可能收到来自上游两个并行Map任务的barrier故需要执行分界线对齐 (4)分界线对齐后保存状态到持久化存储 各个分区的分界线都对齐后就可以对当前状态做快照保存到持久化存储存储完成通知JobManager保存完毕将barrier继续向下游传递。整个过程中每个任务保存自己的状态都是相对独立的互不影响 (5)先处理缓存数据然后正常继续处理 分界线对齐要求先达到的分区做缓存等待一定程度上会影响处理速度当出现背压(backpressure)时下游任务会堆积大量的缓冲数据检查点可能需要很久才可以保存完毕。Flink1.11之后提供了不对齐的检查点保存方式可将未处理的缓冲数据也保存进检查点此时当我们遇到一个分区barrier时就不需要等待对齐了而是直接启动状态的保存 当JobManager收到所有任务成功保存状态的信息就可以确认当前检查点成功保存之后遇到故障就可以从这里恢复了 checkpoint配置 检查点的作用是为了故障恢复我们不能因为保存检查点占据大量时间导致数据处理性能明显降低。可在代码中对检查点进行配置 1、启动检查点 默认情况下Flink程序是禁用检查点启动保存快照功能 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000);// 每隔 1 秒启动一次检查点保存默认500ms对性能影响小调大间隔时间故障重启后迅速赶上实时地数据处理调小间隔时间 2、检查点存储 检查点持久化存储位置取决于CheckpointStorage的设置默认存储在JobManager的堆内存中 两种方案作业管理器的堆内存(JobManagerCheckpointStorage)、文件系统(FileSystemCheckpointStorage) // 配置存储检查点到 JobManager 堆内存 env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage()); // 配置存储检查点到文件系统 env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(hdfs://namenode:40010/flink/checkpoints));3、其他高级配置 通过检查点配置来进行设置 CheckpointConfig checkpointConfig env.getCheckpointConfig();检查点模式(CheckpointingMode)设置检查点一致性的级别精确一次(exactlt-once)和至少一次(at-least-once)两项默认精确一次超时时间(checkpointTimeout)指定检查点保存的超时时间超时没完成就会被丢弃掉最小间隔时间(minPauseBetweenCheckpoints)指定上一个检查点完成之后检查点协调器最快等多久可以保存下一个检查点的指令。指定该参数时maxConcurrentCheckpoints强制设为1最大并发检查点数量(maxConcurrentCheckpoints)指定运行中的检查点最多可以有多少个开启外部持久化存储(enableExternalizedCheckpoints)默认在作业失败时不会自动清理释放空间需要手动清理 DELETE_ON_CANCELLATION作业取消时会自动删除外部检查点若作业失败退出会保留检查点RETAIN_ON_CANECLLATION作业取消时保留外部检查点 检查点异常任务失败(failOnCheckpointingErrors)指定检查点发生异常时是否应该让任务直接失败退出默认为true不对齐检查点(enableUnalignedCheckpoints)不再执行检查点的分界线对齐操作启用之后可以大大减少产生背压时的检查点保存时间设置要求检查点模式为exactly-once并且并非检查点个数为1 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 启用检查点间隔时间 1 秒 env.enableCheckpointing(1000); CheckpointConfig checkpointConfig env.getCheckpointConfig(); // 设置精确一次模式 checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 最小间隔时间 500 毫秒 checkpointConfig.setMinPauseBetweenCheckpoints(500); // 超时时间 1 分钟 checkpointConfig.setCheckpointTimeout(60000); // 同时只能有一个检查点 checkpointConfig.setMaxConcurrentCheckpoints(1); // 开启检查点的外部持久化保存作业取消后依然保留 checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 启用不对齐的检查点保存方式 checkpointConfig.enableUnalignedCheckpoints(); // 设置检查点存储可以直接传入一个 String指定文件系统的路径 checkpointConfig.setCheckpointStorage(hdfs://my/checkpoint/dir)Savepoint 除了检查点外Flink提供了另一个非常独特的镜像保存功能——保存点(Savepoint) 它的原理和算法与检查点完全相同通过检查点机制来创建流式作业状态的一致性镜像(consistent image)多了额外的元数据 保存点中的状态快照是以算子ID和状态名称组织的相当于一个键值对。从保存点启动应用程序时Flink会将保存点的状态数据重新分配给相应的算子任务 保存点的用途 保存点与检查点最大的区别就是触发的时机。 检查点由Flnk自动管理定期创建发生故障之后自动读取进行恢复自动存盘的功能。主要用来做故障恢复容错机制的核心保存点由用户明确地手动触发保存手动存盘。有计划的手动备份和恢复 保存点可以当做一个强大的运维工具使用在需要的时候创建保存点然后停止应用做一些处理调整之后再从保存点重启 版本管理和归档存储对重要节点手动备份设置为某一版本归档存储应用程序的状态 更新Flink版本当Flink版本升级时不需要重新执行所有计算只要创建一个保存点停掉应用、升级Flink后从保存点重启处理 更新应用程序在程序兼容情况下状态的拓扑结构和数据类型都不变直接更新应用程序从之前的保存点加载 修复应用应用程序中的逻辑Bug更新后接着处理应用于不同业务逻辑的场景如A/B测试等 调整并行度在应用运行的过程中发现需要的资源不足或已经有了大量剩余可以通过保存点重启的方式将应用程序的并行度增大或减少 暂停应用程序有时我们不需要调整集群或更新程序只是单纯地希望把应用暂停、释放一些资源来处理更处理更加重要的应用程序使用保存点就可以灵活实现应用的暂停、重启可对有限的集群资源做最好的优化配置 注意保存点能够在程序更改的时候依然兼容前提是状态的拓扑结构和数据类型不变。保存点中状态都是以算子ID-状态名称这样的key-value组织起来算子ID可在代码中直接调用SingleOutputStreamOperator的uid()来进行指定 DataStreamString stream env.addSource(new StatefulSource()).uid(source-id).map(new StatefulMapper()).uid(mapper-id).print();对于没有设置DI的算子Flink默认会自动进行设置在重新启动应用后可能会导致ID不同而无法兼容以前状态为了方便后续维护强烈建议在程序中为每一个算子手动指定ID Exactly-One(重点) 概念 一致性是结果的正确性对于分布式系统而言强调的是不同节点中相同数据的副本应该总是一致的。多个节点并行处理不同的任务要保证计算结构的正确性必须不漏掉任何一个数据不重复处理同一个数据在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了通过检查点的保存来保证状态恢复后的结果正确 状态一致性的三种级别最多一次(AT-MOST-ONCE)、至少一次(AT-LEAST-ONCE)、精确一次(EXACTLY-ONCE) 最多一次任务发生故障时直接重启。既不恢复丢失的状态也不会重放丢失的数据每个数据再正常情况下被处理一次至少一次所有数据都会丢、都能被处理有些数据可能被重复处理精确一次最严格的一致性保证所有数据有且只会被处理一次 完整的流处理应用包括了数据源、流处理器、外部存储系统三个部分。这个完整应用的一致性叫做端到端的状态一致性取决于三个组件中最弱的一环。能否达到at-least-once一致性级别主要看数据源能否重放数据。而能否达到exactly-once级别数据源、流处理器内部、外部存储系统都要有相应的保证机制 对于Flink内部来说检查点机制可以保证故障故障恢复后数据不丢失且只处理一次。达到exactly-once的一致性语义了 端到端的一致性关键点输入数据源端、输出外部存储端 输入端保证 Flink读取的外部数据源对于一些数据源来说并不提供数据的缓冲或持久化保存数据被消费之后就彻底不存在了故障后通过检查点恢复到之前状态但保存检查点到发生故障期间的数据不能重发就会导致数据丢失只能保证at-most-once的一致性语义 想要在故障恢复后不丢失数据外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存并且可以重设数据的读取位置。一个最经典的应用就是Kafka在Flink的Source任务中将数据读取的偏移量保存为状态在故障恢复时从检查点中读取处理对数据源重置偏移量重新获取数据 数据可重复 检查点 - at-least-once一致性语义的基本要求 输出端保证 数据有可能重复写入外部系统检查点保存之后继续到来的数据会一一处理任务的状态也会更新最终通过Sink任务将计算结果输出到外部系统 状态改变还没有存入下一个检查点中这时出现故障数据都会重新来一遍计算两次 对于内部系统来说重复计算的动作无影响状态回滚最终改变只有一次对于外部系统来说已经写入的结果无法收回再次执行就会把同一个数据写入两次 为了实现端到端的exactly-once需要对外部存储系统、sink连接器有额外的要求两种方法幂等写入、事务写入 幂等写入 一个操作可以重复执行很多次但只导致一次结果更改。在数据科学领域对e^x求导不变在数据处理领域对HashMap的插入相同键值对的重复插入不起作用 并没有真正解决数据重复计算、写入的问题。限制在于外部存储系统必须支持这样的幂等写入例如Redis中的键值存储关系型数据库(如MySQL)中满足查询条件的更新操作 事务写入 事务的四个基本特性(ACID)原子性(Atomicity)、一致性(Correspondence)、隔离性(Isolation)、持久性(Durability) 事务写入的基本思想用一个事务来进行数据向外部系统的写入这个事务时与检查点绑定在一起的当Sink任务遇到barrier时开始保存状态的同时开启一个事务接下来所有数据都写入到该事务中。待到检查点保存完毕将事务提交所有写入的数据就真正可用了若中间过程出现故障状态会回退到上一个检查点当前事务没有用正常关闭也会回滚写入外部的数据就将被撤销 有两种实现方式预写日志(WAL)、两阶段提交(2PC) 预写日志 1、将结果数据作为日志状态保存起来2、进行检查点保存时将结果一并做持久化存储3、在收到检查点完成的通知时将所有结果一次性写入外部系统 DataStream API提供了一个模板类GenericWriteAheadSink用来实现这种事务型的写入方式 注意预写日志这种一批写入的方式可能会写入失败在执行写入动作之后 必须等待发送成功的返回确认消息在成功写入所有数据后在内部再次确认相应的检查点才代表着检查点的真正完成这里需要将确认信息也进行持久化保存在故障恢复时只有存在对应的确认信息才能保证这批数据已经写入可以恢复到对应的检查点位置 这种再次确认的方式也会有一些缺陷。当检查点已经成功保存、数据也成功地写入到外部系统但最终保存确认信息时出现了故障Flink最终还是会认为没有成功写入。故发生故障不会使用该检查点而是需要回退到上一个会导致这一批数据的重复写入 两阶段提交 分成两个阶段先做预提交等检查点完成之后再正式提交这种提交放是真正基于事务的它需要外部系统提供事务支持 1、当第一条数据到来或收到检查点分界线时Sink任务都会启动一个事务2、接下来收到的所有数据都会通过这个事务写入外部系统这时由于事务没有提交数据尽管写入了外部系统不可用处于预提交状态3、当Sink任务收到JobManager发来检查点完成的通知时正式提交事务写入的结果就真正可用了 Flink提供了TwoPhaseCommitSinkFunction接口方便我们自定义两阶段提交对SinkFunction的实现。两阶段提交精巧同时对外部系统有较高对要求 外部系统必须提供事务支持或者Sink任务必须能模拟外边系统上的事务在检查点的间隔期间必须能够开启一个事务并接受数据写入在收到检查点完成的通知前事务必须是等待提交的状态在故障恢复的情况下可能需要一些时间。如果这时候外部系统关闭事务那么未提交的数据就会丢失Sink任务必须能够在进程失败后恢复事务提交事务必须是幂等操作 Flink的CEP(重点) 概念 复杂事件处理(Complex Event ProcessingCEP)针对流处理而言分析的是低延迟、频繁产生的事件流检测特定的数据组合 CEP的流出可分为三个步骤 1、定义一个匹配规则2、将匹配规则应用到事件流上检测满足规则的复杂事件3、对检测到的复杂事件进行处理得到结果进行输出 模式(Pattern)CEP第一步定义对匹配规则 每个简单事件的特征简单事件之间对组合关系 近一步可扩展模式的功能匹配检测的时间限制每个简单事件是否可重复出现等 事件的组合关系称之为近邻关系 严格的近邻关系两个事件之间不能有任何其他事件宽松的近邻关系相对顺序正确中介可以有其他事件 可以反向定义谁后面不能跟谁。模式还可以设定在时间范围内没有满足匹配条件会导致模式匹配超时 应用场景 CEP主要用于实时流数据对分析处理旨在分析复杂的、看似不相关的事件流中找出有意义的事件组合进而可以实时地分析判断、输出通知信息或报警 风险控制针对用户的异常行为进行实时检。例如当用户短时间内频繁登录失败、大量下单却不支付用户画像对用户行为轨迹进行实时跟踪从而检测出具有特定行为习惯对一些用户做出相应的用户画像精准营销运维监控对于企业服务对运维管理利用CEP灵活配置多指标、多依赖来实现更复杂对监控模式 很多大数据框架如Spark、Samza、Beam等都提供了不同对CEP解决方案但没有专门的库。Flink提供了目前CEP的最佳解决方案 模式API Flink CEP的核心是复杂事件的模式匹配 个体模式每个简单事件都有一定的条件规则。个体模式都是以连接词开始定义比如begin、next等这些都是Pattern对象对象对方法返回Pattern对象个体模式需要一个过滤条件用来指定具体的匹配规则一般调用where()实现传入SimpleCondition内对filter方法 量词给个体模式增加一个量词可以让其循环匹配接收多个事件。 oneOrMore()匹配事件出现至少一次。a.oneOrMore()表示匹配至少1个a的事件组合times(times)匹配事件发生特定次数。a.times(3)表示aaa事件times(fromTimestoTimes)匹配事件出现的次数范围greedy()只能用在循环模式使当前循环模式变得贪心。尽可能多的去匹配optional()当前模式可选 // 匹配事件出现 4 次 pattern.times(4); // 匹配事件出现 4 次或者不出现 pattern.times(4).optional(); // 匹配事件出现 2, 3 或者 4 次 pattern.times(2, 4); // 匹配事件出现 2, 3 或者 4 次并且尽可能多地匹配 pattern.times(2, 4).greedy(); // 匹配事件出现 2, 3, 4 次或者不出现 pattern.times(2, 4).optional(); // 匹配事件出现 2, 3, 4 次或者不出现并且尽可能多地匹配 pattern.times(2, 4).optional().greedy(); // 匹配事件出现 1 次或多次 pattern.oneOrMore(); // 匹配事件出现 1 次或多次并且尽可能多地匹配 pattern.oneOrMore().greedy(); // 匹配事件出现 1 次或多次或者不出现 pattern.oneOrMore().optional(); // 匹配事件出现 1 次或多次或者不出现并且尽可能多地匹配 pattern.oneOrMore().optional().greedy(); // 匹配事件出现 2 次或多次 pattern.timesOrMore(2); // 匹配事件出现 2 次或多次并且尽可能多地匹配 pattern.timesOrMore(2).greedy(); // 匹配事件出现 2 次或多次或者不出现 pattern.timesOrMore(2).optional() // 匹配事件出现 2 次或多次或者不出现并且尽可能多地匹配 pattern.timesOrMore(2).optional().greedy();条件匹配事件的核心在于匹配条件选取事件的规则 主要有简单条件、迭代条件、复合条件、终止条件、调用Pattern对象的subtype()限定匹配事件的子类型 简单条件根据当前事件对特征来决定是否接受它本质上是filter操作迭代事件依靠之前事件作判断组合条件定义多个条件在外部将其连接。如where().where()终止条件遇到某个特定事件时当前模型不再循环匹配调用模式对象的until()。终止条件只有oneOrMore()或者oneOrMore().optional()结合使用限定子类型调用模式对象的subtype()为当前模式增加子类型限制条件 模式组某些场景需要划分多个阶段每个阶段又有一连串的匹配规则我们可以嵌套的方式来定义模式返回一个GroupPattern是Pattern的子类 // 以模式序列作为初始模式 PatternEvent, ? start Pattern.begin(Pattern.Eventbegin(start_start).where(...).followedBy(start_middle).where(...) );匹配后跳过策略用来精准控制循环模式的匹配结果 Pattern.begin(start, AfterMatchSkipStrategy.noSkip()).where(...)模式的检测处理 将模式应用到事件流上、检测提取匹配的复杂事件最终得到想要的输出信息 处理匹配事件PatternStream的转换操作主要分为两种select、process。具体实现是调用API时传入一个函数 处理超时事件在有时间限制的情况下用within()指定模式检测的时间间隔超出这个时间那么这组检测应该失败但又和真正的失败不同是一种部分成功匹配。在开头能正常匹配的前提下在规定时间内没等到后续的匹配事件此时不应该丢弃需要输出一个提示或报警信息 PatternProcessFunction的测输出流实现TimedOutPartialMatchHandler接口的processTimedOutMatch()方法将超时等信息放入一个Map中在方法内定义一个OutputTag可通过调用output()方法将超时的部分匹配事件输出到标签所标识的侧输出流中 class MyPatternProcessFunction extends PatternProcessFunctionEvent, Stringimplements TimedOutPartialMatchHandlerEvent {// 正常匹配事件的处理Overridepublic void processMatch(MapString, ListEvent match, Context ctx,CollectorString out) throws Exception{...}// 超时部分匹配事件的处理Overridepublic void processTimedOutMatch(MapString, ListEvent match, Context ctx) throws Exception{Event startEvent match.get(start).get(0);OutputTagEvent outputTag new OutputTagEvent(time-out){};ctx.output(outputTag, startEvent);} }PatternTimeoutFunction早期版本用于捕获超时事件的接口传入三个参数侧输出标签、超时事件处理函数、匹配事件提取函数 // 定义一个侧输出流标签用于标识超时侧输出流 OutputTagString timeoutTag new OutputTagString(timeout){}; // 将匹配到的和超时部分匹配的复杂事件提取出来然后包装成提示信息输出 SingleOutputStreamOperatorString resultStream patternStream .select(timeoutTag,// 超时部分匹配事件的处理new PatternTimeoutFunctionEvent, String() {Overridepublic String timeout(MapString, ListEvent pattern, long timeoutTimestamp) throws Exception {Event event pattern.get(start).get(0);return 超时 event.toString();}}, // 正常匹配事件的处理new PatternSelectFunctionEvent, String() {Overridepublic String select(MapString, ListEvent pattern) throws Exception{...}} ); // 将正常匹配和超时部分匹配的处理结果流打印输出 resultStream.print(matched); resultStream.getSideOutput(timeoutTag).print(timeout);处理迟到的数据 Flink CEP沿用设置水位线延迟来处理乱序数据。当第一个事件到来时先放入一个缓冲区(buffer)缓冲区内的数据按照时间戳由小到大排序。当一个水位线到来时将时间戳小于水位线的事件依次取出进行检测匹配 有些事件延迟比较大以至于水位线早已超过它的时间戳。借鉴窗口做法基于PatternStream调用sideOutputLateData()方法将迟到数据放入侧输出流处理 PatternStreamEvent patternStream CEP.pattern(input, pattern); // 定义一个侧输出流的标签 OutputTagString lateDataOutputTag new OutputTagString(late-data){};SingleOutputStreamOperatorComplexEvent result patternStream.sideOutputLateData(lateDataOutputTag) // 将迟到数据输出到侧输出流.select(// 处理正常匹配数据new PatternSelectFunctionEvent, ComplexEvent() {...});// 从结果中提取侧输出流 DataStreamString lateData result.getSideOutput(lateDataOutputTag);Flink处理背压 系统在一个临时负载峰值期间接收数据的速率大于其处理速率的一种场景通俗的讲接收速度 接收速度处理不当会导致资源耗尽数据丢失 消息发送太快消息接收太慢产生消息拥堵发生消息拥堵后系统会自动降低消息发生速度 Flink利用自身作为纯数据流引擎的优势来响应背压问题Flink运行时主要由operators、streams两大组件构成。每个operator会消费中间态的流并在流上进行转换然后生成新的流。Flink使用了高效有界的分布式阻塞队列一个较慢的接受者会降低发送者的发送效率。队列容量通过缓冲池(LocalBufferPool)来实现每个生产、消费的流都会被分配以个缓冲区缓冲被消费后可被回收循环利用 Flink SQL解析过程 一条SQL从提交到Calcite解析、优化、执行的步骤如下 1、Sql Parsesql语句通过java cc解析成语法树(AST)在calcite中用SqlNode表示AST2、Sql Validator结合数字字典(catalog)去验证sql语法3、生成Logical Plan将语法树转换成LoglicalPlan用relNode表示4、生成optimized LogicalPlan先基于 calcite rules优化、在基于Flink定制的rules去优化logical Plan5、生成Flink PhysicalPlan基于Flink里的rules将optimized LogicalPlan转化成Flink的PhysicalPlan6、生产Flink ExecutionPlan调用相应的tanslatetoPlan方法转换和利用CodeGen元编程成Flink的各种算子
http://www.dnsts.com.cn/news/247443.html

相关文章:

  • 网页制作基础教程淘宝网素材万能优化大师下载
  • 做视频网站什么平台好企业建网站的工作
  • 网站建设平台价位WordPress修改注册界面
  • 做网站做系统四川省微信网站建设
  • wordpress tag到导航seo的工作内容
  • 网站地图怎么提交全国企业网查询
  • 住房新建网站网站做跳转微信打开
  • 上海的广告公司网站建设怎样做加入购物车的网站
  • 外贸网站建设公司教程wordpress4.9.4安装启动
  • 山东新昌隆建设咨询有限公司网站wap网站分享代码
  • 网站建设的重要张家界网站建设企业
  • 大学生创业做网站厦门做企业网站找谁
  • 基于淘宝的网站开发分析深圳网站建设找哪家公司好
  • 做网站用了别人公司的图片可以吗网站 数据库 模板
  • 网站建设若干意见网站-网站建设定制
  • 网站右下角弹出广告代码晋江文学网
  • 网站seo策划方案设计失信被执行人名单查询系统
  • 网站如何运营管理html网站优化
  • 什么网站可以发布广告一个网站怎么优化
  • 如何优化wordpress网站自助网站建设开发流程步骤
  • react node.js网站开发建筑新网
  • 绍兴网站制作工具对外宣传推广方案
  • 做网站的核验单 是下载的吗手机网站设计咨询
  • 一个网站用多少数据库表wordpress 制作企业站
  • 中文网站站内优化怎么做wordpress add_action do_action
  • 延吉网站开发公司有哪些武威网站建设优化
  • 网站模板修改软件app软件下载大全
  • 厦门 网站优化应用商城官网下载最新版
  • 做网站一般做多大的企业logo设计注意事项
  • 营销网站怎么做合适制作网站首页psd