海南智能网站建设公司,上海网站建设建站,wap手机网站静态模板,湛江今天发生的重大新闻Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 18、Flink的SQL 支持的操作和语法 19、Flink 的Table API 和 SQL 中的内置函数及示例1
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs介绍、类型、java api和sql实现ddl、java api和sql操作catalog-1 24、Flink 的table api与sql之Catalogsjava api操作数据库、表-2 24、Flink 的table api与sql之Catalogsjava api操作视图-3 24、Flink 的table api与sql之Catalogsjava api操作分区与函数-4
26、Flink 的SQL之概览与入门示例 27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例1 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例2 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例3 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例4 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例5 27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例6 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例7 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE1 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE2 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例 41、Flink之Hive 方言介绍及详细示例 42、Flink 的table api与sql之Hive Catalog 43、Flink之Hive 读写及详细验证示例 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的 文章目录 Flink 系列文章一、函数分类1、分类标准及类别2、函数引用1、精确函数引用2、模糊函数引用 3、函数解析顺序1、精确函数引用2、模糊函数引用 二、系统内置函数1、标量函数1、比较函数2、逻辑函数3、算术函数4、字符串函数5、时间函数6、条件函数7、类型转换函数8、集合函数9、JSON Functions1、IS JSON2、JSON_EXISTS3、JSON_STRING4、JSON_VALUE5、JSON_QUERY6、JSON_OBJECT7、JSON_ARRAY8、JSON_ARRAYAGG10、JSON_OBJECTAGG 10、值构建函数11、值获取函数12、分组函数13、哈希函数 2、聚合函数3、时间间隔单位和时间点单位标识符4、列函数 本文介绍了flink的函数分类、内置函数的说明及示例特别是针对json function函数每个均以可运行的示例进行说明。 本文依赖flink集群能正常使用。 本文分为2个部分即函数分类以及内置函数。 本文的示例均在Flink 1.17版本中运行。
一、函数分类
Flink 允许用户在 Table API 和 SQL 中使用函数进行数据的转换。 Flink 中的函数有两个划分标准。
1、分类标准及类别
一个划分标准是系统内置函数和 Catalog 函数。系统函数没有名称空间只能通过其名称来进行引用。 Catalog 函数属于 Catalog 和数据库因此它们拥有 Catalog 和数据库命名空间。 用户可以通过全/部分限定名catalog.db.func 或 db.func或者函数名 来对 Catalog 函数进行引用。
另一个划分标准是临时函数和持久化函数。 临时函数始终由用户创建它容易改变并且仅在会话的生命周期内有效。 持久化函数不是由系统提供就是存储在 Catalog 中它在会话的整个生命周期内都有效。
这两个划分标准给 Flink 用户提供了 4 种函数
临时性系统函数系统函数临时性 Catalog 函数Catalog 函数 系统函数始终优先于 Catalog 函数解析临时函数始终优先于持久化函数解析 函数解析优先级如下所述。 2、函数引用
用户在 Flink 中可以通过精确、模糊两种引用方式引用函数。
1、精确函数引用
精确函数引用允许用户跨 Catalog跨数据库调用 Catalog 函数。 例如select mycatalog.mydb.myfunc(x) from mytable 和 select mydb.myfunc(x) from mytable。
仅 Flink 1.10 以上版本支持。
2、模糊函数引用
在模糊函数引用中用户只需在 SQL 查询中指定函数名例如 select myfunc(x) from mytable。
3、函数解析顺序
当函数名相同函数类型不同时函数解析顺序才有意义。 例如当有三个都名为 “myfunc” 的临时性 Catalog 函数Catalog 函数和系统函数时 如果没有命名冲突三个函数将会被解析为一个函数。
1、精确函数引用
由于系统函数没有命名空间Flink 中的精确函数引用必须 指向临时性 Catalog 函数或 Catalog 函数。
解析顺序如下
临时性 catalog 函数Catalog 函数
2、模糊函数引用
解析顺序如下
临时性系统函数系统函数临时性 Catalog 函数, 在会话的当前 Catalog 和当前数据库中Catalog 函数, 在会话的当前 Catalog 和当前数据库中
二、系统内置函数
Flink Table API SQL 为用户提供了一组内置的数据转换函数。
1、标量函数
标量函数将零、一个或多个值作为输入并返回单个值作为结果。
1、比较函数 2、逻辑函数 3、算术函数 4、字符串函数 5、时间函数 6、条件函数 7、类型转换函数 8、集合函数 9、JSON Functions
JSON 函数使用 SQL 标准的 ISO/IEC TR 19075-6 中所述的 JSON 路径表达式JSON path expressions 。它们的语法受到 ECMAScript 的启发并采用了 ECMAScript 的许多功能但既不是它的子集也不是它的超集。
路径表达式有两种风格宽松和严格 lax and strict.。省略时它默认为严格模式。 严格模式旨在从架构角度检查数据每当数据不符合路径表达式时就会引发错误。但是像 JSON_VALUE 这样的函数允许在遇到错误时定义回退行为。 宽松模式更宽容并将错误转换为空序列。
特殊字符 $ 表示 JSON 路径中的根节点。路径可以访问属性 . a 、数组元素 .a、数组元素 .a、数组元素.a[0].b 或分支数组中的所有元素 $.a[*].b。
已知限制 截至Flink 1.17版本并非正确支持宽松模式的所有功能。这是一个上游错误 CALCITE-4717。不保证非标准行为。
1、IS JSON
确定给定字符串是否为有效的 JSON。 指定可选的类型参数会限制允许哪种类型的 JSON 对象。如果字符串是有效的 JSON但不是该类型则返回 false。默认值为 VALUE。
SQL语法
IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]table api语法
STRING.isJson([JsonType type])示例 -- TRUE
Flink SQL select 1 IS JSON;
------------
| op | EXPR$0 |
------------
| I | TRUE |
------------Flink SQL select [] IS JSON;
------------
| op | EXPR$0 |
------------
| I | TRUE |
------------
-- The following statements return TRUE.
SELECT 1 IS JSON;
SELECT [] IS JSON;
SELECT {} IS JSON;
SELECT abc IS JSON;
SELECT 1 IS JSON SCALAR;
SELECT {} IS JSON OBJECT;-- The following statements return FALSE.
SELECT abc IS JSON;
SELECT 1 IS JSON ARRAY;
SELECT 1 IS JSON OBJECT;
SELECT {} IS JSON SCALAR;
SELECT {} IS JSON ARRAY;# 以下示例一样不再赘述1 IS JSON
[] IS JSON
{} IS JSON-- TRUE
abc IS JSON
-- FALSE
abc IS JSON
NULL IS JSON-- TRUE
1 IS JSON SCALAR
-- FALSE
1 IS JSON ARRAY
-- FALSE
1 IS JSON OBJECT-- FALSE
{} IS JSON SCALAR
-- FALSE
{} IS JSON ARRAY
-- TRUE
{} IS JSON OBJECT2、JSON_EXISTS
确定 JSON 字符串是否满足给定的路径搜索条件。 如果省略错误行为则假定 FALSE ON ERROR 为默认值。
SQL语法
JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ])table api语法
STRING.jsonExists(STRING path [, JsonExistsOnError onError])示例
Flink SQL SELECT JSON_EXISTS({a: true}, strict $.b FALSE ON ERROR);
------------
| op | EXPR$0 |
------------
| I | FALSE |
-------------- The following statements return TRUE.
SELECT JSON_EXISTS({a: true}, $.a);
SELECT JSON_EXISTS({a: [{ b: 1 }]}, $.a[0].b);
SELECT JSON_EXISTS({a: true}, strict $.b TRUE ON ERROR);
-- The following statements return FALSE.
SELECT JSON_EXISTS({a: true}, $.b);
SELECT JSON_EXISTS({a: true}, strict $.b FALSE ON ERROR);-- TRUE
SELECT JSON_EXISTS({a: true}, $.a);
-- FALSE
SELECT JSON_EXISTS({a: true}, $.b);
-- TRUE
SELECT JSON_EXISTS({a: [{ b: 1 }]},$.a[0].b);-- TRUE
SELECT JSON_EXISTS({a: true},strict $.b TRUE ON ERROR);
-- FALSE
SELECT JSON_EXISTS({a: true},strict $.b FALSE ON ERROR);3、JSON_STRING
将值序列化为 JSON。 此函数返回包含序列化值的 JSON 字符串。如果值为 NULL则该函数返回 NULL。
SQL语法
JSON_STRING(value)table api语法
jsonString(value)示例
Flink SQL SELECT JSON_STRING(1);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | 1 |
-------------------------------------- returns NULL
SELECT JSON_STRING(CAST(NULL AS INT));-- returns 1
SELECT JSON_STRING(1);-- returns true
SELECT JSON_STRING(TRUE);-- returns Hello, World!
JSON_STRING(Hello, World!);-- returns [1,2]
JSON_STRING(ARRAY[1, 2])-- NULL
JSON_STRING(CAST(NULL AS INT))-- 1
JSON_STRING(1)
-- true
JSON_STRING(TRUE)
-- Hello, World!
JSON_STRING(Hello, World!)
-- [1,2]
JSON_STRING(ARRAY[1, 2])4、JSON_VALUE
从 JSON 字符串中提取标量。 此方法在 JSON 字符串中搜索给定的路径表达式如果该路径的值为标量则返回该值。不能返回非标量值。 默认情况下该值以 STRING 形式返回。使用 returningType 可以选择不同的类型并支持以下类型
VARCHAR / STRINGBOOLEANINTEGERDOUBLE
对于空路径表达式或错误可以将行为定义为返回 null、引发错误或返回定义的默认值。 省略时默认值分别为 NULL ON EMPTY 或 NULL ON ERROR。 默认值可以是文本或表达式。如果默认值本身引发错误则它将下降到 ON EMPTY 的错误行为并引发 ON ERROR 的错误。
对于包含空格等特殊字符的路径可以使用 [‘property’] 或 [“property”] 选择父对象中的指定属性。 请务必在属性名称两边加上单引号或双引号。 在 SQL 中使用 JSON_VALUE 时路径是一个字符参数该参数已经是单引号因此您必须对属性名称周围的单引号进行转义 例如 JSON_VALUE‘{“a b” “true”}’ ‘$.[’‘a b’‘]’。
SQL语法
JSON_VALUE(jsonValue, path [RETURNING dataType] [ { NULL | ERROR | DEFAULT defaultExpr } ON EMPTY ] [ { NULL | ERROR | DEFAULT defaultExpr } ON ERROR ])table api语法
STRING.jsonValue(STRING path [, returnType, onEmpty, defaultOnEmpty, onError, defaultOnError])示例
Flink SQL SELECT JSON_VALUE({a: true}, $.a);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | true |
------------------------------------
Flink SQL SELECT JSON_VALUE({contains blank: right}, strict $.[contains blank] NULL ON EMPTY DEFAULT wrong ON ERROR);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | right |
-------------------------------------- returns true
SELECT JSON_VALUE({a: true}, $.a);-- returns TRUE
SELECT JSON_VALUE({a: true}, $.a RETURNING BOOLEAN);-- returns false
SELECT JSON_VALUE({a: true}, lax $.b DEFAULT FALSE ON EMPTY);-- returns false
SELECT JSON_VALUE({a: true}, strict $.b DEFAULT FALSE ON ERROR);-- returns 0.998D
SELECT JSON_VALUE({a.b: [0.998,0.996]},$.[a.b][0] RETURNING DOUBLE);-- returns right
SELECT JSON_VALUE({contains blank: right}, strict $.[contains blank] NULL ON EMPTY DEFAULT wrong ON ERROR);
5、JSON_QUERY
目前不支持 RETURNING 子句。 wrappingBehavior 确定是否应将提取的值包装到数组中以及是无条件地包装还是仅在值本身还不是数组时才这样做。 onEmpty 和 onError 分别确定路径表达式为空或引发错误时的行为。 默认情况下在这两种情况下都返回 null。其他选择是使用空数组、空对象或引发错误。
SQL语法
JSON_QUERY(jsonValue, path [ { WITHOUT | WITH CONDITIONAL | WITH UNCONDITIONAL } [ ARRAY ] WRAPPER ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON EMPTY ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON ERROR ])table api语法
STRING.jsonQuery(path [, JsonQueryWrapper [, JsonQueryOnEmptyOrError, JsonQueryOnEmptyOrError ] ])示例
Flink SQL SELECT JSON_QUERY({ a: { b: 1 } }, $.a);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | {b:1} |
------------------------------------
Flink SQL SELECT JSON_QUERY({}, lax $.invalid EMPTY OBJECT ON EMPTY);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | {} |
------------------------------------
-- returns { b: 1 }
SELECT JSON_QUERY({ a: { b: 1 } }, $.a);-- returns [1, 2]
SELECT JSON_QUERY([1, 2], $);-- returns NULL
SELECT JSON_QUERY(CAST(NULL AS STRING), $);-- returns [c1,c2]
SELECT JSON_QUERY({a:[{c:c1},{c:c2}]}, lax $.a[*].c);-- Wrap the result into an array.
-- returns [{}]
SELECT JSON_QUERY({}, $ WITH CONDITIONAL ARRAY WRAPPER);-- returns [1, 2]
SELECT JSON_QUERY([1, 2], $ WITH CONDITIONAL ARRAY WRAPPER);-- returns [[1, 2]]
SELECT JSON_QUERY([1, 2], $ WITH UNCONDITIONAL ARRAY WRAPPER);-- Scalars must be wrapped to be returned.
-- returns NULL
SELECT JSON_QUERY(1, $);-- returns [1]
SELECT JSON_QUERY(1, $ WITH CONDITIONAL ARRAY WRAPPER);-- Behavior if the path expression is empty.
-- returns {}
SELECT JSON_QUERY({}, lax $.invalid EMPTY OBJECT ON EMPTY);-- Behavior if the path expression has an error.
-- returns []
SELECT JSON_QUERY({}, strict $.invalid EMPTY ARRAY ON ERROR);-- { b: 1 }
JSON_QUERY({ a: { b: 1 } }, $.a)
-- [1, 2]
JSON_QUERY([1, 2], $)
-- NULL
JSON_QUERY(CAST(NULL AS STRING), $)
-- [c1,c2]
JSON_QUERY({a:[{c:c1},{c:c2}]},lax $.a[*].c)-- Wrap result into an array
-- [{}]
JSON_QUERY({}, $ WITH CONDITIONAL ARRAY WRAPPER)
-- [1, 2]
JSON_QUERY([1, 2], $ WITH CONDITIONAL ARRAY WRAPPER)
-- [[1, 2]]
JSON_QUERY([1, 2], $ WITH UNCONDITIONAL ARRAY WRAPPER)-- Scalars must be wrapped to be returned
-- NULL
JSON_QUERY(1, $)
-- [1]
JSON_QUERY(1, $ WITH CONDITIONAL ARRAY WRAPPER)-- Behavior if path expression is empty / there is an error
-- {}
JSON_QUERY({}, lax $.invalid EMPTY OBJECT ON EMPTY)
-- []
JSON_QUERY({}, strict $.invalid EMPTY ARRAY ON ERROR)6、JSON_OBJECT
从键值对列表生成 JSON 对象字符串。 请注意键必须是非 NULL 字符串文本而值可以是任意表达式。 此函数返回一个 JSON 字符串。ON NULL 行为定义如何处理 NULL 值。如果省略则默认假定 NULL ON NULL。 从另一个 JSON 构造函数调用JSON_OBJECT、JSON_ARRAY创建的值是直接插入的而不是作为字符串插入的。这允许构建嵌套的 JSON 结构。
SQL语法
JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])table api语法
jsonObject(JsonOnNull, keyValues...)示例
Flink SQL SELECT JSON_OBJECT(KEY K1VALUE JSON_OBJECT(KEY K2VALUE V));
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | {K1:{K2:V}} |
------------------------------------Flink SQL SELECT JSON_OBJECT(KEY K1 VALUE CAST(NULL AS STRING) ABSENT ON NULL);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | {} |
-------------------------------------- returns {}
SELECT JSON_OBJECT();-- returns {K1:V1,K2:V2}
SELECT JSON_OBJECT(K1 VALUE V1, K2 VALUE V2);-- Use an expression as a value.
SELECT JSON_OBJECT(orderNo VALUE orders.orderId);-- ON NULL
-- {K1:null}
SELECT JSON_OBJECT(KEY K1 VALUE CAST(NULL AS STRING) NULL ON NULL);-- ON NULL
-- {}
SELECT JSON_OBJECT(KEY K1 VALUE CAST(NULL AS STRING) ABSENT ON NULL);-- returns {K1:{K2:V}}
SELECT JSON_OBJECT(KEY K1VALUE JSON_OBJECT(KEY K2VALUE V)
);-- {}
JSON_OBJECT()-- {K1:V1,K2:V2}
JSON_OBJECT(K1 VALUE V1, K2 VALUE V2)-- Expressions as values
JSON_OBJECT(orderNo VALUE orders.orderId)-- ON NULL
JSON_OBJECT(KEY K1 VALUE CAST(NULL AS STRING) NULL ON NULL) -- {K1:null}
JSON_OBJECT(KEY K1 VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- {}-- {K1:{K2:V}}
JSON_OBJECT(KEY K1VALUE JSON_OBJECT(KEY K2VALUE V)
)7、JSON_ARRAY
从值列表生成 JSON 数组字符串。 此函数返回一个 JSON 字符串。这些值可以是任意表达式。ON NULL 行为定义如何处理 NULL 值。如果省略则默认假定 ABSENT ON NULL。 从另一个 JSON 构造函数调用JSON_OBJECT、JSON_ARRAY创建的元素是直接插入的而不是作为字符串插入的。这允许构建嵌套的 JSON 结构。
SQL语法
JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])table api语法
jsonArray(JsonOnNull, values...)示例
Flink SQL SELECT JSON_ARRAY(1, 2);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | [1,2] |
------------------------------------
Received a total of 1 rowFlink SQL SELECT JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL);
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | [] |
-------------------------------------- returns []
SELECT JSON_ARRAY();-- returns [1,2]
SELECT JSON_ARRAY(1, 2);-- Use an expression as a value.
SELECT JSON_ARRAY(orders.orderId);-- ON NULL
-- returns [null]
SELECT JSON_ARRAY(CAST(NULL AS STRING) NULL ON NULL);-- ON NULL
-- returns []
SELECT JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL);-- returns [[1]]
SELECT JSON_ARRAY(JSON_ARRAY(1));-- []
JSON_ARRAY()
-- [1,2]
JSON_ARRAY(1, 2)-- Expressions as values
JSON_ARRAY(orders.orderId)-- ON NULL
JSON_ARRAY(CAST(NULL AS STRING) NULL ON NULL) -- [null]
JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL) -- []-- [[1]]
JSON_ARRAY(JSON_ARRAY(1))
8、JSON_ARRAYAGG
将明细聚合到 JSON 数组字符串中。 JSON_ARRAYAGG 函数通过将指定的项聚合到数组中来创建 JSON 对象字符串。
item 表达式可以是任意的包括其他 JSON 函数。
如果值为 NULL则 ON NULL 行为定义要执行的操作。如果省略则 ABSENT ON NULL 为默认值。
OVER 窗口、无限会话窗口或 HOP 窗口不支持JSON_ARRAYAGG函数。
SQL语法
JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ])table api语法
在这里插入代码片示例
Flink SQL CREATE TABLE source_table (userId INT,age INT,balance DOUBLE,userName STRING,t_insert_time AS localtimestamp,WATERMARK FOR t_insert_time AS t_insert_time) WITH (connector datagen,rows-per-second5,fields.userId.kindsequence,fields.userId.start1,fields.userId.end10,fields.balance.kindrandom,fields.balance.min1,fields.balance.max100,fields.age.min1,fields.age.max1000,fields.userName.length10);
[INFO] Execute statement succeed.Flink SQL select * from source_table;
-----------------------------------------------------------------------------------------------------------------------
| op | userId | age | balance | userName | t_insert_time |
-----------------------------------------------------------------------------------------------------------------------
| I | 1 | 555 | 90.45012880441223 | 7e2b6c7beb | 2023-11-06 17:29:05.273 |
| I | 2 | 209 | 32.07201650494765 | f652baac94 | 2023-11-06 17:29:05.274 |
| I | 3 | 278 | 24.299962537076734 | 11b4353416 | 2023-11-06 17:29:05.274 |
| I | 4 | 433 | 58.634356546049574 | 21d5d09603 | 2023-11-06 17:29:05.274 |
| I | 5 | 55 | 16.20617629075601 | d626f31213 | 2023-11-06 17:29:05.274 |
| I | 6 | 442 | 98.87803427244727 | 0305c21dc5 | 2023-11-06 17:29:06.267 |
| I | 7 | 19 | 96.11095443982174 | ea873b2df2 | 2023-11-06 17:29:06.268 |
| I | 8 | 806 | 36.5775262369553 | f8df556b22 | 2023-11-06 17:29:06.268 |
| I | 9 | 919 | 69.47517602162831 | 85074390f3 | 2023-11-06 17:29:06.268 |
| I | 10 | 46 | 47.519467818569815 | 662990446f | 2023-11-06 17:29:06.268 |
-----------------------------------------------------------------------------------------------------------------------
Received a total of 10 rowsFlink SQL SELECTJSON_ARRAYAGG(userName)FROM source_table;
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | [ee2e4edb32] |
| -U | [ee2e4edb32] |
| U | [ee2e4edb32,66e13f3f77] |
| -U | [ee2e4edb32,66e13f3f77] |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
| -U | [ee2e4edb32,66e13f3f77,... |
| U | [ee2e4edb32,66e13f3f77,... |
------------------------------------
Received a total of 19 rowsFlink SQL SELECTJSON_ARRAYAGG(userId)FROM source_table;
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | [1] |
| -U | [1] |
| U | [1,2] |
| -U | [1,2] |
| U | [1,2,3] |
| -U | [1,2,3] |
| U | [1,2,3,4] |
| -U | [1,2,3,4] |
| U | [1,2,3,4,5] |
| -U | [1,2,3,4,5] |
| U | [1,2,3,4,5,6] |
| -U | [1,2,3,4,5,6] |
| U | [1,2,3,4,5,6,7] |
| -U | [1,2,3,4,5,6,7] |
| U | [1,2,3,4,5,6,7,8] |
| -U | [1,2,3,4,5,6,7,8] |
| U | [1,2,3,4,5,6,7,8,9] |
| -U | [1,2,3,4,5,6,7,8,9] |
| U | [1,2,3,4,5,6,7,8,9,10] |
------------------------------------
Received a total of 19 rows
10、JSON_OBJECTAGG
将key-value表达式聚合到 JSON 字符串中。
JSON_OBJECTAGG 函数通过将key-value表达式聚合到单个 JSON 对象中来创建 JSON 对象字符串。
key表达式必须返回不可为 null 的字符串。value表达式可以是任意的包括其他 JSON 函数。
密钥必须是唯一的。如果一个key多次出现则会引发错误。
如果value为 NULL则 ON NULL 行为定义要执行的操作。如果省略则 NULL ON NULL 为默认值。
OVER 窗口中不支持 JSON_OBJECTAGG 函数。
SQL语法
JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])table api语法
在这里插入代码片示例 Flink SQL select JSON_OBJECTAGG(userName VALUE f652baac94 )FROM source_table;
------------------------------------
| op | EXPR$0 |
------------------------------------
| I | {0c3ceeca6f:f652baac94} |
| -U | {0c3ceeca6f:f652baac94} |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
| -U | {0c3ceeca6f:f652baac94,... |
| U | {0c3ceeca6f:f652baac94,... |
------------------------------------
10、值构建函数 11、值获取函数 12、分组函数 13、哈希函数 2、聚合函数
聚合函数将所有的行作为输入并返回单个聚合值作为结果。
3、时间间隔单位和时间点单位标识符
下表列出了时间间隔单位和时间点单位标识符。
对于 Table API请使用 _ 代替空格例如 DAY_TO_HOUR 4、列函数
列函数用于选择或丢弃表的列。 列函数仅在 Table API 中使用。 详细语法如下
//列函数:withColumns(columnExprs)withoutColumns(columnExprs)//多列表达式:columnExpr [, columnExpr]*//单列表达式:columnRef | columnIndex to columnIndex | columnName to columnName//列引用:columnName(The field name that exists in the table) | columnIndex(a positive integer starting from 1)列函数的用法如下表所示假设我们有一个包含 5 列的表(a: Int, b: Long, c: String, d:String, e: String) 列函数可用于所有需要列字段的地方例如 select、groupBy、orderBy、UDFs 等函数例如
table.groupBy(withColumns(range(1, 3))).select(withColumns(range(a, b)), myUDAgg(myUDF(withColumns(range(5, 20)))));以上介绍了flink的函数分类、内置函数的说明及示例特别是针对json function函数每个均以可运行的示例进行说明。