p2p网站建设方案书,南充高端网站建设,石狮网站建设公司,长沙广告网页设计招聘网Watermark水印、水位线 水位线概述水印本质生成WatermarkWatermark策略WatermarkStrategy工具类使用Watermark策略 内置Watermark生成器单调递增时间戳分配器固定延迟的时间戳分配器 自定义WatermarkGenerator周期性Watermark生成器标记Watermark生成器Watermark策略与Kafka连接… Watermark水印、水位线 水位线概述水印本质生成WatermarkWatermark策略WatermarkStrategy工具类使用Watermark策略 内置Watermark生成器单调递增时间戳分配器固定延迟的时间戳分配器 自定义WatermarkGenerator周期性Watermark生成器标记Watermark生成器Watermark策略与Kafka连接器 其他处理空闲数据源并行度下的水位线传递迟到数据的处理 水位线
概述 在Apache Flink中Watermark水印是一种用于处理事件时间eventtime的时间指示器。它模拟了事件流中事件时间进展的概念。 事件时间是指事件实际发生的时间在分布式流处理中经常用于处理无序事件流。然而由于网络延迟、乱序事件的到达以及分布式处理的特点事件时间可能不按顺序到达处理器。在这种情况下处理程序需要一种机制来标识它们已经处理过的事件时间并据此生成或更新水印。 水印是一个特殊的事件包含了一个时间戳。它表示截至到该时间戳的事件已经全部到达或预期已到达并且可以被认为是完整的。水印告知系统在事件时间维度上处理事件的进展情况并在触发窗口计算、事件乱序处理等方面提供辅助。 水印的生成通常基于事件数据中的时间戳通过一些策略来推断出未到达的事件的时间戳。简单的策略可以是事件时间减去一个固定的延迟值例如如果我们有一个事件的时间戳我们可以生成一个比该事件时间戳小一定固定时间的水印。 Flink通过处理数据流中的时间戳和水印来衡量事件时间进展并通过水印来驱动事件时间的处理。可以根据应用程序的需要自定义水印生成的策略。 水印本质 Watermark是水印、水位线的意思水印的出现是为了解决实时计算中的数据乱序问题它的本质是DataStream中一个带有时间戳的元素。 水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点主要内容就是一个时间戳用来指示当前的事件时间。通过使用水位线机制Flink能够动态地处理乱序事件并在保证准确性的同时提供低延迟的数据处理。 如果Flink系统中出现了一个WaterMarkT那么就意味着EventTimeT的数据都已经到达窗口的结束时间和T相同的那个窗口被触发进行计算了。因此水印是Flink判断迟到数据的标准同时也是窗口触发的标记。 在程序并行度大于1的情况下会有多个流产生水印和窗口这时候Flink会选取时间戳最小的水印。 生成Watermark 生成水位线使用assignTimestampsAndWatermarks()方法它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间。 dataStream.assignTimestampsAndWatermarks(WatermarkStrategyT watermarkStrategy);需要传入一个WatermarkStrategy作为参数也就是所谓的水位线生成策略 Watermark策略 Flink程序需要知道事件时间戳对应的字段意味着数据流中的每个元素都需要拥有可分配的事件时间戳。通过使用TimestampAssigner API从元素中的某个字段去访问/提取时间戳。 时间戳的分配与 watermark 的生成是齐头并进的其可以告诉Flink应用程序事件时间的进度。其可以通过指定WatermarkGenerator 来配置watermark的生成方式。 需要设置一个同时包含TimestampAssigner 和WatermarkGenerator的WatermarkStrateg WatermarkStrategy是一个接口该接口中包含了一个时间戳分配器TimestampAssigner和一个水位线生成器WatermarkGenerator。 WatermarkStrategy接口如下
public interface WatermarkStrategyT extends TimestampAssignerSupplierT, WatermarkGeneratorSupplierT {/*** 根据策略实例化一个 watermark 生成器* 主要负责按照既定的方式基于时间戳生成水位线*/WatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context var1);/*** 负责从流中数据元素的某个字段中提取时间戳并分配给元素* 时间戳的分配是生成水位线的基础*/ default TimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new RecordTimestampAssigner();}
}WatermarkStrategy工具类 工具类WatermarkStrategy中也提供了几个常用的watermark策略并且可以在某些必要场景下构建自己的 watermark策略。 /*** 为时间戳单调递增的情况创建水印策略,适用于有序流*/static T WatermarkStrategy T forMonotonousTimestamps() {return (ctx) - new AscendingTimestampsWatermarks();}/*** 为记录无序流的情况创建水印策略但可以设置事件无序程度的上限。*/static T WatermarkStrategy T forBoundedOutOfOrderness(Duration maxOutOfOrderness) {return (ctx) - new BoundedOutOfOrdernessWatermarks(maxOutOfOrderness);}/*** 基于watermarkgeneratorsupper自定义创建水印策略 */static T WatermarkStrategy T forGenerator(WatermarkGeneratorSupplier T generatorSupplier) {return generatorSupplier::createWatermarkGenerator;}/*** 创建完全不生成水印的水印策略。这在进行纯基于处理时间的流处理的场景中可能是有用*/static T WatermarkStrategy T noWatermarks() {return (ctx) - new NoWatermarksGenerator();}使用forBoundedOutOfOrderness watermark生成器和一个lambda表达式作为时间戳分配器 DataStreamSourceTuple2String, Integer dataStreamSource env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 3),Tuple2.of(c, 4));SingleOutputStreamOperatorTuple2String, Integer assignTimestampsAndWatermarks dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));注意 时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始并以毫秒为单位。 使用Watermark策略
WatermarkStrategy在哪里使用
1.直接在数据源上使用2.直接在非数据源的操作之后使用StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamMyEvent stream env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter(), typeInfo);DataStreamMyEvent withTimestampsAndWatermarks stream.filter( event - event.severity() WARNING ).assignTimestampsAndWatermarks(watermark strategy);withTimestampsAndWatermarks.keyBy( (event) - event.getGroup() ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) - a.add(b) ).addSink(...)注意 使用 WatermarkStrategy 去获取流并生成带有时间戳的元素和 watermark 的新流时如果原始流已经具有时间戳或 watermark则新指定的时间戳分配器将覆盖原有的时间戳和 watermark。 内置Watermark生成器
Flink内置了两个WaterMark生成器
1.forMonotonousTimestamps: 时间戳单调增长:其实就是允许的延迟为0
WatermarkStrategy.forMonotonousTimestamps();2.forBoundedOutOfOrderness: 允许固定时间的延迟
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));单调递增时间戳分配器 对于有序流主要特点就是时间戳单调增长永远不会出现迟到数据的问题。因此当前时间戳就可以充当 watermark因为后续到达数据的时间戳不会比当前的小。这是周期性生成水位线的最简单的场景直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(IP, 8888);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 定义Watermark策略WatermarkStrategyInteger watermarkStrategy WatermarkStrategy// 升序的watermark没有等待时间即当 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行.IntegerforMonotonousTimestamps()// TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数// 指定时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerInteger() {Overridepublic long extractTimestamp(Integer element, long recordTimestamp) {// 将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据 element);return element * 1000L;}});// 指定watermark策略SingleOutputStreamOperatorInteger singleOutputStreamOperator dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunctionInteger, String, TimeWindow() {Overridepublic void process(Context context, IterableInteger input, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss);long count input.spliterator().estimateSize();out.collect(窗口在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}}).print();env.execute();}nc -lk 8888
1
2
5
8
9
10
15
18
20
21数据 1
数据 2
数据 5
数据 8
数据 9
数据 10
窗口在时间区间 1970-01-01 08:00:00-1970-01-01 08:00:10 产生5条数据,具体数据[1, 2, 5, 8, 9]
数据 15
数据 18
数据 20
窗口在时间区间 1970-01-01 08:00:10-1970-01-01 08:00:20 产生3条数据,具体数据[10, 15, 18]
数据 21 固定延迟的时间戳分配器 乱序流中需要等待迟到数据到齐必须设置一个固定量的延迟时间数据流中的数据可能遇到的最大延迟。此时生成水位线的时间戳就是当前数据流中最大的时间戳减去延迟时间的结果。 调用WatermarkStrategy.forBoundedOutOfOrderness()方法可以实现方法传入一个maxOutOfOrderness参数表示最大乱序程度它表示数据流中乱序数据时间戳的最大差值如果能确定乱序程度那么设置对应时间长度的延迟就可以等到所有的乱序数据 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(IP, 8888);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 定义Watermark策略WatermarkStrategyInteger watermarkStrategy WatermarkStrategy// 最大容忍的延迟时间: 定watermark生成 乱序 等待3s 即当输入 (数字转时间戳 - 3) 达到 滚动处理时间窗口10s 就触发窗口执行.IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3))// 指定时间戳分配器 从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据 element);return element * 1000L;});// 指定 watermark策略SingleOutputStreamOperatorInteger singleOutputStreamOperator dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunctionInteger, String, TimeWindow() {Overridepublic void process(Context context, IterableInteger input, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count input.spliterator().estimateSize();out.collect(窗口在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}}).print();env.execute();}nc -lk 8888
1
5
8
6
7
11
4
13
15
20
19
23
26数据 1
数据 5
数据 8
数据 6
数据 7
数据 11
数据 4
数据 13
窗口在时间区间 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生6条数据,具体数据[1, 5, 8, 6, 7, 4]
数据 15
数据 20
数据 19
数据 23
窗口在时间区间 1970-01-01 08:00:10.000-1970-01-01 08:00:20.000 产生4条数据,具体数据[11, 13, 15, 19]
数据 26自定义WatermarkGenerator TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数 watermark 的生成方式本质上是有两种
1.周期性生成 周期性生成器通常通过 onEvent() 观察传入的事件数据然后在框架调用 onPeriodicEmit() 时发出 watermark。 2.标记生成 标记生成器将查看 onEvent() 中的事件数据并等待检查在流中携带 watermark 的特殊标记事件或打点数据。当获取到这些事件数据时它将立即发出 watermark。通常情况下标记生成器不会通过 onPeriodicEmit() 发出 watermark。 都需要继承接口WatermarkGenerator接口如下
/*** {code WatermarkGenerator} 可以基于事件或者周期性的生成 watermark。** pb注意/b WatermarkGenerator 将以前互相独立的 {code AssignerWithPunctuatedWatermarks} * 和 {code AssignerWithPeriodicWatermarks} 一同包含了进来。*/
Public
public interface WatermarkGeneratorT {/*** 每来一条事件数据调用一次可以检查或者记录事件的时间戳或者也可以基于事件数据本身去生成 watermark*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** 周期性的调用也许会生成新的 watermark也许不会** p调用此方法生成 watermark 的间隔时间由 {link ExecutionConfig#getAutoWatermarkInterval()} 决定*/void onPeriodicEmit(WatermarkOutput output);
}周期性Watermark生成器 周期性生成器通常通过 onEvent() 观察传入的事件数据然后在框架调用onPeriodicEmit()时发出watermark 生成watermark的时间间隔每 n 毫秒可以通过ExecutionConfig.setAutoWatermarkInterval(…) 指定。每次都会调用生成器的onPeriodicEmit()方法如果返回的watermark非空且值大于前一个watermark则将发出新的watermark 示例1 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(IP, 8888);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 默认周期 200ms 修改默认周期时间为1000msenv.getConfig().setAutoWatermarkInterval(1000);WatermarkStrategyInteger watermarkStrategy WatermarkStrategy// 自定义 周期性生成器 3000L:延迟时间.IntegerforGenerator(ctx - new MyWatermarkGenerator(3000L)).withTimestampAssigner((element, recordTimestamp) - {// 将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据 element);return element * 1000L;});SingleOutputStreamOperatorInteger singleOutputStreamOperator dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator// 使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessAllWindowFunctionInteger, String, TimeWindow() {Overridepublic void process(Context context, IterableInteger input, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count input.spliterator().estimateSize();out.collect(窗口在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}}).print();env.execute();}/*** 该 watermark 生成器可以覆盖的场景是数据源在一定程度上乱序。* 即某个最新到达的时间戳为 t 的元素将在最早到达的时间戳为 t 的元素之后最多 n 毫秒到达。*/public static class MyWatermarkGeneratorT implements WatermarkGeneratorT {/*** 乱序等待时间* 允许的最大延迟时间 ms*/private long maxOutOfOrderness;/*** 保存 当前为止 最大的事件时间*/private long currentMaxTimestamp;public MyWatermarkGenerator(long maxOutOfOrderness) {this.maxOutOfOrderness maxOutOfOrderness;}/*** 每条数据来都会调用一次 用来生产WaterMark中的时间戳* 为每个事件调用允许水印生成器检查和记住事件时间戳或根据事件本身发出水印。** param event* param eventTimestamp 提取到数据的事件时间* param output*/Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp Math.max(currentMaxTimestamp, eventTimestamp);System.out.println(调用onEvent 目前为止最大时间戳 currentMaxTimestamp);}/*** 周期性调用 发送watermark 默认200ms调用一次* p* 调用此方法和生成水印的时间间隔取决于ExecutionConfig.getAutoWatermarkInterval()** param output*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发出的 watermark 当前最大时间戳 - 最大乱序时间output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));System.out.println(调用onPeriodicEmit 生成watermark (currentMaxTimestamp - maxOutOfOrderness - 1));}}}调用onPeriodicEmit 生成watermark -3001
调用onPeriodicEmit 生成watermark -3001
数据 5
调用onEvent 目前为止最大时间戳 5000
调用onPeriodicEmit 生成watermark 1999
数据 6
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 3
调用onEvent 目前为止最大时间戳 6000
调用onPeriodicEmit 生成watermark 2999
调用onPeriodicEmit 生成watermark 2999
数据 13
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
窗口在时间区间 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据[5, 6, 3]
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999
数据 10
调用onEvent 目前为止最大时间戳 13000
调用onPeriodicEmit 生成watermark 9999
调用onPeriodicEmit 生成watermark 9999示例2
/*** 该生成器生成的 watermark 滞后于处理时间固定量。它假定元素会在有限延迟后到达 Flink。*/
public class TimeLagWatermarkGenerator implements WatermarkGeneratorMyEvent {private final long maxTimeLag 5000; // 5 秒Overridepublic void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {// 处理时间场景下不需要实现}Overridepublic void onPeriodicEmit(WatermarkOutput output) {output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}标记Watermark生成器 标记watermark生成器会不停地检测onEvent()中的事件当发现带有水位线信息的事件时就立即发出水位线。把发送水位线的逻辑写在onEvent方法当中即可 标记生成器将查看onEvent()中的事件数据并等待检查在流中携带watermark的特殊标记事件或打点数据。当获取到这些事件数据时它将立即发出watermark。通常情况下标记生成器不会通过onPeriodicEmit()发出 watermark。 WatermarkStrategyInteger watermarkStrategy WatermarkStrategy// 自定义间歇性生成器.IntegerforGenerator(ctx - new MyWatermarkGenerator(3000L)).withTimestampAssigner((element, recordTimestamp) - {// 将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据 element);return element * 1000L;});public static class MyWatermarkGeneratorT implements WatermarkGeneratorT {/*** 乱序等待时间* 允许的最大延迟时间 ms*/private long maxOutOfOrderness;/*** 保存 当前为止 最大的事件时间*/private long currentMaxTimestamp;public MyWatermarkGenerator(long maxOutOfOrderness) {this.maxOutOfOrderness maxOutOfOrderness;}/*** 每条数据来都会调用一次 用来提取最大的事件时间保存下来,并发送watermark** param event* param eventTimestamp 提取到的数据的 事件时间* param output*/Overridepublic void onEvent(T event, long eventTimestamp, WatermarkOutput output) {currentMaxTimestamp Math.max(currentMaxTimestamp, eventTimestamp);output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));System.out.println(调用onEvent 目前为止最大时间戳 currentMaxTimestamp 生成watermark (currentMaxTimestamp - maxOutOfOrderness - 1));}/*** 周期性调用 不需要** param output*/Overridepublic void onPeriodicEmit(WatermarkOutput output) {}}数据 5
调用onEvent 目前为止最大时间戳 5000 生成watermark 1999
数据 6
调用onEvent 目前为止最大时间戳 6000 生成watermark 2999
数据 3
调用onEvent 目前为止最大时间戳 6000 生成watermark 2999
数据 13
调用onEvent 目前为止最大时间戳 13000 生成watermark 9999
窗口在时间区间 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生3条数据,具体数据[5, 6, 3]Watermark策略与Kafka连接器
使用 Apache Kafka 连接器作为数据源时每个Kafka分区可能有一个简单的事件时间模式递增的时间戳或有界无序当使用Kafka数据源时多个分区常常并行使用因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式在这种情况下可以使用Flink中可识别Kafka分区的watermark生成机制。使用此特性将在Kafka消费端内部针对每个Kafka分区生成watermark并且不同分区watermark的合并方式与在数据流shuffle时的合并方式相同。注意 在自定义数据源中发送水位线以后就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSourceString kafkaSource KafkaSource.Stringbuilder()// 指定kafka节点的地址和端口.setBootstrapServers(node01:9092,node02:9092,node03:9092)// 指定消费者组的id.setGroupId(flink_group)// 指定消费的 Topic.setTopics(flink_topic)// 指定反序列化器反序列化value.setValueOnlyDeserializer(new SimpleStringSchema())// flink消费kafka的策略.setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka_source);DataStreamSinkString kafka_source env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafka_source).print(Kafka);stream.print(Kafka);env.execute();}
其他
处理空闲数据源 如果数据源中的某一个分区/分片在一段时间内未发送事件数据则意味着WatermarkGenerator也不会获得任何新数据去生成watermark。我们称这类数据源为空闲输入或空闲源。 在这种情况下当某些其他分区仍然发送事件数据的时候就会出现问题。由于下游算子watermark的计算方式是取所有不同的上游并行数据源watermark的最小值则其watermark将不会发生变化。 为了解决这个问题可以使用WatermarkStrategy来检测空闲输入并将其标记为空闲状态。WatermarkStrategy为此提供了一个工具接口
WatermarkStrategy.Tuple2Long, StringforBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));并行度下的水位线传递 在多并行度下当一个任务接收到多个上游并行任务传递来的水位线时应该以最小的那个作为当前任务的事件时钟。 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(IP, 8888);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 将数据合理地分发到不同的分区中DataStreamInteger partitionCustom dataStream.partitionCustom(new MyPartitioner(), value - value);// 定义Watermark策略WatermarkStrategyInteger watermarkStrategy WatermarkStrategy// 时间序列递增没有等待时间即当输入 数字转时间戳 达到 滚动处理时间窗口10s 就触发窗口执行.IntegerforMonotonousTimestamps()// 将输入数字转时间戳单位毫秒当作数据的时间戳.withTimestampAssigner((r, ts) - r * 1000L);// 指定 watermark策略SingleOutputStreamOperatorInteger singleOutputStreamOperator partitionCustom.assignTimestampsAndWatermarks(watermarkStrategy);// 分2组窗口 数据%分区数分成两组 奇数一组偶数一组SingleOutputStreamOperatorString process singleOutputStreamOperator.keyBy(a - a % 2)// 使用事件时间语义窗.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionInteger, String, Integer, TimeWindow() {Overridepublic void process(Integer key, Context context, IterableInteger input, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count input.spliterator().estimateSize();out.collect(分组 key 的窗口在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}});process.print();env.execute();}public static class MyPartitioner implements PartitionerInteger {Overridepublic int partition(Integer key, int numPartitions) {if (key % 2 0) {// 将偶数分配到第一个分区return 0;} else {// 将奇数分配到第二个分区return 1;}}}发送测试数据 nc -lk 8888
1
3
5
7
9
11
13
15
17此时控制台不会有任何输出原因如下 偶数窗口中没有任何数据由于当前Task是以最小的那个作为当前任务的事件时钟就会导致当前Task的水位线无法推进就导致窗口无法触发。 因此这里可以使用上面提到的处理空闲数据源设置空闲等待即可解决 // 定义Watermark策略WatermarkStrategyInteger watermarkStrategy WatermarkStrategy// 升序的watermark没有等待时间即当输入 数字 达到 滚动处理时间窗口10s 就触发窗口执行.IntegerforMonotonousTimestamps()// 指定时间戳分配器 从数据中提取.withTimestampAssigner((r, ts) - r * 1000L)//空闲等待5s.withIdleness(Duration.ofSeconds(5));2 分组 1 的窗口在时间区间 1970-01-01 08:00:00.000-1970-01-01 08:00:10.000 产生5条数据,具体数据[1, 3, 5, 7, 9]迟到数据的处理 设置窗口推迟关窗时间在关窗之前迟到数据来了还能被窗口计算来一条迟到数据触发一次计算。关窗后迟到数据不会被计算放入侧输出流 在设置一定的窗口允许迟到时间时只考虑大部分的迟到数据忽略不考虑极端小部分迟到很久的数据 极端小部分迟到的数据 放到侧输出流。 获取到之后可以做各种处理 1.推迟水印推进 在水印产生时设置一个乱序容忍度推迟系统时间的推进保证窗口计算被延迟执行为乱序的数据争取更多的时间进入窗口。 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));2.设置窗口延迟关闭 Flink的窗口也允许迟到数据。当触发了窗口计算后会先计算当前的结果但是此时并不会关闭窗口。以后每来一条迟到数据就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间推迟时间此时窗口会真正关闭。 .window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3))3.使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(lateWS)实现示例 接收窗口关闭之后的迟到数据 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 从socket接收数据流SingleOutputStreamOperatorString source env.socketTextStream(IP, 8888);// 将输入数据转换为IntegerDataStreamInteger dataStream source.map(str - Integer.parseInt(str));// 定义Watermark策略WatermarkStrategyInteger watermarkStrategy WatermarkStrategy.IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) - element * 1000L);// 指定 watermark策略SingleOutputStreamOperatorInteger sensorDSwithWatermark dataStream.assignTimestampsAndWatermarks(watermarkStrategy);OutputTagInteger lateTag new OutputTag(late-data, Types.POJO(Integer.class));SingleOutputStreamOperatorString process sensorDSwithWatermark.keyBy(sensor - sensor % 2).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据放入侧输出流.process(new ProcessWindowFunctionInteger, String, Integer, TimeWindow() {Overridepublic void process(Integer key, Context context, IterableInteger input, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count input.spliterator().estimateSize();out.collect(分组 key 的窗口在时间区间 windowStart - windowEnd 产生 count 条数据,具体数据 input.toString());}});process.print();// 从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);env.execute();}