在哪些网站做推广,网站单页是什么意思,wordpress实现前台登录功能,自适应网站ui做几套文章目录
Flink SQL的时间属性
一、Flink 三种时间属性简介
二、Flink 三种时间属性的应用场景
三、SQL 指定时间属性的两种方式
四、SQL 处理时间DDL定义
五、SQL 事件时间DDL定义 Flink SQL的时…
文章目录
Flink SQL的时间属性
一、Flink 三种时间属性简介
二、Flink 三种时间属性的应用场景
三、SQL 指定时间属性的两种方式
四、SQL 处理时间DDL定义
五、SQL 事件时间DDL定义 Flink SQL的时间属性
先看一下本文整体的思路
与离线处理中常见的时间分区字段一样在实时处理中时间属性也是一个核心概念。Flink 支持 处理时间、事件时间、摄入时间 三种时间语义。分别介绍三种时间语义的应用场景及案例。三种时间在生产环境的使用频次事件时间SQL 常用 处理时间SQL 几乎不用DataStream 少用 摄入时间不用 一、Flink 三种时间属性简介 事件时间指的是数据本身携带的时间这个时间是在事件产生时的时间而且在 Flink SQL 触发计算时也使用数据本身携带的时间。这就叫做 事件时间。目前生产环境中用的最多。处理时间指的是具体算子计算数据执行时的机器时间例如在算子中 Java 取 System.currentTimeMillis()) 在生产环境中用的次多。摄入时间指的是数据从数据源进入 Flink 的时间。摄入时间用的最少可以说基本不使用。
要注意到
上述的三种时间概念不是由于有了数据而诞生的而是有了 Flink 之后根据实际的应用场景而诞生的。以事件时间举个例子如果只是数据携带了时间Flink 也消费了这个数据但是在 Flink 中没有使用数据的这个时间作为计算的触发条件也不能把这个 Flink 任务叫做事件时间的任务。其次要认识到一般一个 Flink 任务只会有一个时间属性所以时间属性通常认为是一个任务粒度的。举例我们可以说 A 任务是事件时间语义的任务B 任务是处理时间语义的任务。当然了一个任务也可以存在多个时间属性。 二、Flink 三种时间属性的应用场景
以上三种时间属性到底对我们的任务有啥影响呢三种时间属性的应用场景是啥
先说结论在 Flink 中时间的作用
主要体现在包含时间窗口的计算中用于标识任务的时间进度来判断是否需要触发窗口的计算。比如常用的滚动窗口、滑动窗口等都需要时间推动触发。这些窗口的应用场景后续会详细介绍。次要体现在自定义时间语义的计算中举个例子比如用户可以自定义每隔 10s 的本地时间或者消费到的数据的时间戳每增大 10s就把计算结果输出一次时间在此类应用中也是一种标识任务进度的作用。
以 滚动窗口 的聚合任务为例来介绍一下事件时间和处理时间的对比区别。
事件时间案例还是以之前的 clicks 表拿来举例。 上面这个案例的窗口大小是 1 小时需求方需要按照用户点击时间戳 cTime 划分数据划分滚动窗口然后计算出 count 聚合结果这样计算能反映出事件的真实发生时间那么就需要把 cTime 设置为窗口的划分时间戳即代码中 tumble(cTime, interval 1 hour)。
上面这种就叫做事件时间。即用数据中自带的时间戳进行窗口的划分点击操作真实的发生时间。
后续 Flink SQL 任务在运行的过程中也会实际按照 cTime 的当前时间作为一小时窗口结束触发条件并计算一个小时窗口内的数据。
处理时间案例还是以之前的 clicks 表拿来举例。
还是上面那个案例但是这次需求方不需要按照数据上的时间戳划分数据划分滚动窗口只需要数据来了之后 在 Flink 机器上的时间作为一小时窗口结束的触发条件并计算。
那么这种触发机制就是处理时间。
摄入时间案例在 Flink 从外部数据源读取到数据时给这条数据带上的当前数据源算子的本地时间戳。下游可以用这个时间戳进行窗口聚合不过这种几乎不使用。 三、SQL 指定时间属性的两种方式
如果要满足 Flink SQL 时间窗口类的聚合操作SQL 或 Table API 中的 数据源表 就需要提供时间属性相当于我们把这个时间属性在 数据源表 上面进行声明以及支持时间相关的操作。
那么来看看 Flink SQL 为我们提供的两种指定时间戳的方式
CREATE TABLE DDL 创建表的时候指定推荐可以在 DataStream 中指定在后续的 DataStream 转的 Table 中使用略过授课以Flink SQL为主
一旦时间属性定义好它就可以像普通列一样使用也可以在时间相关的操作中使用。 四、SQL 处理时间DDL定义
处理时间语义下使用当前机器的系统时间作为处理时间。它是时间的最简单概念。它既不需要提取时间戳也不需要生成watermark。
来看看 Flink SQL 中如何指定处理时间。
CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,-- 使用下面这句来将 user_action_time 声明为处理时间user_action_time AS PROCTIME()
) WITH (...
);
使用案例
点击Flink开发平台左侧资源管理点击上传资源将资料中的order.csv文件进行上传。
可以点击复制按钮复制其在oss的路径。
可以在oss对应路径看到此文件 读取order.csv文件的数据在原本的Schema上添加一个虚拟的时间戳列时间戳列由PROCTIME()函数计算产生。建表语句如下path后面路径需要修改 create table proctime_ddl_table (
userid varchar,
timestamp bigint,
money double,
category varchar,
pt AS PROCTIME()
) with (
connector filesystem,
path oss://lanson-bucket/artifacts/namespaces/lanson-workspace-default/order.csv,
format csv
);
查询表数据调试
select * from proctime_ddl_table;
查询结果如下 点击左侧导航栏元数据管理查看表信息。 五、SQL 事件时间DDL定义
来看看 Flink 中如何指定事件时间。
Event Time时间语义使用一条数据实际发生的时间作为时间属性在Table API SQL中这个字段通常被称为rowtime。这种模式下多次重复计算时计算结果是确定的。这意味着Event Time时间语义可以保证流处理和批处理的统一。
Event Time时间语义下我们需要设置每条数据发生时的时间戳并提供一个Watermark。Watermark表示迟于该时间的数据都作为迟到数据对待。
CREATE TABLE DDL 指定时间戳的方式。
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 使用下面这句来将 user_action_time 声明为事件时间并且声明 watermark 的生成规则即 user_action_time 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR user_action_time AS user_action_time - INTERVAL 5 SECOND
) WITH (...
);
在上面的DDL中WATERMARK起到了定义Event Time时间属性的作用在这里暂时不讲解watermark知识点后续会讲到。 如果想使用事件时间那么我们的时间戳类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型。
但是实际应用中时间戳一般都是秒或者是毫秒BIGINT 类型那这种情况怎么办
解决方案如下
CREATE TABLE user_actions (user_name STRING,data STRING,-- 1. 这个 ts 就是常见的毫秒级别时间戳ts BIGINT,-- 2. 将毫秒时间戳转换成 TIMESTAMP_LTZ 类型time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),-- 3. 使用下面这句来将 time_ltz 声明为事件时间并且声明 watermark 的生成规则即 time_ltz 减 5 秒-- 事件时间列的字段类型必须是 TIMESTAMP 或者 TIMESTAMP_LTZ 类型WATERMARK FOR time_ltz AS time_ltz - INTERVAL 5 SECOND
) WITH (...
);
使用案例
读取order.csv文件的数据定义现有事件时间字段上的 watermark 生成表达式该表达式将事件时间字段标记为事件时间属性。
建表语句如下
create table eventime_ddl_table (
userid varchar,
timestamp bigint,
money double,
category varchar,
rt AS TO_TIMESTAMP(FROM_UNIXTIME(timestamp)),
watermark for rt as rt - interval 1 second
) with (
connector filesystem,
path oss://bucketnanfeng/artifacts/namespaces/lanson-flinkworkspace1-default/order.csv,
format csv
);
查询表数据调试
select * from eventime_ddl_table;
查询结果如下 点击左侧导航栏元数据管理查看表信息。 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨