网站域名查询系统,app开发常用软件,wordpress打字特效,PR做视频需要放网站上下面我们将学习Flink提供的用于处理事件时间戳和水印的API#xff0c;也会介绍有关事件时间、流转时长和摄取时间#xff0c;下面就让我们跟着官网来学习吧
一、水印策略介绍
为了处理事件时间#xff0c;Flink需要知道事件时间戳#xff0c;这意味着流中的每个元素都需要…下面我们将学习Flink提供的用于处理事件时间戳和水印的API也会介绍有关事件时间、流转时长和摄取时间下面就让我们跟着官网来学习吧
一、水印策略介绍
为了处理事件时间Flink需要知道事件时间戳这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来完成的。
时间戳分配与生成水印密切相关水印告诉系统事件时间的进度。我们可以通过指定WatermarkGenerator来配置它。
Flink API需要一个包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。WatermarkStrategy上有许多开箱即用的常用策略作为静态方法用户也可以在需要时构建自己的策略。
以下是一个接口示例
public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{/***实例化一个{link Timestamp Assigner}用于根据此策略分配时间戳。*/OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** 实例化一个水印生成器根据此策略生成水印。*/OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
通常不会自己实现此接口而是使用WatermarkStrategy上的静态帮助方法来实现常见的水印策略或者将自定义TimestampAssigner与WatermarkGenerator捆绑在一起。例如要将bounded-out-of-orderness水印和lambda函数用作时间戳赋值器我们可以使用以下内容
WatermarkStrategy.Tuple2Long, StringforBoundedOutOfOrderness(Duration.ofSeconds(20)).withTimestampAssigner((event, timestamp) - event.f0);
指定TimestampAssigner是可选的比如当使用Kafka或Kinesis时将直接从Kafka/Kinesis记录中获取时间戳。
注意时间戳和水印都指定为自1970-01-01T00:00:00Z的Java纪元以来的毫秒。
二、使用水印策略
Flink应用程序中有两个地方可以使用WatermarkStrategy 1直接在源代码上 2在非源代码操作之后。
第一个选项更可取因为它允许源利用水印逻辑中有关分片/分区/拆分的知识。然后源通常可以在更精细的级别跟踪水印源生成的整体水印将更准确。直接在源上指定水印策略通常意味着我们必须使用特定于源的接口我们我们详细看下水印策略和Kafka连接器了解它在Kafka连接器上的工作原理以及有关每个分区水印如何工作的更多详细信息。
第二个选项在任意操作后设置WatermarkStrategy仅在您不能直接在源上设置策略时才应使用
final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamMyEvent stream env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStreamMyEvent withTimestampsAndWatermarks stream.filter( event - event.severity() WARNING ).assignTimestampsAndWatermarks(watermark strategy);withTimestampsAndWatermarks.keyBy( (event) - event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) - a.add(b) ).addSink(...);
以这种方式使用WatermarkStrategy获取一个流并生成一个带有时间戳元素和水印的新流。如果原始流已经有时间戳和/或水印则时间戳分配程序会覆盖它们。
三、处理空闲源
如果其中一个输入拆分/分区/分片有一段时间没有携带事件这意味着WatermarkGenerator也没有获得任何新信息来建立水印。我们称之为空闲输入或空闲源。这是一个问题因为我们的一些分区可能仍然携带事件。在这种情况下水印将被保留因为它被计算为所有不同并行水印的最小值。
为了解决这个问题我们可以使用WatermarkStrategy来检测空闲并将输入标记为空闲。WatermarkStrategy为此提供了一个方便的代码
WatermarkStrategy.Tuple2Long, StringforBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));
四、水印对齐
上面我知道拆分/分区/分片或源处于空闲状态并且可能会停止增加水印的情况。在光谱的另一方面拆分/分区/分片或源可能会非常快地处理记录从而相对更快地增加其水印。这本身不是问题。然而对于使用水印发出一些数据的下游处理者来说可能会成为一个问题。
在这种情况下与空闲源相反这种下游运算符的水印如聚合上的窗口连接可以继续。然而这种运算符可能需要缓冲来自快速输入的过多数据因为来自其所有输入的最小水印被滞后输入所抑制。因此快速输入发出的所有记录都必须在所述下游运算符状态下进行缓冲这可能导致运算符状态的不可控增长。
为了解决这个问题我们可以启用水印对齐这将确保没有源/拆分/分片/分区的水印比其他水印增加得太多。可以分别为每个源启用对齐
WatermarkStrategy.Tuple2Long, StringforBoundedOutOfOrderness(Duration.ofSeconds(20)).withWatermarkAlignment(alignment-group-1, Duration.ofSeconds(20), Duration.ofSeconds(1));
注意我们只能为FLIP-27源启用水印对齐
启用对齐时我们需要告诉Flink源应该属于哪个组。通过提供一个标签例如对齐组-1来做到这一点该标签将共享它的所有源绑定在一起。此外我们必须告诉属于该组的所有源的当前最小水印的最大漂移。第三个参数描述了当前最大水印应该更新的频率。频繁更新的缺点是会有更多的RPC消息在TM和JM之间传输。
为了实现对齐Flink将暂停从源/任务中的消费这会产生太远的未来水印。与此同时它将继续从其他来源/任务中读取记录这些记录可以向前移动组合水印从而解除屏蔽更快的水印。
注意从Flink 1.17开始FLIP-27源框架支持拆分级水印对齐。源连接器必须实现一个接口来恢复和暂停拆分以便拆分/分区/分片可以在同一个任务中对齐。
如果从1.15. x和1.16.x之间的Flink版本升级可以通过设置管道.水印对齐来禁用拆分级别对齐。allow-unaligned-source-splits为真。此外可以通过检查源代码是否在运行时抛出UnsupportedOperationException或读取javadocs来判断它是否支持拆分级别对齐。在这种情况下最好禁用拆分级别水印对齐以避免致命异常。
当将标志设置为true时只有当拆分/分片/分区的数量等于源运算符的并行性时水印对齐才会正常工作。这会导致每个子任务都被分配一个工作单元。另一方面如果有两个Kafka分区它们以不同的速度产生水印并被分配给同一个任务那么水印可能不会按预期运行。幸运的是即使在最坏的情况下基本对齐的性能也不会比没有对齐差。
此外Flink还支持跨相同源和/或不同源的任务对齐这在我们有两个不同的源例如Kafka和File以不同的速度生成水印时很有用。
五、编写水印生成器
TimestampAssigner是一个从事件中提取字段的简单函数因此我们不需要详细了解它们。另一方面WatermarkGenerator的编写有点复杂。这是WatermarkGenerator接口
/*** WatermarkGenerator 根据事件或定期生成水印**/
Public
public interface WatermarkGeneratorT {/*** 为每个事件调用允许水印生成器检查和记住事件时间戳或根据事件本身发出水印。*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 定期调用并且可能会发出新的水印或者不** 调用此方法和生成水印的间隔取决于{link ExecutionConfig#getAutoWatermarkInterval()}*/void onPeriodicEmit(WatermarkOutput output);
}
水印生成有两种不同的风格周期性和标点
周期性生成器通常通过onEvent()观察传入的事件然后在框架调用onperiodicEmit()时发出水印。
穿孔生成器将查看onEvent中的事件并等待流中携带水印信息的特殊标记事件或标点符号。当它看到这些事件之一时它会立即发出水印。通常标点生成器不会从onPeriodicEmit()发出水印。
接下来我们将看看如何为每种样式实现生成器。
1、编写周期性水印生成器
周期性生成器观察流事件并周期性地生成水印可能取决于流元素或者纯粹基于流转时长。
生成水印的间隔每n毫秒是通过ExecutionConfig定义的。setAutoWatermarkInterval…。每次都会调用生成器的onperiodicEmit方法如果返回的水印非空且大于前一个水印则会发出一个新的水印。
/*** 该生成器生成水印假设元素到达时顺序错误但仅在一定程度上。某个时间戳t的最新元素将在时间戳t最早元素之后最多n毫秒到达。*/
public class BoundedOutOfOrdernessGenerator implements WatermarkGeneratorMyEvent {private final long maxOutOfOrderness 3500; // 3.5 secondsprivate long currentMaxTimestamp;Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp Math.max(currentMaxTimestamp, eventTimestamp);}Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 以当前最高时间戳减去无序界限的形式发出水印output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}}/*** 此生成器生成的水印比处理时间滞后固定量。它假设元素在有界延迟后到达Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGeneratorMyEvent {private final long maxTimeLag 5000; // 5 secondsOverridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 不需要做任何事情因为我们需要处理时间}Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}
注意可以在每个事件上生成水印。但是由于每个水印都会导致下游的一些计算过多的水印会降低性能。
六、水印策略和Kafka连接器
当使用Apache Kafka作为数据源时每个Kafka分区可能有一个简单的事件时间模式升序时间戳或有界无秩序。然而当使用来自Kafka的流时多个分区通常会被并行使用交错来自分区的事件并破坏每个分区的模式这是Kafka的消费者客户端工作方式所固有的。
在这种情况下我们可以使用Flink的Kafka-partition-aware水印生成。使用该功能水印在Kafka消费者内部每个Kafka分区生成每个分区的水印被合并就像水印在流洗牌上合并一样。
例如如果每个Kafka分区的事件时间戳严格升序则使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。请注意我们在示例中没有提供TimestampAssigner而是使用Kafka记录本身的时间戳。
下面的插图展示了如何使用每个Kafka分区的水印生成以及在这种情况下水印如何通过流数据流传播。
KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(brokers).setTopics(my-topic).setGroupId(my-group).setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamString stream env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), mySource); 七、如何处理水印
一般来说需要在向下游转发之前去处理给定的水印。例如WindowOperator将首先评估应触发的所有窗口只有在产生水印触发的所有输出后水印本身才会被发送到下游。换句话说由于水印的出现而产生的所有元素都将在水印之前发出。
同样的规则也适用于TwoInputStreamOperator。但是在这种情况下运算符的当前水印定义为其两个输入的最小值。 ------------------------------------------------------------------------------------------------------------------------------- 大多数高校硕博生毕业要求需要参加学术会议发表EI或者SCI检索的学术论文会议论文 可访问艾思科蓝官网浏览即将召开的学术会议列表。会议如下 2025年人工智能、数字媒体技术与社会计算国际学术会议
https://ais.cn/u/byAVfu
第二届边缘计算与并行、分布式计算国际学术会议ECPDC 2025)
https://ais.cn/u/77FJ3u
2025人工智能与计算机网络技术国际学术会议ICAICN 2025
https://ais.cn/u/jUfAVz
2025年数据挖掘与项目管理国际研讨会
https://ais.cn/u/nIbMvm