淄博高端网站建设乐达,网线制作规范,新闻热点,html5魔塔文章目录Table API 和 SQL快速上手基本 API程序架构创建表环境创建表表的查询输出表表和流的转换流处理中的表动态表和持续查询将流转换成动态表原理用 SQL 持续查询-更新查询追加查询将动态表转换为流(Append-only、Retract、Upsert)时间属性和窗口事件时间处理时间窗口追加查询将动态表转换为流(Append-only、Retract、Upsert)时间属性和窗口事件时间处理时间窗口Window聚合Aggregation查询分组聚合窗口聚合开窗Over聚合应用实例—Top N联结Join查询常规联结查询间隔联结查询函数系统函数自定义函数UDFSQL 客户端连接到外部系统Kafka文件系统JDBCElasticsearchHBaseHiveTable API 和 SQL
SQL 是结构化查询语言Structured Query Language的缩写是我们对关系型数据库进行查询和修改的通用编程语言。在关系型数据库中数据是以表table的形式组织起来的所以也可以认为 SQL 是用来对表进行处理的工具语言。无论是传统架构中进行数据存储的MySQL、PostgreSQL还是大数据应用中的 Hive都少不了 SQL 的身影而 Spark 作为大数据处理引擎为了更好地支持在 Hive 中的 SQL 查询也提供了 Spark SQL 作为入口。
Flink 同样提供了对于“表”处理的支持这就是更高层级的应用 API在 Flink 中被称为Table API 和 SQL。Table API 顾名思义就是基于“表”Table的一套 API它是内嵌在 Java、Scala 等语言中的一种声明式领域特定语言DSL也就是专门为处理表而设计的在此基础上Flink 还基于 Apache Calcite 实现了对 SQL 的支持。这样一来我们就可以在 Flink 程序中直接写 SQL 来实现处理需求了。
快速上手
我们想要在代码中使用 Table API必须引入相关的依赖。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency这里的依赖是一个 Java 的“桥接器”bridge主要就是负责 Table API 和下层 DataStream API 的连接支持按照不同的语言分为 Java 版和 Scala 版。
如果我们希望在本地的集成开发环境IDE里运行 Table API 和 SQL还需要引入以下依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency这里主要添加的依赖是一个“计划器”planner它是 Table API 的核心组件负责提供运行时环境并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner所以在生产集群环境中提交的作业不需要打包这个依赖。
而在 Table API 的内部实现上部分相关的代码是用 Scala 实现的所以还需要额外添加一个 Scala 版流处理的相关依赖。
另外如果想实现自定义的数据格式来做序列化可以引入下面的依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/version
/dependency实例
public class TableExample {public static void main(String[] args) throws Exception {// 获取流执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperatorEvent eventStream env.fromElements(new Event(Alice, ./home, 1000L),new Event(Bob, ./cart, 1000L),new Event(Alice, ./prod?id1, 5 * 1000L),new Event(Cary, ./home, 60 * 1000L),new Event(Bob, ./prod?id3, 90 * 1000L),new Event(Alice, ./prod?id7, 105 * 1000L));// 获取表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 将数据流转换成表Table eventTable tableEnv.fromDataStream(eventStream);// 用执行 SQL 的方式提取数据Table visitTable tableEnv.sqlQuery(select url, user from eventTable);// 将表转换成数据流打印输出tableEnv.toDataStream(visitTable).print();// 执行程序env.execute();}
}这里我们需要创建一个“表环境”TableEnvironment然后将数据流DataStream转换成一个表Table之后就可以执行 SQL 在这个表中查询数据了。查询得到的结果依然是一个表把它重新转换成流就可以打印输出了。
代码执行的结果如下
I[./home, Alice]
I[./cart, Bob]
I[./prod?id1, Alice]
I[./home, Cary]
I[./prod?id3, Bob]
I[./prod?id7, Alice]可以看到我们将原始的 Event 数据转换成了(urluser)这样类似二元组的类型。每行输出前面有一个“I”标志这是表示每条数据都是“插入”Insert到表中的新增数据。
Table 是 Table API 中的核心接口类对应着我们熟悉的“表”的概念。基于 Table 我们也可以调用一系列查询方法直接进行转换这就是所谓 Table API 的处理方式
// 用 Table API 方式提取数据
Table clickTable2 eventTable.select($(url), $(user));这里的$符号是 Table API 中定义的“表达式”类 Expressions 中的一个方法传入一个字段名称就可以指代数据中对应字段。将得到的表转换成流打印输出会发现结果与直接执行SQL 完全一样。
基本 API
程序架构
在 Flink 中Table API 和 SQL 可以看作联结在一起的一套 API这套 API 的核心概念就是“表”Table。在我们的程序中输入数据可以定义成一张表然后对这张表进行查询就可以得到新的表这相当于就是流数据的转换操作最后还可以定义一张用于输出的表负责将处理结果写入到外部系统。
我们可以看到程序的整体处理流程与 DataStream API 非常相似也可以分为读取数据源Source、转换Transform、输出数据Sink三部分只不过这里的输入输出操作不需要额外定义只需要将用于输入和输出的表定义出来然后进行转换查询就可以了。程序基本架构如下
// 创建表环境
TableEnvironment tableEnv ...;
// 创建输入表连接外部系统读取数据
tableEnv.executeSql(CREATE TEMPORARY TABLE inputTable ... WITH ( connector ... ));
// 注册一个表连接到外部系统用于输出
tableEnv.executeSql(CREATE TEMPORARY TABLE outputTable ... WITH ( connector ... ));
// 执行 SQL 对表进行查询转换得到一个新的表
Table table1 tableEnv.sqlQuery(SELECT ... FROM inputTable... );
// 使用 Table API 对表进行查询转换得到一个新的表
Table table2 tableEnv.from(inputTable).select(...);
// 将得到的结果写入输出表
TableResult tableResult table1.executeInsert(outputTable);这里不是从一个 DataStream 转换成 Table而是通过执行 DDL 来直接创建一个表。这里执行的 CREATE 语句中用 WITH 指定了外部系统的连接器于是就可以连接外部系统读取数据了。这其实是更加一般化的程序架构因为这样我们就可以完全抛开DataStream API直接用 SQL 语句实现全部的流处理过程。
而后面对于输出表的定义是完全一样的。可以发现在创建表的过程中其实并不区分“输入”还是“输出”只需要将这个表“注册”进来、连接到外部系统就可以了这里的 inputTable、outputTable 只是注册的表名并不代表处理逻辑可以随意更换。至于表的具体作用则要等到执行后面的查询转换操作时才能明确。我们直接从 inputTable 中查询数据那么 inputTable就是输入表而 outputTable 会接收另外表的结果进行写入那么就是输出表。
创建表环境
对于 Flink 这样的流处理框架来说数据流和表在结构上还是有所区别的。所以使用 Table API 和 SQL 需要一个特别的运行时环境这就是所谓的“表环境”TableEnvironment。它主要负责
注册 Catalog 和表执行 SQL 查询注册用户自定义函数UDFDataStream 和表之间的转换。
这里的 Catalog 就是“目录”与标准 SQL 中的概念是一致的主要用来管理所有数据库database和表table的元数据metadata。通过 Catalog 可以方便地对数据库和表进行查询的管理所以可以认为我们所定义的表都会“挂靠”在某个目录下这样就可以快速检索。在表环境中可以由用户自定义 Catalog并在其中注册表和自定义函数UDF。默认的 Catalog就叫作 default_catalog。
每个表和 SQL 的执行都必须绑定在一个表环境TableEnvironment中。TableEnvironment是 Table API 中提供的基本接口类可以通过调用静态的 create() 方法来创建一个表环境实例。方法需要传入一个环境的配置参数 EnvironmentSettings它可以指定当前表环境的执行模式和计划器planner。执行模式有批处理和流处理两种选择默认是流处理模式计划器默认使用 blink planner。
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings settings EnvironmentSettings.newInstance().inStreamingMode() // 使用流处理模式.build();
TableEnvironment tableEnv TableEnvironment.create(settings);对于流处理场景其实默认配置就完全够用了。所以我们也可以用另一种更加简单的方式来创建表环境
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);这 里 我 们 引 入 了 一 个 “ 流 式 表 环 境 ” StreamTableEnvironment 它 是 继 承 自TableEnvironment 的子接口。调用它的 create()方法只需要直接将当前的流执行环境StreamExecutionEnvironment传入就可以创建出对应的流式表环境了。
创建表
表Table是我们非常熟悉的一个概念它是关系型数据库中数据存储的基本形式也是 SQL 执行的基本对象。Flink 中的表概念也并不特殊是由多个“行”数据构成的每个行Row又可以有定义好的多个列Column字段整体来看表就是固定类型的数据组成的二维矩阵。
为了方便地查询表表环境中会维护一个目录Catalog和表的对应关系。所以表都是通过 Catalog 来进行注册创建的。表在环境中有一个唯一的 ID由三部分组成目录catalog名数据库database名以及表名。在默认情况下目录名为 default_catalog数据库名为default_database。所以如果我们直接创建一个叫作 MyTable 的表它的 ID 就是
default_catalog.default_database.MyTable具体创建表的方式有通过**连接器connector和虚拟表virtual tables**两种。
1连接器表Connector Tables
最直观的创建表的方式就是通过连接器connector连接到一个外部系统然后定义出对应的表结构。例如我们可以连接到 Kafka 或者文件系统将存储在这些外部系统的数据以“表”的形式定义出来这样对表的读写就可以通过连接器转换成对外部系统的读写了。当我们在表环境中读取这张表连接器就会从外部系统读取数据并进行转换而当我们向这张表写入数据连接器就会将数据输出Sink到外部系统中。
在代码中我们可以调用表环境的 executeSql()方法可以传入一个 DDL 作为参数执行SQL 操作。这里我们传入一个 CREATE 语句进行表的创建并通过 WITH 关键字指定连接到外部系统的连接器
tableEnv.executeSql(CREATE [TEMPORARY] TABLE MyTable ... WITH ( connector ... ));这里的 TEMPORARY 关键字可以省略。
例子
String createDDL CREATE TABLE clickTable ( user_name STRING, url STRING, ts BIGINT ) WITH ( connector filesystem, path input/clicks.csv, format csv );这里没有定义 Catalog 和 Database 所 以 都 是 默 认 的 表 的 完 整 ID 就 是default_catalog.default_database.MyTable。如果希望使用自定义的目录名和库名可以在环境中进行设置
tEnv.useCatalog(custom_catalog);
tEnv.useDatabase(custom_database);2虚拟表Virtual Tables
在环境中注册之后我们就可以在 SQL 中直接使用这张表进行查询转换了。
Table newTable tableEnv.sqlQuery(SELECT ... FROM MyTable... );这里调用了表环境的 sqlQuery()方法直接传入一条 SQL 语句作为参数执行查询得到的结果是一个 Table 对象。Table 是 Table API 中提供的核心接口类就代表了一个 Java 中定义的表实例。
得到的 newTable 是一个中间转换结果如果之后又希望直接使用这个表执行 SQL又该怎么做呢 方式一 Table visitTable tableEnv.sqlQuery(select url, user from newTable);方式二由于 newTable 是一个 Table 对象并没有在表环境中注册所以我们还需要将这个中间结果表注册到环境中才能在 SQL 中使用 tableEnv.createTemporaryView(NewTable, newTable);这里的注册其实是创建了一个“虚拟表”Virtual Table。这个概念与 SQL 语法中的视图View非常类似所以调用的方法也叫作创建“虚拟视图”createTemporaryView。视图之所以是“虚拟”的是因为我们并不会直接保存这个表的内容并没有“实体”只是在用到这张表的时候会将它对应的查询语句嵌入到 SQL 中。 Table visitTable tableEnv.sqlQuery(select url, user from newTable);表的查询
创建好了表接下来自然就是对表进行查询转换了。对一个表的查询Query操作就对应着流数据的转换Transform处理。
Flink 为我们提供了两种查询方式SQL 和 Table API。
1执行 SQL 进行查询
基于表执行 SQL 语句是我们最为熟悉的查询方式。Flink 基于 Apache Calcite 来提供对SQL 的支持Calcite 是一个为不同的计算平台提供标准 SQL 查询的底层工具很多大数据框架比如 Apache Hive、Apache Kylin 中的 SQL 支持都是通过集成 Calcite 来实现的。
在代码中我们只要调用表环境的 sqlQuery()方法传入一个字符串形式的 SQL 查询语句就可以了。执行得到的结果是一个 Table 对象。
// 创建表环境
TableEnvironment tableEnv ...;
// 创建表
tableEnv.executeSql(CREATE TABLE EventTable ... WITH ( connector ... ));
// 查询用户 Alice 的点击事件并提取表中前两个字段
Table aliceVisitTable tableEnv.sqlQuery(SELECT user, url FROM EventTable WHERE user Alice
);目前 Flink 支持标准 SQL 中的绝大部分用法并提供了丰富的计算函数。这样我们就可以把已有的技术迁移过来像在 MySQL、Hive 中那样直接通过编写 SQL 实现自己的处理需求从而大大降低了 Flink 上手的难度。
例如我们也可以通过 GROUP BY 关键字定义分组聚合调用 COUNT()、SUM()这样的函数来进行统计计算
Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) FROM EventTable GROUP BY user
);上面的例子得到的是一个新的 Table 对象我们可以再次将它注册为虚拟表继续在 SQL中调用。另外我们也可以直接将查询的结果写入到已经注册的表中这需要调用表环境的executeSql()方法来执行 DDL传入的是一个 INSERT 语句
// 注册表
tableEnv.executeSql(CREATE TABLE EventTable ... WITH ( connector ... ));
tableEnv.executeSql(CREATE TABLE OutputTable ... WITH ( connector ... ));// 将查询结果输出到 OutputTable 中
tableEnv.executeSql (INSERT INTO OutputTable SELECT user, url FROM EventTable WHERE user Alice
);2调用 Table API 进行查询
另外一种查询方式就是调用 Table API。这是嵌入在 Java 和 Scala 语言内的查询 API核心就是 Table 接口类通过一步步链式调用 Table 的方法就可以定义出所有的查询转换操作。每一步方法调用的返回结果都是一个 Table。
由于Table API是基于Table的Java实例进行调用的因此我们首先要得到表的Java对象。基于环境中已注册的表可以通过表环境的 from()方法非常容易地得到一个 Table 对象
Table eventTable tableEnv.from(EventTable);传入的参数就是注册好的表名。注意这里 eventTable 是一个 Table 对象而 EventTable 是在环境中注册的表名。得到 Table 对象之后就可以调用 API 进行各种转换操作了得到的是一个新的 Table 对象
Table maryClickTable eventTable.where($(user).isEqual(Alice)).select($(url), $(user));这里每个方法的参数都是一个“表达式”Expression用方法调用的形式直观地说明了想要表达的内容“$”符号用来指定表中的一个字段。上面的代码和直接执行 SQL 是等效的。
Table API 是嵌入编程语言中的 DSLSQL 中的很多特性和功能必须要有对应的实现才可以使用因此跟直接写 SQL 比起来肯定就要麻烦一些。目前 Table API 支持的功能相对更少可以预见未来 Flink 社区也会以扩展 SQL 为主为大家提供更加通用的接口方式所以我们接下来也会以介绍 SQL 为主简略地提及 Table API。
输出表
表的创建和查询就对应着流处理中的读取数据源Source和转换Transform而最后一个步骤 Sink也就是将结果数据输出到外部系统就对应着表的输出操作。
在代码上输出一张表最直接的方法就是调用 Table 的方法 executeInsert()方法将一个Table 写入到注册过的表中方法传入的参数就是注册的表名。
// 注册表用于输出数据到外部系统
tableEnv.executeSql(CREATE TABLE OutputTable ... WITH ( connector ... ));
// 经过查询转换得到结果表
Table result ...
// 将结果表写入已注册的输出表中
result.executeInsert(OutputTable);在底层表的输出是通过将数据写入到 TableSink 来实现的。TableSink 是 Table API 中提供的一个向外部系统写入数据的通用接口可以支持不同的文件格式比如 CSV、Parquet、存储数据库比如 JDBC、HBase、Elasticsearch和消息队列比如 Kafka。它有些类似于DataStream API 中调用 addSink()方法时传入的 SinkFunction有不同的连接器对它进行了实现。
表和流的转换
从创建表环境开始历经表的创建、查询转换和输出我们已经可以使用 Table API 和 SQL进行完整的流处理了。不过在应用的开发过程中我们测试业务逻辑一般不会直接将结果直接写入到外部系统而是在本地控制台打印输出。对于 DataStream 这非常容易直接调用 print()方法就可以看到结果数据流的内容了但对于 Table 就比较悲剧——它没有提供 print()方法。
在 Flink 中我们可以将 Table 再转换成 DataStream然后进行打印输出。这就涉及了表和流的转换。
1将表Table转换成流DataStream 调用 toDataStream()方法 将一个 Table 对象转换成 DataStream 非常简单只要直接调用表环境的方法 toDataStream()就可以了。例如我们可以将前面查询转换得到的表 maryClickTable 转换成流打印输出这代表了“Mary 点击的 url 列表” Table aliceVisitTable tableEnv.sqlQuery(SELECT user, url FROM EventTable WHERE user Alice
);
// 将表转换成数据流
tableEnv.toDataStream(aliceVisitTable).print();注意toDataStream只适用于“仅插入流”Insert-Only Streams它们的特点是数据只会插入、不会更新。 调用 toChangelogStream()方法 将 maryClickTable 转换成流打印输出是很简单的然而如果我们同样希望将“用户点击次数统计”表 urlCountTable 进行打印输出就会抛出一个 TableException 异常 Exception in thread main org.apache.flink.table.api.TableException: Table sink default_catalog.default_database.Unregistered_DataStream_Sink_1 doesnt support consuming update changes ...这表示当前的 TableSink 并不支持表的更新update操作。这是什么意思呢 因为 print 本身也可以看作一个 Sink 操作所以这个异常就是说打印输出的 Sink 操作不支持对数据进行更新。具体来说urlCountTable 这个表中进行了分组聚合统计所以表中的每一行是会“更新”的。也就是说Alice 的第一个点击事件到来表中会有一行(Alice, 1)第二个点击事件到来这一行就要更新为(Alice, 2)。但之前的(Alice, 1)已经打印输出了“覆水难收”我们怎么能对它进行更改呢所以就会抛出异常。 解决的思路是对于这样有更新操作的表我们不要试图直接把它转换成 DataStream 打印输出而是记录一下它的“更新日志”change log。这样一来对于表的所有更新操作就变成了一条更新日志的流我们就可以转换成流打印输出了。 代码中需要调用的是表环境的 toChangelogStream()方法 Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) FROM EventTable GROUP BY user
);
// 将表转换成更新日志流
tableEnv.toChangelogStream(urlCountTable).print();2将流DataStream转换成表Table 调用 fromDataStream()方法 想要将一个 DataStream 转换成表也很简单可以通过调用表环境的 fromDataStream()方法来实现返回的就是一个 Table 对象。例如我们可以直接将事件流 eventStream 转换成一个表 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
// 获取表环境
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);
// 读取数据源
SingleOutputStreamOperatorEvent eventStream env.addSource(...);
// 将数据流转换成表
Table eventTable tableEnv.fromDataStream(eventStream);由于流中的数据本身就是定义好的 POJO 类型 Event所以我们将流转换成表之后每一行数据就对应着一个 Event而表中的列名就对应着 Event 中的属性。 另外我们还可以在 fromDataStream()方法中增加参数用来指定提取哪些属性作为表中的字段名并可以任意指定位置 // 提取 Event 中的 timestamp 和 url 作为表中的列
Table eventTable2 tableEnv.fromDataStream(eventStream, $(timestamp), $(url));需要注意的是timestamp 本身是 SQL 中的关键字所以我们在定义表名、列名时要尽量避免。这时可以通过表达式的 as()方法对字段进行重命名 // 将 timestamp 字段重命名为 ts
Table eventTable2 tableEnv.fromDataStream(eventStream, $(timestamp).as(ts), $(url));调用 createTemporaryView()方法 调用 fromDataStream()方法简单直观可以直接实现 DataStream 到 Table 的转换不过如果我们希望直接在 SQL 中引用这张表就还需要调用表环境的 createTemporaryView()方法来创建虚拟视图了。 对于这种场景也有一种更简洁的调用方式。我们可以直接调用 createTemporaryView()方法创建虚拟表传入的两个参数第一个依然是注册的表名而第二个可以直接就是DataStream。之后仍旧可以传入多个参数用来指定表中的字段 tableEnv.createTemporaryView(EventTable, eventStream, $(timestamp).as(ts),$(url));调用 fromChangelogStream ()方法 表环境还提供了一个方法 fromChangelogStream()可以将一个更新日志流转换成表。这个方法要求流中的数据类型只能是 Row而且每一个数据都需要指定当前行的更新类型RowKind所以一般是由连接器帮我们实现的直接应用比较少见。
3支持的数据类型
前面示例中的 DataStream流中的数据类型都是定义好的 POJO 类。如果 DataStream 中的类型是简单的基本类型还可以直接转换成表吗这就涉及了 Table 中支持的数据类型。
整体来看DataStream 中支持的数据类型Table 中也是都支持的只不过在进行转换时需要注意一些细节。 原子类型 在 Flink 中基础数据类型Integer、Double、String和通用数据类型也就是不可再拆分的数据类型统一称作“原子类型”。原子类型的 DataStream转换之后就成了只有一列的Table列字段field的数据类型可以由原子类型推断出。另外还可以在 fromDataStream()方法里增加参数用来重新命名列字段。 StreamTableEnvironment tableEnv ...;
DataStreamLong stream ...;
// 将数据流转换成动态表动态表只有一个字段重命名为 myLong
Table table tableEnv.fromDataStream(stream, $(myLong));Tuple 类型 当原子类型不做重命名时默认的字段名就是“f0”容易想到这其实就是将原子类型看作了一元组 Tuple1 的处理结果。 Table 支持 Flink 中定义的元组类型 Tuple对应在表中字段名默认就是元组中元素的属性名 f0、f1、f2…。所有字段都可以被重新排序也可以提取其中的一部分字段。字段还可以通过调用表达式的 as()方法来进行重命名。 StreamTableEnvironment tableEnv ...;
DataStreamTuple2Long, Integer stream ...;
// 将数据流转换成只包含 f1 字段的表
Table table tableEnv.fromDataStream(stream, $(f1));
// 将数据流转换成包含 f0 和 f1 字段的表在表中 f0 和 f1 位置交换
Table table tableEnv.fromDataStream(stream, $(f1), $(f0));
// 将 f1 字段命名为 myIntf0 命名为 myLong
Table table tableEnv.fromDataStream(stream, $(f1).as(myInt), $(f0).as(myLong));POJO 类型 Flink 也支持多种数据类型组合成的“复合类型”最典型的就是简单 Java 对象POJO 类型。由于 POJO 中已经定义好了可读性强的字段名这种类型的数据流转换成 Table 就显得无比顺畅了。 将 POJO 类型的 DataStream 转换成 Table如果不指定字段名称就会直接使用原始 POJO 类型中的字段名称。POJO 中的字段同样可以被重新排序、提却和重命名这在之前的例子中已经有过体现。 StreamTableEnvironment tableEnv ...;
DataStreamEvent stream ...;
Table table tableEnv.fromDataStream(stream);
Table table tableEnv.fromDataStream(stream, $(user));
Table table tableEnv.fromDataStream(stream, $(user).as(myUser), $(url).as(myUrl));Row 类型 Flink 中还定义了一个在关系型表中更加通用的数据类型——行Row它是 Table 中数据的基本组织形式。Row 类型也是一种复合类型它的长度固定而且无法直接推断出每个字段的类型所以在使用时必须指明具体的类型信息我们在创建 Table 时调用的 CREATE语句就会将所有的字段名称和类型指定这在 Flink 中被称为表的“模式结构”Schema。除此之外Row 类型还附加了一个属性 RowKind用来表示当前行在更新操作中的类型。这样Row 就可以用来表示更新日志流changelog stream中的数据从而架起了 Flink 中流和表的转换桥梁。 所以在更新日志流中元素的类型必须是 Row而且需要调用 ofKind()方法来指定更新类型。下面是一个具体的例子 DataStreamRow dataStream env.fromElements(Row.ofKind(RowKind.INSERT, Alice, 12),Row.ofKind(RowKind.INSERT, Bob, 5),Row.ofKind(RowKind.UPDATE_BEFORE, Alice, 12),Row.ofKind(RowKind.UPDATE_AFTER, Alice, 100));
// 将更新日志流转换为表
Table table tableEnv.fromChangelogStream(dataStream);4综合应用示例
现在我们可以将介绍过的所有 API 整合起来写出一段完整的代码。同样还是用户的一组点击事件我们可以查询出某个用户例如 Alice点击的 url 列表也可以统计出每个用户累计的点击次数这可以用两句 SQL 来分别实现。具体代码如下
public static void main(String[] args) throws Exception {// 获取流环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源SingleOutputStreamOperatorEvent eventStream env.fromElements(new Event(Alice, ./home, 1000L),new Event(Bob, ./cart, 1000L),new Event(Alice, ./prod?id1, 5 * 1000L),new Event(Cary, ./home, 60 * 1000L),new Event(Bob, ./prod?id3, 90 * 1000L),new Event(Alice, ./prod?id7, 105 * 1000L));// 获取表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 将数据流转换成表tableEnv.createTemporaryView(EventTable, eventStream);// 查询Alice的访问url列表Table aliceVisitTable tableEnv.sqlQuery(SELECT url, user FROM EventTable WHERE user Alice);// 统计每个用户的点击次数Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) FROM EventTable GROUP BY user);// 将表转换成数据流在控制台打印输出tableEnv.toDataStream(aliceVisitTable).print(alice visit);tableEnv.toChangelogStream(urlCountTable).print(count);// 执行程序env.execute();
}用户 Alice 的点击 url 列表只需要一个简单的条件查询就可以得到对应的表中只有插入操作所以我们可以直接调用 toDataStream()将它转换成数据流然后打印输出。控制台输出的结果如下
alice visit I[./home, Alice]
alice visit I[./prod?id1, Alice]
alice visit I[./prod?id7, Alice]这里每条数据前缀的I 就是 RowKind表示 INSERT插入。
而由于统计点击次数时用到了分组聚合造成结果表中数据会有更新操作所以在打印输出时需要将表 urlCountTable 转换成更新日志流changelog stream。控制台输出的结果如下
count I[Alice, 1]
count I[Bob, 1]
count -U[Alice, 1]
count U[Alice, 2]
count I[Cary, 1]
count -U[Bob, 1]
count U[Bob, 2]
count -U[Alice, 2]
count U[Alice, 3]这里数据的前缀出现了I、-U 和U 三种 RowKind分别表示 INSERT插入、UPDATE_BEFORE更新前和 UPDATE_AFTER更新后。当收到每个用户的第一次点击事件时会在表中插入一条数据例如I[Alice, 1]、I[Bob, 1]。而之后每当用户增加一次点击事件就会带来一次更新操作更新日志流changelog stream中对应会出现两条数据分别表示之前数据的失效和新数据的生效例如当 Alice 的第二条点击数据到来时会出现一个-U[Alice, 1]和一个U[Alice, 2]表示 Alice 的点击个数从 1 变成了 2。
这种表示更新日志的方式有点像是声明“撤回”了之前的一条数据、再插入一条更新后的数据所以也叫作“撤回流”Retract Stream。
流处理中的表
在 Flink 中使用表和 SQL基本上跟其它场景是一样的不过对于表和流的转换却稍显复杂。当我们将一个 Table 转换成 DataStream 时有“仅插入流”Insert-Only Streams和“更新日志流”Changelog Streams两种不同的方式具体使用哪种方式取决于表中是否存在更新update操作。
这种麻烦其实是不可避免的。我们知道Table API 和 SQL 本质上都是基于关系型表的操作方式而关系型表Table本身是有界的更适合批处理的场景。所以在 MySQL、Hive这样的固定数据集中进行查询使用 SQL 就会显得得心应手。而对于 Flink 这样的流处理框架来说要处理的是源源不断到来的无界数据流我们无法等到数据都到齐再做查询每来一条数据就应该更新一次结果这时如果一定要使用表和 SQL 进行处理就会显得有些别扭了需要引入一些特殊的概念。
我们可以将关系型表/SQL 与流处理做一个对比如表
关系型表/SQL流处理处理的数据对象字段元组的有界集合字段元组的无限序列查询Query可以访问到完整的数据输入无法访问到所有数据必须“持续”等待流式输入对数据的访问查询终止条件生成固定大小的结果集后终止永不停止根据持续收到的数据不断更新查询结果
动态表和持续查询
流处理面对的数据是连续不断的这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同而基于表执行的查询操作也就有了新的含义。
如果我们希望把流数据转换成表的形式那么这表中的数据就会不断增长如果进一步基于表执行 SQL 查询那么得到的结果就不是一成不变的而是会随着新数据的到来持续更新。
1动态表Dynamic Tables
当流中有新数据到来初始的表中会插入一行而基于这个表定义的 SQL 查询就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化被称为“动态表”Dynamic Tables。
动态表是Flink在Table API和SQL中的核心概念它为流数据处理提供了表和SQL支持。我们所熟悉的表一般用来做批处理面向的是固定的数据集可以认为是“静态表”而动态表则完全不同它里面的数据会随时间变化。
其实动态表的概念我们在传统的关系型数据库中已经有所接触。数据库中的表其实是一系列 INSERT、UPDATE 和 DELETE 语句执行的结果在关系型数据库中我们一般把它称为更新日志流changelog stream。如果我们保存了表在某一时刻的快照snapshot那么接下来只要读取更新日志流就可以得到表之后的变化过程和最终结果了。在很多高级关系型数据库比如 Oracle、DB2中都有“物化视图”Materialized Views的概念可以用来缓存 SQL 查询的结果它的更新其实就是不停地处理更新日志流的过程。
Flink 中的动态表就借鉴了物化视图的思想。
2持续查询Continuous Query
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来我们对动态表的查询也就永远不会停止一直在随着新数据的到来而继续执行。这样的查询就被称作“持续查询”Continuous Query。对动态表定义的查询操作都是持续查询而持续查询的结果也会是一个动态表。
由于每次数据到来都会触发查询操作因此可以认为一次查询面对的数据集就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”snapshot当作有限数据集进行批处理流式数据的到来会触发连续不断的快照查询像动画一样连贯起来就构成了“持续查询”。
如图描述了持续查询的过程。这里我们也可以清晰地看到流、动态表和持续查询的关系 持续查询的步骤如下
流stream被转换为动态表dynamic table对动态表进行持续查询continuous query生成新的动态表生成的动态表被转换成流。
将流转换成动态表原理
为了能够使用 SQL 来做流处理我们必须先把流stream转换成动态表。当然之前在讲解基本 API 时已经介绍过代码中的 DataStream 和 Table 如何转换现在我们则要抛开具体的数据类型从原理上理解流和动态表的转换过程。
如果把流看作一张表那么流中每个数据的到来都应该看作是对表的一次插入Insert操作会在表的末尾添加一行数据。因为流是连续不断的而且之前的输出结果无法改变、只能在后面追加所以我们其实是通过一个只有插入操作insert-only的更新日志changelog流来构建一个表。
// 获取流环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 读取数据源
SingleOutputStreamOperatorEvent eventStream env.fromElements(new Event(Alice, ./home, 1000L),new Event(Bob, ./cart, 1000L),new Event(Alice, ./prod?id1, 5 * 1000L),new Event(Cary, ./home, 60 * 1000L),new Event(Bob, ./prod?id3, 90 * 1000L),new Event(Alice, ./prod?id7, 105 * 1000L)
);
// 获取表环境
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);
// 将数据流转换成表
tableEnv.createTemporaryView(EventTable, eventStream, $(user), $(url), $(timestamp).as(ts));// 统计每个用户的点击次数
Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user);
// 将表转换成数据流在控制台打印输出
tableEnv.toChangelogStream(urlCountTable).print(count);// 执行程序
env.execute();我们现在的输入数据就是用户在网站上的点击访问行为数据类型被包装为 POJO 类型Event。我们将它转换成一个动态表注册为 EventTable。表中的字段定义如下
[user: VARCHAR, // 用户名url: VARCHAR, // 用户访问的 URLts: BIGINT // 时间戳
]如图所示当用户点击事件到来时就对应着动态表中的一次插入Insert操作每条数据就是表中的一行随着插入更多的点击事件得到的动态表将不断增长。 用 SQL 持续查询-更新查询追加查询
1 更新Update查询
我们在代码中定义了一个 SQL 查询。
Table urlCountTable tableEnv.sqlQuery(SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user);这个查询很简单主要是分组聚合统计每个用户的点击次数。我们把原始的动态表注册为EventTable经过查询转换后得到 urlCountTable这个结果动态表中包含两个字段具体定义如下
[user: VARCHAR, // 用户名cnt: BIGINT // 用户访问 url 的次数
]如图所示当原始动态表不停地插入新的数据时查询得到的 urlCountTable 会持续地进行更改。由于 count 数量可能会叠加增长因此这里的更改操作可以是简单的插入Insert也可以是对之前数据的更新Update。换句话说用来定义结果表的更新日志changelog流中包含了 INSERT 和 UPDATE 两种操作。这种持续查询被称为更新查询Update Query更新查询得到的结果表如果想要转换成 DataStream必须调用 toChangelogStream()方法。 具体步骤解释如下
当查询启动时原始动态表 EventTable 为空当第一行 Alice 的点击数据插入 EventTable 表时查询开始计算结果表urlCountTable中插入一行数据[Alice1]。当第二行 Bob 点击数据插入 EventTable 表时查询将更新结果表并插入新行[Bob1]。第三行数据到来同样是 Alice 的点击事件这时不会插入新行而是生成一个针对已有行的更新操作。这样结果表中第一行[Alice1]就更新为[Alice2]。当第四行 Cary 的点击数据插入到 EventTable 表时查询将第三行[Cary1]插入到结果表中。
2追加Append查询
上面的例子中查询过程用到了分组聚合结果表中就会产生更新操作。如果我们执行一个简单的条件查询结果表中就会像原始表 EventTable 一样只有插入Insert操作了。
Table aliceVisitTable tableEnv.sqlQuery(SELECT url, user FROM EventTable WHERE user Cary);这样的持续查询就被称为追加查询Append Query它定义的结果表的更新日志changelog流中只有 INSERT 操作。追加查询得到的结果表转换成 DataStream 调用方法没有限制可以直接用 toDataStream()也可以像更新查询一样调用 toChangelogStream()。 这样看来我们似乎可以总结一个规律只要用到了聚合在之前的结果上有叠加就会产生更新操作就是一个更新查询。但事实上更新查询的判断标准是结果表中的数据是否会有 UPDATE 操作如果聚合的结果不再改变那么同样也不是更新查询。 什么时候聚合的结果会保持不变呢一个典型的例子就是窗口聚合。
我们考虑开一个滚动窗口统计每一小时内所有用户的点击次数并在结果表中增加一个endT 字段表示当前统计窗口的结束时间。这时结果表的字段定义如下
[user: VARCHAR, // 用户名endT: TIMESTAMP, // 窗口结束时间cnt: BIGINT // 用户访问 url 的次数
]如图所示与之前的分组聚合一样当原始动态表不停地插入新的数据时查询得到的结果 result 会持续地进行更改。比如时间戳在 12:00:00 到 12:59:59 之间的有四条数据其中 Alice 三次点击、Bob 一次点击所以当水位线达到 13:00:00 时窗口关闭输出到结果表中的就是新增两条数据[Alice, 13:00:00, 3]和[Bob, 13:00:00, 1]。同理当下一小时的窗口关闭时也会将统计结果追加到 result 表后面而不会更新之前的数据。 所以我们发现由于窗口的统计结果是一次性写入结果表的所以结果表的更新日志流中只会包含插入 INSERT 操作而没有更新 UPDATE 操作。所以这里的持续查询依然是一个追加Append查询。结果表 result 如果转换成 DataStream可以直接调用 toDataStream()方法。
需要注意的是由于涉及时间窗口我们还需要为事件时间提取时间戳和生成水位线。完整代码如下
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源并分配时间戳、生成水位线SingleOutputStreamOperatorEvent eventStream env.fromElements(new Event(Alice, ./home, 1000L),new Event(Bob, ./cart, 1000L),new Event(Alice, ./prod?id1, 25 * 60 * 1000L),new Event(Alice, ./prod?id4, 55 * 60 * 1000L),new Event(Bob, ./prod?id5, 3600 * 1000L 60 * 1000L),new Event(Cary, ./home, 3600 * 1000L 30 * 60 * 1000L),new Event(Cary, ./prod?id7, 3600 * 1000L 59 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.EventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 创建表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 将数据流转换成表并指定时间属性Table eventTable tableEnv.fromDataStream(eventStream,$(user),$(url),$(timestamp).rowtime().as(ts));// 为方便在SQL中引用在环境中注册表EventTabletableEnv.createTemporaryView(EventTable, eventTable);// 设置累积窗口执行SQL统计查询Table result tableEnv.sqlQuery(SELECT user, window_end AS endT, COUNT(url) AS cnt FROM TABLE( CUMULATE( TABLE EventTable, // 定义累积窗口DESCRIPTOR(ts), INTERVAL 30 MINUTE, INTERVAL 1 HOUR)) GROUP BY user, window_start, window_end );tableEnv.toDataStream(result).print();env.execute();
}运行结果
I[Alice, 1970-01-01T01:00, 3]
I[Bob, 1970-01-01T01:00, 1]
I[Cary, 1970-01-01T02:00, 2]
I[Bob, 1970-01-01T02:00, 1]可以看到所有输出结果都以I 为前缀表示都是以 INSERT 操作追加到结果表中的这是一个追加查询所以我们直接使用 toDataStream()转换成流是没有问题的。这里输出的window_end 是一个 TIMESTAMP 类型由于我们直接以一个长整型数作为事件发生的时间戳所以可以看到对应的都是 1970 年 1 月 1 日的时间。
3查询限制
在实际应用中有些持续查询会因为计算代价太高而受到限制。所谓的“代价太高”可能是由于需要维护的状态持续增长也可能是由于更新数据的计算太复杂。 状态大小用持续查询做流处理往往会运行至少几周到几个月所以持续查询处理的数据总量可能非常大。例如我们之前举的更新查询的例子需要记录每个用户访问 url 的次数。如果随着时间的推移用户数越来越大那么要维护的状态也将逐渐增长最终可能会耗尽存储空间导致查询失败。 SELECT user, COUNT(url) FROM clicks GROUP BY user;更新计算对于有些查询来说更新计算的复杂度可能很高。每来一条新的数据更新结果的时候可能需要全部重新计算并且对很多已经输出的行进行更新。一个典型的例子就是 RANK()函数它会基于一组数据计算当前值的排名。例如下面的 SQL 查询会根据用户最后一次点击的时间为每个用户计算一个排名。当我们收到一个新的数据用户的最后一次点击时间lastAction就会更新进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时被他超过的那些用户的排名也会改变这样的更新操作无疑代价巨大而且还会随着用户的增多越来越严重。 SELECT user, RANK() OVER (ORDER BY lastAction)FROM (SELECT user, MAX(ts) AS lastAction FROM EventTable GROUP BY user
);将动态表转换为流(Append-only、Retract、Upsert)
与关系型数据库中的表一样动态表也可以通过插入Insert、更新Update和删除Delete操作进行持续的更改。将动态表转换为流或将其写入外部系统时就需要对这些更改操作进行编码通过发送编码消息的方式告诉外部系统要执行的操作。在 Flink 中Table API 和 SQL支持三种编码方式 仅追加Append-only流仅通过插入Insert更改来修改的动态表可以直接转换为“仅追加”流。这个流中发出的数据其实就是动态表中新增的每一行。 撤回Retract流撤回流是包含两类消息的流添加add消息和撤回retract消息。具体的编码规则是INSERT 插入操作编码为 add 消息DELETE 删除操作编码为 retract消息而 UPDATE 更新操作则编码为被更改行的 retract 消息和更新后行新行的 add 消息。这样我们可以通过编码后的消息指明所有的增删改操作一个动态表就可以转换为撤回流了。 可以看到更新操作对于撤回流来说对应着两个消息之前数据的撤回删除和新数据的插入。 更新插入Upsert流更新插入流中只包含两种类型的消息更新插入upsert消息和删除delete消息。所谓的“upsert”其实是“update”和“insert”的合成词所以对于更新插入流来说INSERT 插入操作和UPDATE更新操作统一被编码为upsert消息而DELETE删除操作则被编码为delete消息。 既然更新插入流中不区分插入insert和更新update那我们自然会想到一个问题如果希望更新一行数据时怎么保证最后做的操作不是插入呢 这就需要动态表中必须有唯一的键key。通过这个 key 进行查询如果存在对应的数据就做更新update如果不存在就直接插入insert。这是一个动态表可以转换为更新插入流的必要条件。当然收到这条流中数据的外部系统也需要知道这唯一的键key这样才能正确地处理消息。
需要注意的是在代码里将动态表转换为 DataStream 时只支持仅追加append-only和撤回retract流我们调用 toChangelogStream()得到的其实就是撤回流这也很好理解DataStream 中并没有 key 的定义所以只能通过两条消息一减一增来表示更新操作。而连接到外部系统时则可以支持不同的编码方法这取决于外部系统本身的特性。
时间属性和窗口
基于时间的操作比如时间窗口需要定义相关的时间语义和时间数据来源的信息。在Table API 和 SQL 中会给表单独提供一个逻辑上的时间字段专门用来在表处理程序中指示时间。
所以所谓的时间属性time attributes其实就是每个表模式结构schema的一部分。它可以在创建表的 DDL 里直接定义为一个字段也可以在 DataStream 转换成表时定义。一旦定义了时间属性它就可以作为一个普通字段引用并且可以在基于时间的操作中使用。
时间属性的数据类型为 TIMESTAMP它的行为类似于常规时间戳可以直接访问并且进行计算。
按照时间语义的不同我们可以把时间属性的定义分成事件时间event time和处理时间processing time两种情况。
事件时间
我们在实际应用中最常用的就是事件时间。在事件时间语义下允许表处理程序根据每个数据中包含的时间戳也就是事件发生的时间来生成结果。
事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。我们通过设置水位线watermark来表示事件时间的进展而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下对数据的处理也可以获得正确的结果。
为了处理无序事件并区分流中的迟到事件。Flink 需要从事件数据中提取时间戳并生成水位线用来推进事件时间的进展。
事件时间属性可以在创建表 DDL 中定义也可以在数据流和表的转换中定义。
1在创建表的 DDL 中定义
在创建表的 DDLCREATE TABLE 语句中可以增加一个字段通过 WATERMARK 语句来定义事件时间属性。WATERMARK 语句主要用来定义水位线watermark的生成表达式这个表达式会将带有事件时间戳的字段标记为事件时间属性并在它基础上给出水位线的延迟时间。具体定义方式如下
CREATE TABLE EventTable(user STRING,url STRING,ts TIMESTAMP(3),WATERMARK FOR ts AS ts - INTERVAL 5 SECOND
) WITH (...
);这里我们把 ts 字段定义为事件时间属性而且基于 ts 设置了 5 秒的水位线延迟。这里的“5 秒”是以“时间间隔”的形式定义的格式是 INTERVAL 数值 时间单位
INTERVAL 5 SECOND 这里的数值必须用单引号引起来而单位用 SECOND 和 SECONDS 是等效的。
Flink 中支持的事件时间属性数据类型必须为 TIMESTAMP 或者 TIMESTAMP_LTZ。这里TIMESTAMP_LTZ 是指带有本地时区信息的时间戳TIMESTAMP WITH LOCAL TIME ZONE一般情况下如果数据中的时间戳是“年-月-日-时-分-秒”的形式那就是不带时区信息的可以将事件时间属性定义为 TIMESTAMP 类型。
而如果原始的时间戳就是一个长整型的毫秒数这时就需要另外定义一个字段来表示事件时间属性类型定义为 TIMESTAMP_LTZ 会更方便
CREATE TABLE events (user STRING,url STRING,ts BIGINT,ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR ts_ltz AS time_ltz - INTERVAL 5 SECOND
) WITH (...
);这里我们另外定义了一个字段 ts_ltz是把长整型的 ts 转换为 TIMESTAMP_LTZ 得到的进而使用 WATERMARK 语句将它设为事件时间属性并设置 5 秒的水位线延迟。
2在数据流转换为表时定义
事件时间属性也可以在将 DataStream 转换为表的时候来定义。我们调用 fromDataStream() 方法创建表时可以追加参数来定义表中的字段结构这时可以给某个字段加上.rowtime() 后缀就表示将当前字段指定为事件时间属性。
这个字段可以是数据中本不存在、额外追加上去的“逻辑字段”就像之前 DDL 中定义的第二种情况也可以是本身固有的字段那么这个字段就会被事件时间属性所覆盖类型也会被转换为 TIMESTAMP。
不论那种方式时间属性字段中保存的都是事件的时间戳TIMESTAMP 类型。
需要注意的是这种方式只负责指定时间属性而时间戳的提取和水位线的生成应该之前就在 DataStream 上定义好了。由于 DataStream 中没有时区概念因此 Flink 会将事件时间属性解析成不带时区的 TIMESTAMP 类型所有的时间值都被当作 UTC 标准时间。
在代码中的定义方式如下
// 方法一:
// 流中数据类型为二元组 Tuple2包含两个字段需要自定义提取时间戳并生成水位线
DataStreamTuple2String, String stream inputStream.assignTimestampsAndWatermarks(...);
// 声明一个额外的逻辑字段作为事件时间属性
Table table tEnv.fromDataStream(stream, $(user), $(url), $(ts).rowtime());
// 方法二:
// 流中数据类型为三元组 Tuple3最后一个字段就是事件时间戳
DataStreamTuple3String, String, Long stream inputStream.assignTimestampsAndWatermarks(...);
// 不再声明额外字段直接用最后一个字段作为事件时间属性
Table table tEnv.fromDataStream(stream, $(user), $(url), $(ts).rowtime());处理时间
相比之下处理时间就比较简单了它就是我们的系统时间使用时不需要提取时间戳timestamp和生成水位线watermark。因此在定义处理时间属性时必须要额外声明一个字段专门用来保存当前的处理时间。
类似地处理时间属性的定义也有两种方式创建表 DDL 中定义或者在数据流转换成表时定义。
1在创建表的 DDL 中定义
在创建表的 DDLCREATE TABLE 语句中可以增加一个额外的字段通过调用系统内置的 PROCTIME() 函数来指定当前的处理时间属性返回的类型是 TIMESTAMP_LTZ。
CREATE TABLE EventTable(user STRING,url STRING,ts AS PROCTIME()
) WITH (...
);这里的时间属性其实是以“计算列”computed column的形式定义出来的。所谓的计算列是 Flink SQL 中引入的特殊概念可以用一个 AS 语句来在表中产生数据中不存在的列并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz也是计算列的定义方式。
2在数据流转换为表时定义
处 理 时 间 属 性 同 样 可 以 在 将 DataStream 转 换 为 表 的 时 候 来 定 义 。 我们调用fromDataStream()方法创建表时可以用**.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间原始数据中并没有这个字段所以处理时间属性一定不能定义在一个已有字段上只能定义在表结构所有字段的最后作为额外的逻辑字段出现**。
代码中定义处理时间属性的方法如下
DataStreamTuple2String, String stream ...;
// 声明一个额外的字段作为处理时间属性字段
Table table tEnv.fromDataStream(stream, $(user), $(url), $(ts).proctime());窗口Window
有了时间属性接下来就可以定义窗口进行计算了。我们知道窗口可以将无界流切割成大小有限的“桶”bucket来做计算通过截取有限数据集来处理无限的流数据。在 DataStream API 中提供了对不同类型的窗口进行定义和处理的接口而在 Table API 和 SQL 中类似的功能也都可以实现。
1分组窗口Group Window老版本
在 Flink 1.12 之前的版本中Table API 和 SQL 提供了一组“分组窗口”Group Window函数常用的时间窗口如滚动窗口、滑动窗口、会话窗口都有对应的实现具体在 SQL 中就是调用 TUMBLE()、HOP()、SESSION()传入时间属性字段、窗口大小等参数就可以了。以滚动窗口为例
TUMBLE(ts, INTERVAL 1 HOUR)这里的 ts 是定义好的时间属性字段窗口大小用“时间间隔”INTERVAL 来定义。
在进行窗口计算时分组窗口是将窗口本身当作一个字段对数据进行分组的可以对组内的数据进行聚合。基本使用方式如下
Table result tableEnv.sqlQuery(SELECT user, TUMBLE_END(ts, INTERVAL 1 HOUR) as endT, COUNT(url) AS cnt FROM EventTable GROUP BY // 使用窗口和用户名进行分组user, TUMBLE(ts, INTERVAL 1 HOUR) // 定义 1 小时滚动窗口
);这里定义了 1 小时的滚动窗口将窗口和用户 user 一起作为分组的字段。用聚合函数COUNT()对分组数据的个数进行了聚合统计并将结果字段重命名为cnt用TUPMBLE_END()函数获取滚动窗口的结束时间重命名为 endT 提取出来。
分组窗口的功能比较有限只支持窗口聚合所以目前已经处于弃用deprecated的状态。
2窗口表值函数Windowing TVFs新版本
从 1.13 版本开始Flink 开始使用窗口表值函数Windowing table-valued functionsWindowing TVFs来定义窗口。窗口表值函数是 Flink 定义的多态表函数PTF可以将表进行扩展后返回。表函数table function可以看作是返回一个表的函数。
目前 Flink 提供了以下几个窗口 TVF
滚动窗口Tumbling Windows滑动窗口Hop Windows跳跃窗口累积窗口Cumulate Windows会话窗口Session Windows目前尚未完全支持。
窗口表值函数可以完全替代传统的分组窗口函数。窗口 TVF 更符合 SQL 标准性能得到了优化拥有更强大的功能可以支持基于窗口的复杂计算例如窗口 Top-N、窗口联结window join等等。当然目前窗口 TVF 的功能还不完善会话窗口和很多高级功能还不支持不过正在快速地更新完善。
在窗口 TVF 的返回值中除去原始表中的所有列还增加了用来描述窗口的额外 3 个列“窗口起始点”window_start、“窗口结束点”window_end、“窗口时间”window_time。起始点和结束点比较好理解这里的“窗口时间”指的是窗口中的时间属性它的值等于window_end - 1ms所以相当于是窗口中能够包含数据的最大时间戳。
在 SQL 中的声明方式与以前的分组窗口是类似的直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口不过传入的参数会有所不同。下面我们就分别对这几种窗口 TVF 进行介绍。
1滚动窗口TUMBLE
滚动窗口在 SQL 中的概念与 DataStream API 中的定义完全一样是长度固定、时间对齐、无重叠的窗口一般用于周期性的统计计算。
在 SQL 中通过调用 TUMBLE()函数就可以声明一个滚动窗口只有一个核心参数就是窗口大小size。在 SQL 中不考虑计数窗口所以滚动窗口就是滚动时间窗口参数中还需要将当前的时间属性字段传入。具体声明如下
TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL 1 HOUR)这里基于时间字段 ts对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据按照它们 ts 的值分配到一个指定的窗口中。
2滑动窗口HOP
滑动窗口的使用与滚动窗口类似可以通过设置滑动步长来控制统计输出的频率。在 SQL中通过调用 HOP()来声明滑动窗口除了也要传入表名、时间属性外还需要传入窗口大小size和滑动步长slide两个参数。
HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL 5 MINUTES, INTERVAL 1 HOURS));紧跟在时间属性字段后面的第三个参数是步长slide第四个参数才是窗口大小size。
3累积窗口CUMULATE
滚动窗口和滑动窗口可以用来计算大多数周期性的统计指标。不过在实际应用中还会遇到这样一类需求我们的统计周期可能较长因此希望中间每隔一段时间就输出一次当前的统计值与滑动窗口不同的是在一个统计周期内我们会多次输出统计值它们应该是不断叠加累积的。
例如我们按天来统计网站的 PVPage View页面浏览量如果用 1 天的滚动窗口那需要到每天 24 点才会计算一次输出频率太低如果用滑动窗口计算频率可以更高但统计的就变成了“过去 24 小时的 PV”。所以我们真正希望的是还是按照自然日统计每天的PV不过需要每隔 1 小时就输出一次当天到目前为止的 PV 值。这种特殊的窗口就叫作“累积窗口”Cumulate Window。
累积窗口是窗口 TVF 中新增的窗口功能它会在一定的统计周期内进行累积计算。累积窗口中有两个核心的参数最大窗口长度max window size和累积步长step。
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL 1 HOURS, INTERVAL 1 DAYS))第三个参数为步长 step第四个参数则是最大窗口长度最大窗口长度其实就是我们所说的“统计周期”最终目的就是统计这段时间内的数据。
聚合Aggregation查询
在 SQL 中一个很常见的功能就是对某一列的多条数据做一个合并统计得到一个或多个结果值比如求和、最大最小值、平均值等等这种操作叫作聚合Aggregation查询。Flink 中的 SQL 是流处理与标准 SQL 结合的产物所以聚合查询也可以分成两种流处理中特有的聚合主要指窗口聚合以及 SQL 原生的聚合查询方式。
分组聚合
SQL 中一般所说的聚合我们都很熟悉主要是通过内置的一些聚合函数来实现的比如SUM()、MAX()、MIN()、AVG()以及 COUNT()。它们的特点是对多条输入数据进行计算得到一个唯一的值属于“多对一”的转换。比如我们可以通过下面的代码计算输入数据的个数
Table eventCountTable tableEnv.sqlQuery(select COUNT(*) from EventTable);而更多的情况下我们可以通过 GROUP BY 子句来指定分组的键key从而对数据按照某个字段做一个分组统计。例如之前我们举的例子可以按照用户名进行分组统计每个用户点击 url 的次数
SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user在流处理中分组聚合同样是一个持续查询而且是一个更新查询得到的是一个动态表每当流中有一个新的数据到来时都会导致结果表的更新操作。因此想要将结果表转换成流或输出到外部系统必须采用撤回流retract stream或更新插入流upsert stream的编码方式如果在代码中直接转换成 DataStream 打印输出需要调用 toChangelogStream()。
窗口聚合
在流处理中往往需要将无限数据流划分成有界数据集这就是所谓的“窗口”。
在 Flink 的 Table API 和 SQL 中窗口的计算是通过“窗口聚合”window aggregation来实现的。与分组聚合类似窗口聚合也需要调用 SUM()、MAX()、MIN()、COUNT()一类的聚合函数通过 GROUP BY 子句来指定分组的字段。只不过窗口聚合时需要将窗口信息作为分组 key 的一部分定义出来。在 Flink 1.12 版本之前是直接把窗口自身作为分组 key 放在GROUP BY 之后的所以也叫“分组窗口聚合”而 1.13 版本开始使用了“窗口表值函数”Windowing TVF窗口本身返回的是就是一个表所以窗口会出现在 FROM 后面GROUP BY 后面的则是窗口新增的字段 window_start 和 window_end。
比如用窗口 TVF 重新实现一下
Table result tableEnv.sqlQuery(SELECT user, window_end AS endT, COUNT(url) AS cnt FROM TABLE( TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL 1 HOUR)) GROUP BY user, window_start, window_end
);这里我们以 ts 作为时间属性字段、基于 EventTable 定义了 1 小时的滚动窗口希望统计出每小时每个用户点击 url 的次数。用来分组的字段是用户名 user以及表示窗口的window_start 和 window_end而 TUMBLE()是表值函数所以得到的是一个表Table我们的聚合查询就是在这个 Table 中进行的。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源并分配时间戳、生成水位线SingleOutputStreamOperatorEvent eventStream env.fromElements(new Event(Alice, ./home, 1000L),new Event(Bob, ./cart, 1000L),new Event(Alice, ./prod?id1, 25 * 60 * 1000L),new Event(Alice, ./prod?id4, 55 * 60 * 1000L),new Event(Bob, ./prod?id5, 3600 * 1000L 60 * 1000L),new Event(Cary, ./home, 3600 * 1000L 30 * 60 * 1000L),new Event(Cary, ./prod?id7, 3600 * 1000L 59 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.EventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 创建表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 将数据流转换成表并指定时间属性Table eventTable tableEnv.fromDataStream(eventStream,$(user),$(url),$(timestamp).rowtime().as(ts));// 为方便在SQL中引用在环境中注册表EventTabletableEnv.createTemporaryView(EventTable, eventTable);// 设置累积窗口执行SQL统计查询Table result tableEnv.sqlQuery(SELECT user, window_end AS endT, COUNT(url) AS cnt FROM TABLE( CUMULATE( TABLE EventTable, // 定义累积窗口DESCRIPTOR(ts), INTERVAL 30 MINUTE, INTERVAL 1 HOUR)) GROUP BY user, window_start, window_end );tableEnv.toDataStream(result).print();env.execute();
}这里我们使用了统计周期为 1 小时、累积间隔为 30 分钟的累积窗口。执行结果
I[Alice, 1970-01-01T00:30, 2]
I[Bob, 1970-01-01T00:30, 1]
I[Alice, 1970-01-01T01:00, 3]
I[Bob, 1970-01-01T01:00, 1]
I[Bob, 1970-01-01T01:30, 1]
I[Cary, 1970-01-01T02:00, 2]
I[Bob, 1970-01-01T02:00, 1]与分组聚合不同窗口聚合不会将中间聚合的状态输出只会最后输出一个结果。我们可以看到所有数据都是以 INSERT 操作追加到结果动态表中的因此输出每行前面都有I 的前缀。所以窗口聚合查询都属于追加查询没有更新操作代码中可以直接用 toDataStream()将结果表转换成流。
开窗Over聚合
在标准 SQL 中还有另外一类比较特殊的聚合方式可以针对每一行计算一个聚合值。比如说我们可以以每一行数据为基准计算它之前 1 小时内所有数据的平均值也可以计算它之前 10 个数的平均值。就好像是在每一行上打开了一扇窗户、收集数据进行统计一样这就是所谓的“开窗函数”。
开窗函数的聚合与之前两种聚合有本质的不同分组聚合、窗口 TVF聚合都是“多对一”的关系将数据分组之后每组只会得到一个聚合结果而开窗函数是对每行都要做一次开窗聚合因此聚合之后表中的行数不会有任何减少是一个“多对多”的关系。与标准 SQL 中一致Flink SQL 中的开窗函数也是通过 OVER 子句来实现的所以有时开窗聚合也叫作“OVER 聚合”Over Aggregation。基本语法如下
SELECT聚合函数 OVER ([PARTITION BY 字段 1[, 字段 2, ...]]ORDER BY 时间属性字段开窗范围),...
FROM ...这里 OVER 关键字前面是一个聚合函数它会应用在后面 OVER 定义的窗口上。在 OVER子句中主要有以下几个部分 PARTITION BY可选用来指定分区的键key类似于 GROUP BY 的分组这部分是可选的 ORDER BYOVER 窗口是基于当前行扩展出的一段数据范围选择的标准可以基于时间也可以基于数量。不论那种定义数据都应该是以某种顺序排列好的而表中的数据本身是无序的。所以在OVER 子句中必须用 ORDER BY 明确地指出数据基于那个字段排序。在 Flink 的流处理中目前只支持按照时间属性的升序排列所以这里 ORDER BY 后面的字段必须是定义好的时间属性。 开窗范围对于开窗函数而言还有一个必须要指定的就是开窗的范围也就是到底要扩展多少行来做聚合。这个范围是由 BETWEEN 下界 AND 上界 来定义的也就是“从下界到上界”的范围。目前支持的上界只能是 CURRENT ROW也就是定义一个“从之前某一行到当前行”的范围所以一般的形式为 BETWEEN ... PRECEDING AND CURRENT ROW开窗选择的范围可以基于时间也可以基于数据的数量。所以开窗范围还应该在两种模式之间做出选择范围间隔RANGE intervals和行间隔ROW intervals。 范围间隔范围间隔以 RANGE 为前缀就是基于 ORDER BY 指定的时间字段去选取一个范围一般就是当前行时间戳之前的一段时间。例如开窗范围选择当前行之前 1 小时的数据 RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW行间隔行间隔以 ROWS 为前缀就是直接确定要选多少行由当前行出发向前选取就可以了。例如开窗范围选择当前行之前的 5 行数据最终聚合会包括当前行所以一共 6 条数据 ROWS BETWEEN 5 PRECEDING AND CURRENT ROW具体例子
SELECT user, ts,COUNT(url) OVER (PARTITION BY userORDER BY tsRANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW) AS cnt
FROM EventTable这里我们以 ts 作为时间属性字段对 EventTable 中的每行数据都选取它之前 1 小时的所有数据进行聚合统计每个用户访问 url 的总次数并重命名为 cnt。最终将表中每行的 userts 以及扩展出 cnt 提取出来。
由于聚合范围上界只能到当前行新到的数据一般不会影响之前数据的聚合结果所以结果表只需要不断插入INSERT就可以了。执行上面 SQL 得到的结果表可以用 toDataStream()直接转换成流打印输出。
应用实例—Top N
1普通 Top N
在 Flink SQL 中是通过 OVER 聚合和一个条件筛选来实现 Top N 的。具体来说是通过将一个特殊的聚合函数**ROW_NUMBER()**应用到OVER窗口上统计出每一行排序后的行号作为一个字段提取出来然后再用 WHERE 子句筛选行号小于等于 N 的那些行返回。
SELECT user, url, ts, row_num
FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY userORDER BY CHAR_LENGTH(url) desc ) AS row_numFROM EventTable)
WHERE row_num 22窗口 Top N
除了直接对数据进行 Top N 的选取我们也可以针对窗口来做 Top N。
例如电商行业实际应用中往往有这样的需求统计一段时间内的热门商品。这就需要先开窗口在窗口中统计每个商品的点击量然后将统计数据收集起来按窗口进行分组并按点击量大小降序排序选取前 N 个作为结果返回。
具体来说可以先做一个窗口聚合将窗口信息 window_start、window_end 连同每个商品的点击量一并返回这样就得到了聚合的结果表包含了窗口信息、商品和统计的点击量。接下来就可以像一般的 Top N 那样定义 OVER 窗口了按窗口分组按点击量排序用ROW_NUMBER()统计行号并筛选前 N 行就可以得到结果。所以窗口 Top N 的实现就是窗口聚合与 OVER 聚合的结合使用。
下面是一个具体案例的代码实现。由于用户访问事件 Event 中没有商品相关信息因此我们统计的是每小时内有最多访问行为的用户取前两名相当于是一个每小时活跃用户的查询。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 读取数据源并分配时间戳、生成水位线SingleOutputStreamOperatorEvent eventStream env.fromElements(new Event(Alice, ./home, 1000L),new Event(Bob, ./cart, 1000L),new Event(Alice, ./prod?id1, 25 * 60 * 1000L),new Event(Alice, ./prod?id4, 55 * 60 * 1000L),new Event(Bob, ./prod?id5, 3600 * 1000L 60 * 1000L),new Event(Cary, ./home, 3600 * 1000L 30 * 60 * 1000L),new Event(Cary, ./prod?id7, 3600 * 1000L 59 * 60 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.EventforMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 创建表环境StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 将数据流转换成表并指定时间属性Table eventTable tableEnv.fromDataStream(eventStream,$(user),$(url),$(timestamp).rowtime().as(ts)// 将timestamp指定为事件时间并命名为ts);// 为方便在SQL中引用在环境中注册表EventTabletableEnv.createTemporaryView(EventTable, eventTable);// 定义子查询进行窗口聚合得到包含窗口信息、用户以及访问次数的结果表String subQuery SELECT window_start, window_end, user, COUNT(url) as cnt FROM TABLE ( TUMBLE( TABLE EventTable, DESCRIPTOR(ts), INTERVAL 1 HOUR )) GROUP BY window_start, window_end, user ;// 定义Top N的外层查询String topNQuery SELECT * FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY window_start, window_end ORDER BY cnt desc ) AS row_num FROM ( subQuery )) WHERE row_num 2;// 执行SQL得到结果表Table result tableEnv.sqlQuery(topNQuery);tableEnv.toDataStream(result).print();env.execute();
}这里为了更好的代码可读性我们将 SQL 拆分成了用来做窗口聚合的内部子查询和套用 Top N 模板的外层查询。
首先基于 ts 时间字段定义 1 小时滚动窗口统计 EventTable 中每个用户的访问次数重命名为 cnt为了方便后面做排序我们将窗口信息 window_start 和 window_end 也提取出来与 user 和 cnt 一起作为聚合结果表中的字段。然后套用 Top N 模板对窗口聚合的结果表中每一行数据进行 OVER 聚合统计行号。这里以窗口信息进行分组按访问次数 cnt 进行排序并筛选行号小于等于 2 的数据就可以得到每个窗口内访问次数最多的前两个用户了。
运行结果如下
I[1970-01-01T00:00, 1970-01-01T01:00, Alice, 3, 1]
I[1970-01-01T00:00, 1970-01-01T01:00, Bob, 1, 2]
I[1970-01-01T01:00, 1970-01-01T02:00, Cary, 2, 1]
I[1970-01-01T01:00, 1970-01-01T02:00, Bob, 1, 2]由于窗口的统计结果只会最终输出一次所以排名也是确定的这里结果表中只有插入INSERT操作。也就是说窗口 Top N 是追加查询可以直接用 toDataStream()将结果表转换成流打印输出。
联结Join查询
按照数据库理论关系型表的设计往往至少需要满足第三范式3NF表中的列都直接依赖于主键这样就可以避免数据冗余和更新异常。不过这样一来我们就无法从一个单独的表中提取所有想要的数据了。在标准 SQL 中可以将多个表连接合并起来从中查询出想要的信息这种操作就是表的联结Join。
在流处理中动态表的 Join 对应着两条数据流的 Join 操作。与上一节的聚合查询类似Flink SQL 中的联结查询大体上也可以分为两类SQL 原生的联结查询方式和流处理中特有的联结查询。
常规联结查询
常规联结Regular Join是 SQL 中原生定义的 Join 方式是最通用的一类联结操作。它的具体语法与标准 SQL 的联结完全相同通过关键字 JOIN 来联结两个表后面用关键字 ON来指明联结条件。
在两个动态表的联结中任何一侧表的插入INSERT或更改UPDATE操作都会让联结的结果表发生改变。例如如果左侧有新数据到来那么它会与右侧表中所有之前的数据进行联结合并右侧表之后到来的新数据也会与这条数据连接合并。所以常规联结查询一般是更新Update查询。
与标准 SQL 一致Flink SQL 的常规联结也可以分为内联结INNER JOIN和外联结OUTER JOIN区别在于结果中是否包含不符合联结条件的行。目前**仅支持“等值条件”**作为联结条件也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。
1等值内联结INNER Equi-JOIN
内联结用 INNER JOIN 来定义会返回两表中符合联接条件的所有行的组合也就是所谓的笛卡尔积Cartesian product。目前仅支持等值联结条件。
SELECT * FROM Order INNER JOIN Product ON Order.product_id Product.id这里是一个内联结联结条件是订单数据的 product_id 和商品数据的 id 相等。
2等值外联结OUTER Equi-JOIN
与内联结类似外联结也会返回符合联结条件的所有行的笛卡尔积另外还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL 支持左外LEFT JOIN、右外RIGHT JOIN和全外FULL OUTER JOIN分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。例如订单表中未必包含了商品表中的所有 ID为了将哪些没有任何订单的商品信息也查询出来我们就可以使用右外联结RIGHT JOIN。当然外联结查询目前也仅支持等值联结条件。具体用法如下
SELECT * FROM Order LEFT JOIN Product ON Order.product_id Product.id
SELECT * FROM Order RIGHT JOIN Product ON Order.product_id Product.id
SELECT * FROM Order FULL OUTER JOIN Product ON Order.product_id Product.id间隔联结查询
我们曾经学习过 DataStream API 中的双流 Join包括窗口联结window join和间隔联结interval join。两条流的 Join 就对应着 SQL 中两个表的 Join这是流处理中特有的联结方式。目前 Flink SQL 还不支持窗口联结而间隔联结则已经实现。
间隔联结Interval Join返回的同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外还多了一个时间间隔的限制。具体语法有以下要点 间隔联结不需要用 JOIN 关键字直接在 FROM 后将要联结的两表列出来就可以用逗号分隔。这与标准 SQL 中的语法一致表示一个“交叉联结”Cross Join会返回两表中所有行的笛卡尔积。 联结条件用 WHERE 子句来定义用一个等值表达式描述。交叉联结之后再用 WHERE进行条件筛选效果跟内联结 INNER JOIN … ON …非常类似。 时间间隔限制我们可以在 WHERE 子句中联结条件后用 AND 追加一个时间间隔的限制条件做法是提取左右两侧表中的时间字段然后用一个表达式来指明两者需要满足的间隔限制。 具体定义方式有下面三种这里分别用 ltime 和 rtime 表示左右表中的时间字段 ltime rtimeltime rtime AND ltime rtime INTERVAL ‘10’ MINUTEltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime INTERVAL ‘5’ SECOND
例如
SELECT *
FROM Order o, Shipment s
WHERE o.id s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL 4 HOUR AND s.ship_time函数
在 SQL 中我们可以把一些数据的转换操作包装起来嵌入到 SQL 查询中统一调用这就是“函数”functions。
Flink 的 Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同 Table API 中的函数是通过数据对象的方法调用来实现的 str.upperCase();而 SQL 则是直接引用函数名称传入数据作为参数 UPPER(str)由于 Table API 是内嵌在 Java 语言中的很多方法需要在类中额外添加因此扩展功能比较麻烦目前支持的函数比较少而且 Table API 也不如 SQL 的通用性强所以一般情况下较少使用。
Flink SQL 中的函数可以分为两类
一类是 SQL 中内置的系统函数直接通过函数名调用就可以能够实现一些常用的转换操作比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等而另一类函数则是用户自定义的函数UDF需要在表环境中注册才能使用。
系统函数
系统函数System Functions也叫内置函数Built-in Functions是在系统中预先实现好的功能模块。
Flink SQL 中的系统函数又主要可以分为两大类标量函数Scalar Functions和聚合函数Aggregate Functions。
1标量函数Scalar Functions
所谓的“标量”是指只有数值大小、没有方向的量所以标量函数指的就是只对输入数据做转换操作、返回一个值的函数。这里的输入数据对应在表中一般就是一行数据中 1 个或多个字段因此这种操作有点像流处理转换算子中的 map。另外对于一些没有输入参数、直接可以得到唯一结果的函数也属于标量函数。
标量函数是最常见、也最简单的一类系统函数数量非常庞大很多在标准 SQL 中也有定义。所以我们这里只对一些常见类型列举部分函数做一个简单概述。 比较函数Comparison Functions比较函数其实就是一个比较表达式用来判断两个值之间的关系返回一个布尔类型的值。这个比较表达式可以是用 、、 等符号连接两个值也可以是用关键字定义的某种判断。例如 value1 value2 判断两个值相等value1 value2 判断两个值不相等value IS NOT NULL 判断 value 不为空。 逻辑函数Logical Functions逻辑函数就是一个逻辑表达式也就是用与AND、或OR、非NOT将布尔类型的值连接起来也可以用判断语句IS、IS NOT进行真值判断返回的还是一个布尔类型的值。例如 boolean1 OR boolean2 布尔值 boolean1 与布尔值 boolean2 取逻辑或boolean IS FALSE 判断布尔值 boolean 是否为 falseNOT boolean 布尔值 boolean 取逻辑非 算术函数Arithmetic Functions进行算术计算的函数包括用算术符号连接的运算和复杂的数学运算。例如 numeric1 numeric2 两数相加POWER(numeric1, numeric2) 幂运算取数 numeric1 的 numeric2 次方RAND() 返回0.0, 1.0区间内的一个 double 类型的伪随机数 字符串函数String Functions进行字符串处理的函数。例如 string1 || string2 两个字符串的连接UPPER(string) 将字符串 string 转为全部大写CHAR_LENGTH(string) 计算字符串 string 的长度 时间函数Temporal Functions进行与时间相关操作的函数。例如 DATE string 按格式yyyy-MM-dd解析字符串 string返回类型为 SQL DateTIMESTAMP string 按格式yyyy-MM-dd HH:mm:ss[.SSS]解析返回类型为 SQL timestampCURRENT_TIME 返回本地时区的当前时间类型为 SQL time与 LOCALTIME等价INTERVAL string range 返回一个时间间隔。string 表示数值range 可以是 DAYMINUTEDAT TO HOUR 等单位也可以是 YEAR TO MONTH 这样的复合单位。如“2 年 10 个月”可以写成INTERVAL ‘2-10’ YEAR TO MONTH
2 聚合函数Aggregate Functions
聚合函数是以表中多个行作为输入提取字段进行聚合操作的函数会将唯一的聚合值作为结果返回。聚合函数应用非常广泛不论分组聚合、窗口聚合还是开窗Over聚合对数据的聚合操作都可以用相同的函数来定义。
标准 SQL 中常见的聚合函数 Flink SQL 都是支持的目前也在不断扩展为流处理应用提供更强大的功能。例如
COUNT(*) 返回所有行的数量统计个数SUM([ ALL | DISTINCT ] expression) 对某个字段进行求和操作。默认情况下省略了关键字 ALL表示对所有行求和如果指定 DISTINCT则会对数据进行去重每个值只叠加一次。RANK() 返回当前值在一组值中的排名ROW_NUMBER() 对一组值排序后返回当前值的行号。与 RANK()的功能相似
自定义函数UDF
系统函数尽管庞大也不可能涵盖所有的功能如果有系统函数不支持的需求我们就需要用自定义函数User Defined FunctionsUDF来实现了。
Flink 的 Table API 和 SQL 提供了多种自定义函数的接口以抽象类的形式定义。当前 UDF主要有以下几类
标量函数Scalar Functions将输入的标量值转换成一个新的标量值表函数Table Functions将标量值转换成一个或多个新的行数据也就是扩展成一个表聚合函数Aggregate Functions将多行数据里的标量值转换成一个新的标量值表聚合函数Table Aggregate Functions将多行数据里的标量值转换成一个或多个新的行数据。
1整体调用流程
要想在代码中使用自定义的函数我们需要首先自定义对应 UDF 抽象类的实现并在表环境中注册这个函数然后就可以在 Table API 和 SQL 中调用了。 注册函数 注册函数时需要调用表环境的 createTemporarySystemFunction()方法传入注册的函数名以及 UDF 类的 Class 对象 // 注册函数
tableEnv.createTemporarySystemFunction(MyFunction, MyFunction.class);这里 createTemporarySystemFunction()方法的意思是创建了一个“临时系统函数”所以MyFunction 函 数 名 是 全 局 的 可 以 当 作 系 统 函 数 来 使 用 我 们 也 可 以 用createTemporaryFunction()方法注册的函数就依赖于当前的数据库database和目录catalog了所以这就不是系统函数而是“目录函数”catalog function它的完整名称应该包括所属的 database 和catalog。 一般情况下我们直接用 createTemporarySystemFunction()方法将 UDF 注册为系统函数就可以了。 使用 Table API 调用函数 在 Table API 中需要使用 call()方法来调用自定义函数 tableEnv.from(MyTable).select(call(MyFunction, $(myField)));这里 call()方法有两个参数一个是注册好的函数名 MyFunction另一个则是函数调用时本身的参数。这里我们定义 MyFunction 在调用时需要传入的参数是 myField 字段。 此外在 Table API 中也可以不注册函数直接用“内联”inline的方式调用 UDF tableEnv.from(MyTable).select(call(SubstringFunction.class, $(myField)));区别只是在于 call()方法第一个参数不再是注册好的函数名而直接就是函数类的 Class对象了。 在 SQL 中调用函数 当我们将函数注册为系统函数之后在 SQL 中的调用就与内置系统函数完全一样了 tableEnv.sqlQuery(SELECT MyFunction(myField) FROM MyTable);2标量函数Scalar Functions
自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值它对应的输入是一行数据中的字段输出则是唯一的值。所以从输入和输出表中行数据的对应关系看标量函数是“一对一”的转换。
想要实现自定义的标量函数我们需要自定义一个类来继承抽象类 ScalarFunction并实现叫作 eval() 的求值方法。标量函数的行为就取决于求值方法的定义它必须是公有的public而且名字必须是 eval。求值方法 eval 可以重载多次任何数据类型都可作为求值方法的参数和返回值类型。
这里需要特别说明的是ScalarFunction 抽象类中并没有定义 eval()方法所以我们不能直接在代码中重写override ScalarFunction 以及其它所有的 UDF 接口都在 org.apache.flink.table.functions 中。 下面我们来看一个具体的例子。我们实现一个自定义的哈希hash函数 HashFunction用来求传入对象的哈希值。
public static class HashFunction extends ScalarFunction {// 接受任意类型输入返回 INT 型输出public int eval(DataTypeHint(inputGroup InputGroup.ANY) Object o) {return o.hashCode();}
}
// 注册函数
tableEnv.createTemporarySystemFunction(HashFunction, HashFunction.class);
// 在 SQL 里调用注册好的函数
tableEnv.sqlQuery(SELECT HashFunction(myField) FROM MyTable);3表函数Table Functions
跟标量函数一样表函数的输入参数也可以是 0 个、1 个或多个标量值不同的是它可以返回任意多行数据。“多行数据”事实上就构成了一个表所以“表函数”可以认为就是返回一个表的函数这是一个“一对多”的转换关系。之前我们介绍过的窗口 TVF本质上就是表函数。
类似地要实现自定义的表函数需要自定义类来继承抽象类 TableFunction内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是TableFunction 类本身是有一个泛型参数T 的这就是表函数返回数据的类型而 eval()方法没有返回类型内部也没有 return语句是通过调用 collect()方法来发送想要输出的行数据的。多么熟悉的感觉——回忆一下DataStream API 中的 FlatMapFunction 和 ProcessFunction它们的 flatMap 和 processElement 方法也没有返回值也是通过 out.collect()来向下游发送数据的。
我们使用表函数可以对一行数据得到一个表这和 Hive 中的 UDTF 非常相似。那对于原先输入的整张表来说又该得到什么呢一个简单的想法是就让输入表中的每一行与它转换得到的表进行联结join然后再拼成一个完整的大表这就相当于对原来的表进行了扩展。在 Hive 的 SQL 语法中提供了“侧向视图”lateral view也叫横向视图的功能可以将表中的一行数据拆分成多行Flink SQL 也有类似的功能是用 LATERAL TABLE 语法来实现的。
在 SQL 中调用表函数需要使用 **LATERAL TABLE(TableFunction)**来生成扩展的“侧向表”然后与原始表进行联结Join。这里的 Join 操作可以是直接做交叉联结cross join在 FROM 后用逗号分隔两个表就可以也可以是以 ON TRUE 为条件的左联结LEFT JOIN。
下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数 SplitFunction可以将一个字符串转换成字符串长度的二元组。
// 注意这里的类型标注输出是 Row 类型Row 中包含两个字段word 和 length。
FunctionHint(output DataTypeHint(ROWword STRING, length INT))
public static class SplitFunction extends TableFunctionRow {public void eval(String str) {for (String s : str.split( )) {// 使用 collect()方法发送一行数据collect(Row.of(s, s.length()));}} }
// 注册函数
tableEnv.createTemporarySystemFunction(SplitFunction, SplitFunction.class);
// 在 SQL 里调用注册好的函数
// 1. 交叉联结
tableEnv.sqlQuery(SELECT myField, word, length FROM MyTable, LATERAL TABLE(SplitFunction(myField)));
// 2. 带 ON TRUE 条件的左联结
tableEnv.sqlQuery(SELECT myField, word, length FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE);
// 重命名侧向表中的字段
tableEnv.sqlQuery(SELECT myField, newWord, newLength FROM MyTable LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE);4 聚合函数Aggregate Functions
用户自定义聚合函数User Defined AGGregate functionUDAGG会把一行或多行数据也就是一个表聚合成一个标量值。这是一个标准的“多对一”的转换。
自定义聚合函数需要继承抽象类 AggregateFunction。AggregateFunction 有两个泛型参数T, ACCT 表示聚合输出的结果类型ACC 则表示聚合的中间状态类型。
Flink SQL 中的聚合函数的工作原理如下
首先它需要创建一个累加器accumulator用来存储聚合的中间结果。这与DataStream API 中的 AggregateFunction 非常类似累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器对于输入的每一行数据都会调用 accumulate()方法来更新累加器这是聚合的核心过程当所有的数据都处理完之后通过调用 getValue()方法来计算并返回最终的结果。
所以每个 AggregateFunction 都必须实现以下几个方法
createAccumulator()这是创建累加器的方法。没有输入参数返回类型为累加器类型 ACC。accumulate()这是进行聚合计算的核心方法每来一行数据都会调用。它的第一个参数是确定的就是当前的累加器类型为 ACC表示当前聚合的中间状态后面的参数则是聚合函数调用时传入的参数可以有多个类型也可以不同。这个方法主要是更新聚合状态所以没有返回类型。需要注意的是accumulate()与之前的求值方法 eval()类似也是底层架构要求的必须为 public方法名必须为 accumulate且无法直接 override、只能手动实现。getValue()这是得到最终返回结果的方法。输入参数是 ACC 类型的累加器输出类型为 T。在遇到复杂类型时Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明这是通过 getAccumulatorType()和getResultType()两个方法来指定的。
除了上面的方法还有几个方法是可选的。这些方法有些可以让查询更加高效有些是在某些特定场景下必须要实现的。比如
如果是对会话窗口进行聚合**merge()**方法就是必须要实现的它会定义累加器的合并操作而且这个方法对一些场景的优化也很有用而如果聚合函数用在 OVER 窗口聚合中就必须实现 **retract()**方法保证数据可以进行撤回操作**resetAccumulator()**方法则是重置累加器这在一些批处理场景中会比较有用。
AggregateFunction 的所有方法都必须是 公有的public不能是静态的static而且名字必须跟上面写的完全一样。 createAccumulator 、 getValue 、 getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的可以 override而其他则都是底层架构约定的方法。
比如我们要从学生的分数表 ScoreTable 中计算每个学生的加权平均分。为了计算加权平均值应该从输入的每行数据中提取两个值作为参数要计算的分数值 score以及它的权重weight。而在聚合过程中累加器accumulator需要存储当前的加权总和 sum以及目前数据的个数 count。这可以用一个二元组来表示也可以单独定义一个类 WeightedAvgAccum里面包含 sum 和 count 两个属性用它的对象实例来作为聚合的累加器。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 1. 自定义数据源从流转换SingleOutputStreamOperatorEvent eventStream env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 2. 将流转换成表Table eventTable tableEnv.fromDataStream(eventStream,$(user),$(url),$(timestamp).as(ts),$(rt).rowtime());tableEnv.createTemporaryView(EventTable, eventTable);// 3. 注册自定义表函数tableEnv.createTemporarySystemFunction(WeightedAverage, WeightedAverage.class);// 4. 调用UDF查询转换这里权重直接给1Table resultTable tableEnv.sqlQuery(select user, WeightedAverage(ts, 1) as weighted_avg from EventTable group by user);// 5. 输出到控制台tableEnv.executeSql(create table output ( uname STRING, weighted_avg BIGINT) WITH ( connector print));resultTable.executeInsert(output);
}// 单独定义一个累加器类型
public static class WeightedAvgAccumulator {public long sum 0; // 加权和public int count 0; // 数据个数
}// 自定义一个AggregateFunction求加权平均值
public static class WeightedAverage extends AggregateFunctionLong, WeightedAvgAccumulator{Overridepublic Long getValue(WeightedAvgAccumulator accumulator) {if (accumulator.count 0)return null; // 防止除数为0elsereturn accumulator.sum / accumulator.count;}Overridepublic WeightedAvgAccumulator createAccumulator() {return new WeightedAvgAccumulator();}// 累加计算方法类似于addpublic void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight){accumulator.sum iValue * iWeight; // 这个值要算iWeight次accumulator.count iWeight;}}5表聚合函数Table Aggregate Functions
用户自定义表聚合函数UDTAGG可以把一行或多行数据也就是一个表聚合成另一张表结果表中可以有多行多列。很明显这就像表函数和聚合函数的结合体是一个“多对多”的转换。
自定义表聚合函数需要继承抽象类 TableAggregateFunction。TableAggregateFunction 的结构和原理与 AggregateFunction 非常类似同样有两个泛型参数T, ACC用一个 ACC 类型的累加器accumulator来存储聚合的中间结果。聚合函数中必须实现的三个方法在TableAggregateFunction 中也必须对应实现
createAccumulator()创建累加器的方法与 AggregateFunction 中用法相同。accumulate()聚合计算的核心方法与 AggregateFunction 中用法相同。emitValue()所有输入行处理完成后输出最终计算结果的方法。这个方法对应着 AggregateFunction中的 getValue()方法区别在于 emitValue 没有输出类型而输入参数有两个第一个是 ACC类型的累加器第二个则是用于输出数据的“收集器”out它的类型为 CollectT。所以很明显表聚合函数输出数据不是直接 return而是调用 out.collect()方法调用多次就可以输出多行数据了这一点与表函数非常相似。另外emitValue()在抽象类中也没有定义无法 override必须手动实现。
表聚合函数得到的是一张表在流处理中做持续查询应该每次都会把这个表重新计算输出。如果输入一条数据后只是对结果表里一行或几行进行了更新Update这时我们重新计算整个表、全部输出显然就不够高效了。为了提高处理效率TableAggregateFunction 还提供了一个 emitUpdateWithRetract() 方法它可以在结果表发生变化时以“撤回”retract老数据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue()和 emitUpdateWithRetract() 两个方法在进行更新操作时会优先调用 emitUpdateWithRetract()。
表聚合函数相对比较复杂它的一个典型应用场景就是 Top N 查询。比如我们希望选出一组数据排序后的前两名这就是最简单的 TOP-2 查询。没有线程的系统函数那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值每当来一条新数据就在 accumulate()方法中进行比较更新最终在 emitValue()中调用两次out.collect()将前两名数据输出。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);// 1. 自定义数据源从流转换SingleOutputStreamOperatorEvent eventStream env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.EventforBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));// 2. 将流转换成表Table eventTable tableEnv.fromDataStream(eventStream,$(user),$(url),$(timestamp).as(ts),$(rt).rowtime());tableEnv.createTemporaryView(EventTable, eventTable);// 3. 开滚动窗口聚合得到每个用户在每个窗口中的浏览量Table windowAggTable tableEnv.sqlQuery(select user, count(url) as cnt, window_end from TABLE( TUMBLE( TABLE EventTable, DESCRIPTOR(rt), INTERVAL 10 SECOND ) ) group by user, window_start, window_end);tableEnv.createTemporaryView(AggTable, windowAggTable);// 4. 注册表聚合函数函数tableEnv.createTemporarySystemFunction(Top2, Top2.class);// 5. 在Table API中调用函数Table resultTable tableEnv.from(AggTable).groupBy($(window_end)).flatAggregate(call(Top2, $(cnt)).as(value, rank)).select($(window_end), $(value), $(rank));// 6. 输出到控制台tableEnv.toChangelogStream(resultTable).print();env.execute();
}
// 聚合累加器的类型定义包含最大的第一和第二两个数据
public static class Top2Accumulator {public Long first;public Long second;
}// 自定义表聚合函数查询一组数中最大的两个返回值为(数值排名)的二元组
public static class Top2 extends TableAggregateFunctionTuple2Long, Integer, Top2Accumulator {Overridepublic Top2Accumulator createAccumulator() {Top2Accumulator acc new Top2Accumulator();acc.first Long.MIN_VALUE; // 为方便比较初始值给最小值acc.second Long.MIN_VALUE;return acc;}// 每来一个数据调用一次判断是否更新累加器public void accumulate(Top2Accumulator acc, Long value) {if (value acc.first) {acc.second acc.first;acc.first value;} else if (value acc.second) {acc.second value;}}// 输出(数值排名)的二元组输出两行数据public void emitValue(Top2Accumulator acc, CollectorTuple2Long, Integer out) {if (acc.first ! Long.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second ! Long.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}}
}目前 SQL 中没有直接使用表聚合函数的方式所以需要使用 Table API 的方式来调用
// 注册表聚合函数函数
tableEnv.createTemporarySystemFunction(Top2, Top2.class);
// 在 Table API 中调用函数
tableEnv.from(MyTable).groupBy($(myField)).flatAggregate(call(Top2, $(value)).as(value, rank)).select($(myField), $(value), $(rank));SQL 客户端
有了 Table API 和 SQL我们就可以使用熟悉的 SQL 来编写查询语句进行流处理了。不过这种方式还是将 SQL 语句嵌入到 Java/Scala 代码中进行的另外写完的代码后想要提交作业还需要使用工具进行打包。这都给 Flink 的使用设置了门槛如果不是 Java/Scala 程序员即使是非常熟悉 SQL 的工程师恐怕也会望而生畏了。
Flink 为我们提供了一个工具来进行 Flink 程序的编写、测试和提交这工具叫作“SQL 客户端”。SQL 客户端提供了一个命令行交互界面CLI我们可以在里面非常容易地编写 SQL 进行查询就像使用 MySQL 一样整个 Flink 应用编写、提交的过程全变成了写 SQL不需要写一行 Java/Scala 代码。
具体使用流程如下
1首先启动本地集群
./bin/start-cluster.sh2启动 Flink SQL 客户端
./bin/sql-client.shSQL 客户端的启动脚本同样位于 Flink 的 bin 目录下。默认的启动模式是 embedded也就是说客户端是一个嵌入在本地的进程这是目前唯一支持的模式。未来会支持连接到远程 SQL客户端的模式。
3设置运行模式
启动客户端后就进入了命令行界面这时就可以开始写 SQL 了。一般我们会在开始之前对环境做一些设置比较重要的就是运行模式。
首先是表环境的运行时模式有流处理和批处理两个选项。默认为流处理
Flink SQL SET execution.runtime-mode streaming;其次是 SQL 客户端的“执行结果模式”主要有 table、changelog、tableau 三种默认为table 模式
Flink SQL SET sql-client.execution.result-mode table;table 模式就是最普通的表处理模式结果会以逗号分隔每个字段changelog 则是更新日志模式会在数据前加上“”表示插入或“-”表示撤回的前缀而 tableau 则是经典的可视化表模式结果会是一个虚线框的表格。
此外我们还可以做一些其它可选的设置比如之前提到的空闲状态生存时间TTL
Flink SQL SET table.exec.state.ttl 1000;除了在命令行进行设置我们也可以直接在 SQL 客户端的配置文件 sql-cli-defaults.yaml中进行各种配置甚至还可以在这个 yaml 文件里预定义表、函数和 catalog。
4执行 SQL 查询
接下来就可以愉快的编写 SQL 语句了这跟操作 MySQL、Oracle 等关系型数据库没什么区别。
我们可以尝试把一开始举的简单聚合例子写一下
Flink SQL CREATE TABLE EventTable( user STRING, url STRING, timestamp BIGINT ) WITH ( connector filesystem, path events.csv, format csv );
Flink SQL CREATE TABLE ResultTable ( user STRING, cnt BIGINT ) WITH ( connector print );
Flink SQL INSERT INTO ResultTable SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user;这里我们直接用 DDL 创建两张表注意需要有 WITH 定义的外部连接。一张表叫作EventTable是从外部文件 events.csv 中读取数据的这是输入数据表另一张叫作 ResultTable连接器为“print”其实就是标准控制台打印当然就是输出表了。所以接下来就可以直接执行 SQL 查询并将查询结果 INSERT 写入结果表中了。
在 SQL 客户端中每定义一个 SQL 查询就会把它作为一个 Flink 作业提交到集群上执行。所以通过这种方式我们可以快速地对流处理程序进行开发测试。
连接到外部系统
在 Table API 和 SQL 编写的 Flink 程序中可以在创建表的时候用 WITH 子句指定连接器connector这样就可以连接到外部系统进行数据交互了。
架构中的 TableSource 负责从外部系统中读取数据并转换成表TableSink 则负责将结果表写入外部系统。在 Flink 1.13 的 API 调用中已经不去区分 TableSource 和 TableSink我们只要建立到外部系统的连接并创建表就可以Flink 自动会从程序的处理逻辑中解析出它们的用途。
Flink 的 Table API 和 SQL 支持了各种不同的连接器。当然最简单的其实就是连接到控制台打印输出
CREATE TABLE ResultTable (user STRING,cnt BIGINT
)WITH (connector print
);Kafka
1引入依赖
想要在 Flink 程序中使用 Kafka 连接器需要引入如下依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency这里我们引入的 Flink 和 Kafka 的连接器与之前 DataStream API 中引入的连接器是一样的。如果想在 SQL 客户端里使用 Kafka 连接器还需要下载对应的 jar 包放到 lib 目录下。
另外Flink 为各种连接器提供了一系列的“表格式”table formats比如 CSV、JSONAvro、Parquet 等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式相当于表的序列化工具。对于 Kafka 而言CSV、JSON、Avro 等主要格式都是支持的根据 Kafka 连接器中配置的格式我们可能需要引入对应的依赖支持。以 CSV 为例
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/version
/dependency由于 SQL 客户端中已经内置了 CSV、JSON 的支持因此使用时无需专门引入而对于没有内置支持的格式比如 Avro则仍然要下载相应的 jar 包。
2创建连接到 Kafka 的表
创建一个连接到 Kafka 表需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为 Kafka并定义必要的配置参数。
CREATE TABLE KafkaTable (user STRING,url STRING,ts TIMESTAMP(3) METADATA FROM timestamp
) WITH (connector kafka,topic events,properties.bootstrap.servers localhost:9092,properties.group.id testGroup,scan.startup.mode earliest-offset,format csv
)在 KafkaTable 的字段中有一个 ts它的声明中用到了METADATA FROM这是表示一个“元数据列”metadata column它是由 Kafka 连接器的元数据“timestamp”生成的。这里的 timestamp 其实就是 Kafka 中数据自带的时间戳我们把它直接作为元数据提取出来转换成一个新的字段 ts。
3Upsert Kafka
正常情况下Kafka 作为保持数据顺序的消息队列读取和写入都应该是流式的数据对应在表中就是仅追加append-only模式。如果我们想要将有更新操作比如分组聚合的结果表写入 Kafka就会因为 Kafka 无法识别撤回retract或更新插入upsert消息而导致异常。
为了解决这个问题Flink 专门增加了一个**“更新插入 Kafka”Upsert Kafka连接器**。这个连接器支持以更新插入UPSERT的方式向 Kafka 的 topic 中读写数据。
具体来说Upsert Kafka 连接器处理的是更新日志changlog流。如果作为 TableSource连接器会将读取到的 topic中的数据key, value解释为对当前 key 的数据值的更新UPDATE也就是查找动态表中 key 对应的一行数据将 value 更新为最新的值因为是 Upsert 操作所以如果没有 key 对应的行那么也会执行插入INSERT操作。另外如果遇到 value 为空null连接器就把这条数据理解为对相应 key 那一行的删除DELETE操作。如果作为 TableSinkUpsert Kafka 连接器会将有更新操作的结果表转换成更新日志changelog流。如果遇到插入INSERT或者更新后UPDATE_AFTER的数据对应的是一个添加add消息那么就直接正常写入 Kafka 主题如果是删除DELETE或者更新前的数据对应是一个撤回retract消息那么就把 value 为空null的数据写入 Kafka。由于 Flink 是根据键key的值对数据进行分区的这样就可以保证同一个 key 上的更新和删除消息都会落到同一个分区中。
下面是一个创建和使用 Upsert Kafka 表的例子
CREATE TABLE pageviews_per_region (user_region STRING,pv BIGINT,uv BIGINT,PRIMARY KEY (user_region) NOT ENFORCED
) WITH (connector upsert-kafka,topic pageviews_per_region,properties.bootstrap.servers ...,key.format avro,value.format avro
);
CREATE TABLE pageviews (user_id BIGINT,page_id BIGINT,viewtime TIMESTAMP,user_region STRING,WATERMARK FOR viewtime AS viewtime - INTERVAL 2 SECOND
) WITH (connector kafka,topic pageviews,properties.bootstrap.servers ...,format json
);
-- 计算 pv、uv 并插入到 upsert-kafka 表中
INSERT INTO pageviews_per_region
SELECTuser_region,COUNT(*),COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;为了将结果表写入 Kafka 的 pageviews_per_region 主题我们定义了一个 Upsert Kafka 表它的字段中需要用PRIMARY KEY来指定主键并且在WITH子句中分别指定key和value的序列化格式。
文件系统
另一类非常常见的外部系统就是文件系统File System了。Flink 提供了文件系统的连接器支持从本地或者分布式的文件系统中读写数据。这个连接器是内置在 Flink 中的所以使用它并不需要额外引入依赖。
CREATE TABLE MyTable (column_name1 INT,column_name2 STRING,...part_name1 INT,part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (connector filesystem, -- 连接器类型path ..., -- 文件路径format ... -- 文件格式
)这里在 WITH 前使用了 PARTITIONED BY 对数据进行了分区操作。文件系统连接器支持对分区文件的访问。
JDBC
关系型数据表本身就是 SQL 最初应用的地方所以我们也会希望能直接向关系型数据库中读写表数据。Flink 提供的 JDBC 连接器可以通过 JDBC 驱动程序driver向任意的关系型数据库读写数据比如 MySQL、PostgreSQL、Derby 等。
作为 TableSink 向数据库写入数据时运行的模式取决于创建表的 DDL 是否定义了主键primary key。如果有主键那么 JDBC 连接器就将以更新插入Upsert模式运行可以向外部数据库发送按照指定键key的更新UPDATE和删除DELETE操作如果没有定义主键那么就将在追加Append模式下运行不支持更新和删除操作。
想要在 Flink 程序中使用 JDBC 连接器需要引入如下依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency此外为了连接到特定的数据库我们还用引入相关的驱动器依赖比如 MySQL
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.38/version
/dependency创建 JDBC 表的方法与前面 Upsert Kafka 大同小异。下面是一个具体示例
-- 创建一张连接到 MySQL 的 表
CREATE TABLE MyTable (id BIGINT,name STRING,age INT,status BOOLEAN,PRIMARY KEY (id) NOT ENFORCED
) WITH (connector jdbc,url jdbc:mysql://localhost:3306/mydatabase,table-name users
);
-- 将另一张表 T 的数据写入到 MyTable 表中
INSERT INTO MyTable SELECT id, name, age, status FROM T;Elasticsearch
Elasticsearch 作为分布式搜索分析引擎在大数据应用中有非常多的场景。Flink 提供的Elasticsearch的SQL连接器只能作为TableSink可以将表数据写入Elasticsearch的索引index。
Elasticsearch 连接器的使用与 JDBC 连接器非常相似写入数据的模式同样是由创建表的 DDL中是否有主键定义决定的。
想要在 Flink 程序中使用 Elasticsearch 连接器需要引入对应的依赖
dependencygroupIdorg.apache.flink/groupId artifactIdflink-connector-elasticsearch6_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency具体的依赖与Elasticsearch 服务器的版本有关对于 6.x 版本引入依赖如上对于 Elasticsearch 7 以上的版本引入的依赖则是
dependencygroupIdorg.apache.flink/groupId artifactIdflink-connector-elasticsearch7_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency创建 Elasticsearch 表的方法与 JDBC 表基本一致。下面是一个具体示例
-- 创建一张连接到 Elasticsearch 的 表
CREATE TABLE MyTable (user_id STRING,user_name STRINGuv BIGINT,pv BIGINT,PRIMARY KEY (user_id) NOT ENFORCED
) WITH (connector elasticsearch-7,hosts http://localhost:9200,index users
);HBase
作为高性能、可伸缩的分布式列存储数据库HBase 在大数据分析中是一个非常重要的工具。Flink 提供的 HBase 连接器支持面向 HBase 集群的读写操作。
在流处理场景下连接器作为 TableSink 向 HBase 写入数据时采用的始终是更新插入Upsert模式。也就是说HBase 要求连接器必须通过定义的主键primary key来发送更新日志changelog。所以在创建表的 DDL 中我们必须要定义行键rowkey字段并将它声明为主键如果没有用 PRIMARY KEY 子句声明主键连接器会默认把 rowkey 作为主键。
想要在 Flink 程序中使用 HBase 连接器需要引入对应的依赖。目前 Flink 只对 HBase 的1.4.x 和 2.2.x 版本提供了连接器支持而引入的依赖也应该与具体的 HBase 版本有关。对于1.4 版本引入依赖如下
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hbase-1.4_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency对于 HBase 2.2 版本引入的依赖则是
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hbase-2.2_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency由于 HBase 并不是关系型数据库因此转换为 Flink SQL 中的表会稍有一些麻烦。在 DDL 创建出的 HBase 表中所有的列族column family都必须声明为 ROW 类型在表中占据一个字段而每个 family 中的列column qualifier则对应着 ROW 里的嵌套字段。我们不需要将 HBase 中所有的 family 和 qualifier 都在 Flink SQL 的表中声明出来只要把那些在查询中用到的声明出来就可以了。
除了所有 ROW 类型的字段对应着 HBase 中的 family表中还应有一个原子类型的字段它就会被识别为 HBase 的 rowkey。在表中这个字段可以任意取名不一定非要叫 rowkey。
-- 创建一张连接到 HBase 的 表
CREATE TABLE MyTable (rowkey INT,family1 ROWq1 INT,family2 ROWq2 STRING, q3 BIGINT,family3 ROWq4 DOUBLE, q5 BOOLEAN, q6 STRING,PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (connector hbase-1.4,table-name mytable,zookeeper.quorum localhost:2181
);
-- 假设表 T 的字段结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
INSERT INTO MyTable
SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;Hive
Apache Hive 作为一个基于 Hadoop 的数据仓库基础框架可以说已经成为了进行海量数据分析的核心组件。Hive 支持类 SQL 的查询语言可以用来方便对数据进行处理和统计分析而且基于 HDFS 的数据存储有非常好的可扩展性是存储分析超大量数据集的唯一选择。Hive的主要缺点在于查询的延迟很高几乎成了离线分析的代言人。而 Flink 的特点就是实时性强所以 Flink SQL 与 Hive 的结合势在必行。
Flink 与 Hive 的集成比较特别。Flink 提供了“Hive 目录”HiveCatalog功能允许使用Hive 的“元存储”Metastore来管理 Flink 的元数据。这带来的好处体现在两个方面
Metastore 可以作为一个持久化的目录因此使用 HiveCatalog 可以跨会话存储 Flink 特定的元数据。这样一来我们在 HiveCatalog 中执行执行创建 Kafka 表或者 ElasticSearch 表就可以把它们的元数据持久化存储在 Hive 的 Metastore 中对于不同的作业会话就不需要重复创建了直接在 SQL 查询中重用就可以。使用 HiveCatalogFlink 可以作为读写 Hive 表的替代分析引擎。这样一来在 Hive中进行批处理会更加高效与此同时也有了连续在 Hive 中读写数据、进行流处理的能力这也使得“实时数仓”real-time data warehouse成为了可能。
HiveCatalog 被设计为“开箱即用”与现有的 Hive 配置完全兼容我们不需要做任何的修改与调整就可以直接使用。注意只有 Blink 的计划器planner提供了 Hive 集成的支持所以需要在使用 Flink SQL时选择Blink planner。下面我们就来看以下与Hive 集成的具体步骤。
1引入依赖
Hive 各版本特性变化比较大所以使用时需要注意版本的兼容性。目前 Flink 支持的 Hive版本包括
Hive 1.x1.0.0~1.2.2Hive 2.x2.0.02.2.02.3.02.3.6Hive 3.x3.0.0~3.1.2
由于 Hive 是基于 Hadoop 的组件因此我们首先需要提供 Hadoop 的相关支持在环境变量中设置 HADOOP_CLASSPATH
export HADOOP_CLASSPATHhadoop classpath在 Flink 程序中可以引入以下依赖
!-- Flink 的 Hive 连接器--
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hive_${scala.binary.version}/artifactIdversion${flink.version}/version
/dependency
!-- Hive 依赖 --
dependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion${hive.version}/version
/dependency建议不要把这些依赖打包到结果 jar 文件中而是在运行时的集群环境中为不同的 Hive版本添加不同的依赖支持。
2连接到 Hive
在 Flink 中连接 Hive是通过在表环境中配置 HiveCatalog 来实现的。需要说明的是配置 HiveCatalog 本身并不需要限定使用哪个 planner不过对 Hive 表的读写操作只有 Blink 的planner 才支持。所以一般我们需要将表环境的 planner 设置为 Blink。下面是代码中配置 Catalog 的示例
EnvironmentSettings settings EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv TableEnvironment.create(settings);
String name myhive;
String defaultDatabase mydatabase;
String hiveConfDir /opt/hive-conf;
// 创建一个 HiveCatalog并在表环境中注册
HiveCatalog hive new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(myhive, hive);
// 使用 HiveCatalog 作为当前会话的 catalog
tableEnv.useCatalog(myhive);当然我们也可以直接启动 SQL 客户端用 CREATE CATALOG 语句直接创建 HiveCatalog
Flink SQL create catalog myhive with (type hive, hive-conf-dir /opt/hive-conf);
[INFO] Execute statement succeed.
Flink SQL use catalog myhive;
[INFO] Execute statement succeed.3设置 SQL 方言
我们知道Hive内部提供了类SQL的查询语言不过语法细节与标准SQL会有一些出入相当于是 SQL 的一种“方言”dialect。为了提高与 Hive 集成时的兼容性Flink SQL 提供了一个非常有趣而强大的功能可以使用方言来编写 SQL 语句。换句话说我们可以直接在 Flink中写 Hive SQL 来操作 Hive 表。
Flink 目前支持两种 SQL 方言的配置default 和 hive。所谓的 default 就是 Flink SQL 默认的 SQL 语法了。我们需要先切换到 hive 方言然后才能使用 Hive SQL 的语法。具体设置可以分为 SQL 和 Table API 两种方式。 SQL 中设置 我们可以通过配置 table.sql-dialect 属性来设置 SQL 方言 set table.sql-dialecthive;当然我们可以在代码中执行上面的 SET 语句也可以直接启动 SQL 客户端来运行。如果使用 SQL 客户端我们还可以在配置文件 sql-cli-defaults.yaml 中通过“configuration”模块来设置 execution:planner: blinktype: batchresult-mode: table
configuration:table.sql-dialect: hiveTable API 中设置 另外一种方式就是在代码中直接使用 Table API 获取表环境的配置项来进行设置 // 配置 hive 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 配置 default 方言
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);4读写 Hive 表
Flink 支持以批处理和流处理模式向 Hive 中读写数据
在批处理模式下Flink 会在执行查询语句时对 Hive 表进行一次性读取在作业完成时将结果数据向 Hive 表进行一次性写入而在流处理模式下Flink 会持续监控 Hive 表在新数据可用时增量读取也可以持续写入新数据并增量式地让它们可见。
更灵活的是我们可以随时切换 SQL 方言从其它数据源例如 Kafka读取数据、经转换后再写入Hive。下面是以纯SQL形式编写的一个示例我们可以启动SQL客户端来运行
-- 设置 SQL 方言为 hive创建 Hive 表
SET table.sql-dialecthive;
CREATE TABLE hive_table (user_id STRING,order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (partition.time-extractor.timestamp-pattern$dt $hr:00:00,sink.partition-commit.triggerpartition-time,sink.partition-commit.delay1 h,sink.partition-commit.policy.kindmetastore,success-file
);
-- 设置 SQL 方言为 default创建 Kafka 表
SET table.sql-dialectdefault;
CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL 5 SECOND – 定义水位线
) WITH (...);
-- 将 Kafka 中读取的数据经转换后写入 Hive
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, yyyy-MM-dd),
DATE_FORMAT(log_ts, HH)
FROM kafka_table;这里我们创建 Hive 表时设置了通过分区时间来触发提交的策略。将 Kafka 中读取的数据经转换后写入 Hive这是一个流处理的 Flink SQL 程序。