辽宁城乡住房建设厅网站,ui界面设计教程,手工活外发一手货源,成都高标建设有限公司官方网站文章目录
SQL水印操作#xff08;Watermark#xff09;
一、为什么要有WaterMark
二、Watermark解决的问题
三、代码演示 SQL水印操作#xff08;Watermark#xff09;
一、为什么要…
文章目录
SQL水印操作Watermark
一、为什么要有WaterMark
二、Watermark解决的问题
三、代码演示 SQL水印操作Watermark
一、为什么要有WaterMark 当 flink 以 EventTime 模式处理流数据时它会根据数据里的时间戳来处理基于时间的算子。但是由于网络、分布式等原因会导致数据乱序的情况。如下图所示
假设在一个5秒的Tumble窗口有一个EventTime是 11秒的数据在第16秒时候到来了。图示第11秒的数据在16秒到来了如下图该如何处理迟到数据 二、Watermark解决的问题 上面的问题在于如何将迟来的EventTime 为11的元素正确处理
当Watermark的时间戳等于Event中携带的EventTime时候上面场景WatermarkEventTime)的计算结果如下 如果想正确处理迟来的数据可以定义Watermark生成策略为 Watermark EventTime -5s 如下 通过watermark来解决简单来说就是延迟窗口关闭的时间等一会迟到的数据窗口关闭不在依据数据的时间而是到达的watermark的时间。
watermark可以理解为一个特殊的数据这个数据不参与计算仅仅是对窗口的触发关闭起作用。 三、代码演示
使用Socket模拟接收数据设置WaterMark 设置的逻辑在第一条数据进来时设置WaterMark为0指定第一条数据的时间戳后获取该时间戳与当前 WaterMark的最大值并将最大值设置为下一条数据的WaterMark以此类推使用滚动Event Time窗口将5秒内的同组数据进行聚合输出
CREATE TABLE watermark_zero (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL 0 SECOND
) WITH (
connector socket,
hostname 178.23.142.233,
port 9999,
format csv
);SELECT
date_format(TUMBLE_START(ts, INTERVAL 5 SECOND),yyyy-MM-dd hh:mm:ss.SSS) AS window_start,
date_format(TUMBLE_END(ts, INTERVAL 5 SECOND),yyyy-MM-dd hh:mm:ss.SSS) AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL 5 SECOND),yyyy-MM-dd hh:mm:ss.SSS) as window_rowtime,
item,count(item) as total_item
FROM watermark_zero
GROUP BY TUMBLE(ts, INTERVAL 5 SECOND), item;
若输入第一条数据hello,2022-03-25 16:39:45
那么我先假设后续的数据Event Time间隔为1秒推断一下WaterMark的设定如下图所示
1.第一条数据的Event Time为1648197585000那么当前窗口时间为1648197585000- 1648197589000即下图中红色框线
2.第一条数据进来时这条数据之前的WaterMark为0当第一条数据已经进入后指定Event Time位置并与现在的WaterMark比较将两者中大的那个值设置为新的WaterMark那么当前数据的WaterMark为1648197585000
3.第二条数据进来时前一条数据的WaterMark为1648197585000第二条数据的Event Time比之前的WaterMark大于是更新WaterMark将当前的WaterMark更新为1648197586000但还没到窗口触发时间不进行计算
4.后面几个以此类推直到Event Time为1648197590000的数据进来的时候前一条数据的WaterMark为1648197589000于是更新当前的WaterMark为1648197590000Flink认为1648197590000之前的数据都已经到达且达到了窗口的触发条件开始进行计算
根据上面的推断启动程序验证一下向9999端口监听终端输入以下内容
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50 Flink输出结果 Rowtime列在经过窗口操作后其Event Time属性将丢失。可以使用辅助函数TUMBLE_ROWTIME、HOP_ROWTIME或SESSION_ROWTIME获取窗口中的Rowtime列的最大值max(rowtime)作为时间窗口的Rowtime其类型是具有Rowtime属性的TIMESTAMP取值为 window_end - 1 。 例如[00:00, 00:15) 的窗口返回值为00:14:59.999 。
数据乱序的场景
上面的实例Event Time是有序现在来做一下数据乱序的场景模拟启动程序注意要关闭之前的查询重新运行查询语句在监听终端中输入如下数据
其中在触发了了第一个窗口计算后又来了两条迟到数据hello,2022-03-25 16:39:47hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink结果 从结果中可以看到在第二个窗口中那两条迟到数据并没有进行处理这个就是迟到丢弃。
乱序时间的设置
为了解决上面的问题我们允许Flink处理延迟在5秒内的迟到数据
修改最大乱序时间新建的表仅水印与之前不同
CREATE TABLE watermark_five (
item STRING,
ts TIMESTAMP(3), -- TIMESTAMP 类型的时间戳
WATERMARK FOR ts AS ts - INTERVAL 5 SECOND
) WITH (
connector socket,
hostname 178.23.142.233,
port 9999,
format csv
);SELECT
date_format(TUMBLE_START(ts, INTERVAL 5 SECOND),yyyy-MM-dd hh:mm:ss.SSS) AS window_start,
date_format(TUMBLE_END(ts, INTERVAL 5 SECOND),yyyy-MM-dd hh:mm:ss.SSS) AS window_end,
date_format(TUMBLE_ROWTIME(ts, INTERVAL 5 SECOND),yyyy-MM-dd hh:mm:ss.SSS) as window_rowtime,
item,count(item) as total_item
FROM watermark_five
GROUP BY TUMBLE(ts, INTERVAL 5 SECOND), item;
在监听终端中输入数据
hello,2022-03-25 16:39:45
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:48
hello,2022-03-25 16:39:49
hello,2022-03-25 16:39:50
hello,2022-03-25 16:39:47
hello,2022-03-25 16:39:46
hello,2022-03-25 16:39:51
hello,2022-03-25 16:39:52
hello,2022-03-25 16:39:53
hello,2022-03-25 16:39:54
hello,2022-03-25 16:39:55
Flink输出结果 可以看到之前迟到的两条数据在第一个窗口中进行了处理。因为设置了最大允许乱序时间后WaterMark要比原来低5秒可以对延迟5秒内的数据进行处理窗口的触发条件也同样会往后延迟关于延迟时间请结合业务场景进行设置。 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨