有没有免费网站建设,泰安网络设计公司,部队织梦网站模板免费下载,北京建设银行具体定义请参考官方文档#xff1a;https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/overview/本文主要针对实际使用中比较常用的api进行整理#xff0c;大多数例子都是官网#xff0c;如有歧义可与官方对照。一、 创建 TableEnvironmentTab…具体定义请参考官方文档https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/table/overview/本文主要针对实际使用中比较常用的api进行整理大多数例子都是官网如有歧义可与官方对照。一、 创建 TableEnvironmentTableEnvironment 是 Table API 和 SQL 的核心概念。它负责:在内部的 catalog 中注册 Table注册外部的 catalog加载可插拔模块执行 SQL 查询注册自定义函数 scalar、table 或 aggregationDataStream 和 Table 之间的转换(面向 StreamTableEnvironment )from pyflink.table import EnvironmentSettings, TableEnvironmentfrom pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
#创建流处理
env_settings EnvironmentSettings.in_streaming_mode()
table_env TableEnvironment.create(env_settings)
#创建批处理
env_settings EnvironmentSettings.in_batch_mode()
table_env TableEnvironment.create(env_settings)
#用户可以从现有的 StreamExecutionEnvironment 创建一个 StreamTableEnvironment 与 DataStream API 互操作。
s_env StreamExecutionEnvironment.get_execution_environment()
t_env StreamTableEnvironment.create(s_env)TableEnvironment主要用来Table 管理创建表、列举表、Table 和 DataStream 互转等。自定义函数管理自定义函数的注册、删除、列举等。 关于 Python 自定义函数的更多细节请参考普通自定义函数 和向量化自定义函数章节的介绍。执行 SQL 语句更多细节可查阅SQL 查询章节的介绍。作业配置管理更多细节可查阅Python 配置章节的介绍。Python 依赖管理更多细节可查阅依赖管理章节的介绍。作业提交更多细节可查阅作业提交章节的介绍。二、创建表Table 是 Python Table API 的核心组件。Table 对象由一系列数据转换操作构成但是它不包含数据本身。 相反它描述了如何从数据源中读取数据以及如何将最终结果写出到外部存储等。表可以被打印、优化并最终在集群中执行。 表也可以是有限流或无限流以支持流式处理和批处理场景。一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。不支持在同一个查询中合并来自不同 TableEnvironments 的表例如 join 或者 union 它们。from pyflink.table import EnvironmentSettings, TableEnvironment# 创建 批 TableEnvironment
env_settings EnvironmentSettings.in_batch_mode()
table_env TableEnvironment.create(env_settings)首先在上面创建了一个批处理的TableEnvironment。然后创建一张表。在pyflink中可从不同的数据类型中形成创建表下面介绍几个比较常用的方法1、from_elements 从元素集合创建表集合中的元素必须长度相等类型顺序相同from_elements(elements: Iterable, schema: Union[pyflink.table.types.DataType, List[str]] None, verify_schema: bool True) → pyflink.table.table.Table参数elements- 创建表格的元素。schema- 表的架构。verify_schema- 是否根据架构验证元素。例子如下schema可以使用DataTypes 指定类型也可以不指定直接写列名会自动识别。#table table_env.from_elements([(1, Hi), (2, Hello)],[a,b])
#table table_env.from_elements([(1, Hi), (2, Hello)],DataTypes.ROW([DataTypes.FIELD(a, DataTypes.INT()),DataTypes.FIELD(b, DataTypes.STRING())]))
table.execute().print()2、通过 pandas DataFrame 来创建表from_pandas(pdf, schemaNone, split_num1)参数pdf- Pandas DataFrame 。schema- 转换后的表的架构。splits_num- 给定的 Pandas DataFrame 将被分割成的分割数。它决定了并行源任务的数量。如果未指定将使用默认并行度。pdf pd.DataFrame(np.random.rand(10, 2))
table table_env.from_pandas(pdf, [a, b])
#table_env.from_pandas(pdf, [DataTypes.DOUBLE(), DataTypes.DOUBLE()])
# table_env.from_pandas(pdf, DataTypes.ROW(
# [DataTypes.FIELD(a, DataTypes.DOUBLE()), DataTypes.FIELD(b, DataTypes.DOUBLE())]))
table.execute().print()3、create_temporary_view通过指定路径下已注册的表来创建一个表例如通过 create_temporary_view 注册表。from_path(path: str) → pyflink.table.table.Table参数path- 要扫描的表 API 对象的路径。# 临时表
table table_env.from_elements([(1, Hi), (2, Hello)], [id, data])
table_env.create_temporary_view(source_table, table)
new_table table_env.from_path(source_table)
new_table.execute().print()#读取表 - 从已注册的目录中读取表
table_env.execute_sql(CREATE TABLE random_source (id BIGINT, data TINYINT ) WITH (connector datagen,fields.id.kindsequence,fields.id.start1,fields.id.end3,fields.data.kindsequence,fields.data.start4,fields.data.end6)
)
table table_env.from_path(random_source)
table.execute().print()create_temporary_view 将一个 Table 对象注册为一张临时表类似于 SQL 的临时表。create_temporary_view(view_path, table)参数view_path - 注册视图的路径。table_or_data_stream 用于创建视图的表或数据流。table_env.execute_sql(CREATE TABLE table_sink (id BIGINT, data VARCHAR ) WITH (connector print)
)# 将 Table API 表转换成 SQL 中的视图
table table_env.from_elements([(1, Hi), (2, Hello)], [id, data])
table_env.create_temporary_view(table_api_table, table)# 将 Table API 表的数据写入结果表
table_env.execute_sql(INSERT INTO table_sink SELECT * FROM table_api_table).wait()4、execute_sql执行指定的语句并返回执行结果。 执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。注意对于 INSERT INTO 语句这是一个异步操作通常在向远程集群提交作业时才需要使用。execute_sql(stmt str) 参数str- sql 语句table_env.execute_sql(INSERT INTO table_sink SELECT * FROM table_api_table).wait()5、sql_query(query)执行一条 SQL 查询并将查询的结果作为一个 Table 对象.sql_query(query) 参数query- sql 语句table_env.sql_query(SELECT * FROM %s % table)6、create_statemente_set用来执行多条sql语句可以通过该方法编写multi_sink的作业。table table_env.from_elements([(1, Hi), (2, Hello)], [id, data])
table_env.create_temporary_view(simple_source, table)
table_env.execute_sql(CREATE TABLE first_sink_table (id BIGINT, data VARCHAR ) WITH (connector print)
)
table_env.execute_sql(CREATE TABLE second_sink_table (id BIGINT, data VARCHAR) WITH (connector print)
)# 创建 statement set
statement_set table_env.create_statement_set()# 将 table 的数据写入 first_sink_table
statement_set.add_insert(first_sink_table, table)# 通过一条 sql 插入语句将数据从 simple_source 写入到 second_sink_table
statement_set.add_insert_sql(INSERT INTO second_sink_table SELECT * FROM simple_source)# 执行 statement set
statement_set.execute().wait()7 I(1,Hi) 7 I(1,Hi) 7 I(2,Hello) 7 I(2,Hello)7、get_schema获取schema信息table table_env.from_elements([(1, Hi), (2, Hello)], [id, data])
# 默认情况下“id” 列的类型是 64 位整型
print(By default the type of the id column is %s. % table.get_schema().get_field_data_type(id))from pyflink.table import DataTypestable table_env.from_elements([(1, Hi), (2, Hello)],DataTypes.ROW([DataTypes.FIELD(id, DataTypes.TINYINT()),DataTypes.FIELD(data, DataTypes.STRING())]))
# 现在 “id” 列的类型是 8 位整型
print(table.get_schema())By default the type of the id column is BIGINT. root |-- id: TINYINT |-- data: STRING三、创建TableDescriptor 用来定义表的scheam例子from pyflink.table import EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes# create a stream TableEnvironment
env_settings EnvironmentSettings.in_streaming_mode()
table_env TableEnvironment.create(env_settings)table_env.create_temporary_table(random_source,TableDescriptor.for_connector(datagen).schema(Schema.new_builder().column(id, DataTypes.BIGINT()).column(data, DataTypes.TINYINT()).build()).option(fields.id.kind, sequence).option(fields.id.start, 1).option(fields.id.end, 3).option(fields.data.kind, sequence).option(fields.data.start, 4).option(fields.data.end, 6).build())table table_env.from_path(random_source)
table.execute().print()----------------------------------
| op | id | data |
----------------------------------
| I | 1 | 4 |
| I | 2 | 5 |
| I | 3 | 6 |
----------------------------------for_connector使用给定的连接器为表创建一个新的构建器参数当前仅有部分 connector 的实现包含在 Flink 官方提供的发行包中好比 FileSystemDataGen、Print、BlackHole 等大部分 connector 的实现当前没有包含在 Flink 官方提供的发行包中好比 Kafka、ES 等。针对没有包含在 Flink 官方提供的发行包中的 connector若是须要在 PyFlink 做业中使用用户须要显式地指定相应 FAT JAR.具体直接看阿里云的flink文档https://help.aliyun.com/document_detail/176688.html四、在 Catalog 中创建表 表可以是临时的并与单个 Flink 会话session的生命周期相关也可以是永久的并且在多个 Flink 会话和群集cluster中可见。 永久表需要 catalog例如 Hive Metastore以维护表的元数据。一旦永久表被创建它将对任何连接到 catalog 的 Flink 会话可见且持续存在直至被明确删除。 通过 SQL DDL 创建的表和视图 例如 “create table …” 和 “create view …都存储在 catalog 中。 你可以通过 SQL 直接访问 catalog 中的表。 如果你要用 Table API 来使用 catalog 中的表可以使用 “from_path” 方法来创建 Table API 对象# 准备 catalog
# 将 Table API 表注册到 catalog 中
table table_env.from_elements([(1, Hi), (2, Hello)], [id, data])
table_env.create_temporary_view(source_table, table)# 从 catalog 中获取 Table API 表
new_table table_env.from_path(source_table)
new_table.execute().print()----------------------------------------------------------
| op | id | data |
----------------------------------------------------------
| I | 1 | Hi |
| I | 2 | Hello |
----------------------------------------------------------五、Table API-DML语法from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.expressions import concat# 通过 batch table environment 来执行查询
env_settings EnvironmentSettings.in_batch_mode()
table_env TableEnvironment.create(env_settings)orders table_env.from_elements([(Jack, FRANCE, 10), (Rose, ENGLAND, 30), (Jack, FRANCE, 20)],[name, country, revenue])1.查询-select
# 查询两列
revenue orders.select(col(name), col(country).alias(country))
# 查询全部列
revenue1 orders .select(col(*)) revenue.execute().print()
table_result revenue.execute()
print(type(table_result ))----------------------------------------------------------------
| name | country |
----------------------------------------------------------------
| Jack | FRANCE |
| Rose | ENGLAND |
| Jack | FRANCE |
| Bob | CH |
| Bob | CH |
| YU | CH |
----------------------------------------------------------------class pyflink.table.table_result.TableResult说下返回值通过打印可以知道revenue.execute()返回值类型是TableResult这个类型不能直接通过for循环遍历。需要调用collect()方法然后在遍历。for res_row in table_result.collect():for rr in res_row:print(rr)对于 SELECT 操作除非已收集所有结果数据否则作业不会完成所以除非是有界或是批处理那么不建议使用for循环遍历数据。#所以建立使用with循环
with table_result.collect() as results:for result in results:2.过滤-where等同filter和 SQL 的 WHERE 子句类似。 过滤掉未验证通过过滤谓词的行。--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdownresult orders.where(col(name) Jack)
#或
result orders.filter(col(name) Jack)
#打印
result .execute().print()------------------------------------------------------------------------------------------------------
| name | country | revenue | r_time |
------------------------------------------------------------------------------------------------------
| Jack | FRANCE | 10 | 2023-02-23 11:11:33.081 |
| Jack | FRANCE | 20 | 2023-02-24 07:11:33.081 |
------------------------------------------------------------------------------------------------------3.列操作--javascripttypescriptbashsqljsonhtmlcssccppjavarubypythongorustmarkdown#添加列add_columns但是如果该列存在则直接报错
#concat 合并
result orders.add_columns(concat(col(name), sunny).alias(desc))
result .execute().print()--------------------------------------------------------------------------------------------------------------------------------------
| name | country | revenue | r_time | desc |
--------------------------------------------------------------------------------------------------------------------------------------
| Jack | FRANCE | 10 | 2023-02-23 11:11:33.081 | Jacksunny |
| Rose | ENGLAND | 30 | 2023-02-23 21:11:33.081 | Rosesunny |
| Jack | FRANCE | 20 | 2023-02-24 07:11:33.081 | Jacksunny |
| Bob | CH | 40 | 2023-02-24 17:11:33.081 | Bobsunny |
| Bob | CH | 50 | 2023-02-24 17:11:33.081 | Bobsunny |
| YU | CH | 100 | 2023-02-23 14:11:33.081 | YUsunny |
--------------------------------------------------------------------------------------------------------------------------------------#add_or_replace_columns执行字段添加操作。 如果添加的列名称和已存在的列名称相同则已存在的字段将被替换。 此外如果添加的字段里面有重复的字段名则会使用最后一个字段。
result orders.add_or_replace_columns(concat(col(name), sunny).alias(desc)).select(col(name),col(desc))
result.execute().print()----------------------------------------------------------------
| name | desc |
----------------------------------------------------------------
| Jack | Jacksunny |
| Rose | Rosesunny |
| Jack | Jacksunny |
| Bob | Bobsunny |
| Bob | Bobsunny |
| YU | YUsunny |
----------------------------------------------------------------#删除列如果删除多个则用逗号隔开drop_columns(col(a),col(b))
result orders.drop_columns(col(name))
result.execute().print()----------------------------------------------------------------------
| country | revenue | r_time |
----------------------------------------------------------------------
| FRANCE | 10 | 2023-02-23 11:11:33.081 |
| ENGLAND | 30 | 2023-02-23 21:11:33.081 |
| FRANCE | 20 | 2023-02-24 07:11:33.081 |
| CH | 40 | 2023-02-24 17:11:33.081 |
| CH | 50 | 2023-02-24 17:11:33.081 |
| CH | 100 | 2023-02-23 14:11:33.081 |
----------------------------------------------------------------------#修改列名
result orders.rename_columns(col(name).alias(name1), col(country).alias(country2))
result.execute().print()------------------------------------------------------------------------------------------------------
| name1 | country2 | revenue | r_time |
------------------------------------------------------------------------------------------------------
| Jack | FRANCE | 10 | 2023-02-23 11:11:33.081 |
| Rose | ENGLAND | 30 | 2023-02-23 21:11:33.081 |
| Jack | FRANCE | 20 | 2023-02-24 07:11:33.081 |
| Bob | CH | 40 | 2023-02-24 17:11:33.081 |
| Bob | CH | 50 | 2023-02-24 17:11:33.081 |
| YU | CH | 100 | 2023-02-23 14:11:33.081 |
------------------------------------------------------------------------------------------------------4.聚合计算-Aggregations4.1 group_by # 计算所有来自法国客户的收入
# 使用group_by 来进行分组计算
#对于流失计算因为数据是无界的计算出的结果是可能是无限长的取决查询或聚合的字段所以当是流式时请提供空闲状态保留时间。
revenue orders \.select(col(name), col(country), col(revenue)) \.where(col(country) FRANCE) \.group_by(col(name)) \.select(col(name), orders.revenue.sum.alias(rev_sum))revenue.execute().print()---------------------------------------------
| name | rev_sum |
---------------------------------------------
| Jack | 30 |
---------------------------------------------4.2窗口函数 - Tumble滚动窗口将行分配给固定长度的非重叠连续窗口。例如一个 5 分钟的滚动窗口以 5 分钟的间隔对行进行分组。滚动窗口可以定义在事件时间、处理时间或行数上。#生成测试数据
orders table_env.from_elements(
[
(Jack, FRANCE, 10, datetime.now()timedelta(hours2)),
(Rose, ENGLAND, 30, datetime.now()timedelta(hours12)),
(Jack, FRANCE, 20, datetime.now()timedelta(hours22)),
(Bob, CH, 40, datetime.now()timedelta(hours32)),
(Bob, CH, 50, datetime.now()timedelta(hours32)),
(YU, CH, 100, datetime.now()timedelta(hours5))
],
DataTypes.ROW([DataTypes.FIELD(name, DataTypes.STRING()),
DataTypes.FIELD(country, DataTypes.STRING()),
DataTypes.FIELD(revenue, DataTypes.INT()),
DataTypes.FIELD(r_time, DataTypes.TIMESTAMP(3))]))#设置窗口函数
win_fun Tumble.over(lit(1).hours).on(col(r_time)).alias(w)over将窗口的长度定义为时间或行计数间隔。on要对数据进行分组时间间隔或排序行计数的时间属性。批处理查询支持任意 Long 或 Timestamp 类型的属性。流处理查询仅支持声明的事件时间或处理时间属性。alias指定窗口的别名。别名用于在 group_by() 子句中引用窗口并可以在 select() 子句中选择如窗口开始、结束或行时间戳的窗口属性。#使用窗口函数
result orders.window(win_fun) \.group_by(col(name), col(w)) \.select(col(name), col(w).start, col(w).end, col(revenue).sum.alias(d)).order_by(d)
result.execute().print()-----------------------------------------------------------------------------------------------
| name | EXPR$0 | EXPR$1 | d |
-----------------------------------------------------------------------------------------------
| Jack | 2023-02-22 19:00:00.000 | 2023-02-22 20:00:00.000 | 10 |
| Jack | 2023-02-23 15:00:00.000 | 2023-02-23 16:00:00.000 | 20 |
| Rose | 2023-02-23 05:00:00.000 | 2023-02-23 06:00:00.000 | 30 |
| Bob | 2023-02-24 01:00:00.000 | 2023-02-24 02:00:00.000 | 90 |
| YU | 2023-02-22 22:00:00.000 | 2023-02-22 23:00:00.000 | 100 |
-----------------------------------------------------------------------------------------------4.3窗口函数 - Over Window和 SQL 的 OVER 子句类似 PS暂时没有测试数据参考sql即可Over.partition_by(col(a)) \ .order_by(col(rowtime)) \ .preceding(expr.UNBOUNDED_RANGE) \ .alias(w)order_by 需是time 属性才可以排序4.4Distinct Aggregation 和 SQL DISTINCT 聚合子句类似例如 COUNT(DISTINCT a)。 #去重后相加
group_by_distinct_result orders.group_by(col(name)) \.select(col(name), col(revenue).sum.distinct.alias(d))
group_by_distinct_result .execute().print()---------------------------------------------
| name | d |
---------------------------------------------
| Jack | 30 |
| Bob | 90 |
| YU | 100 |
| Rose | 30 |
---------------------------------------------也可以直接Distinct筛选完全相同的行数据。orders1 table_env.from_elements([(Jack, FRANCE, 10),(Jack, FRANCE, 10)],DataTypes.ROW([DataTypes.FIELD(name, DataTypes.STRING()), DataTypes.FIELD(country, DataTypes.STRING()),DataTypes.FIELD(revenue, DataTypes.INT())]))
result orders1.distinct()
result .execute().print()-----------------------------------------------------------------------------
| name | country | revenue |
-----------------------------------------------------------------------------
| Jack | FRANCE | 10 |
-----------------------------------------------------------------------------