网站建设公司不给ftp,网站建设公开,没有网站怎么做网推,seo优化排名怎么做表定义 动态表(dynamic table)#xff1a;动态表是流的另一种表达方式#xff0c;动态表作为一个逻辑的抽象概念#xff0c;使我们更容易理解flink中将streaming发展到table这个层次的设计#xff0c;本质都是对无边界、持续变更数据的表示形式#xff0c;所以动态表与流之…表定义 动态表(dynamic table)动态表是流的另一种表达方式动态表作为一个逻辑的抽象概念使我们更容易理解flink中将streaming发展到table这个层次的设计本质都是对无边界、持续变更数据的表示形式所以动态表与流之间可以相互转换。 版本表(dynamic table)动态表之上的定义版本是一个拥有主键和时间属性的动态表(建表语句必需包含PRIMARY KEY和WATERMARK)所以版本表可以追踪每个key在某时间点/时间内的变化情况。版本表可以直接从changelog格式的source创建或者基于append-only的源创建版本视图。 时态表(temporal table)时态表是随着时间变化而变化的也就是动态表时态表包含一个或多个版本表的快照当它能追踪所有记录的历史变更(来自数据库的changelog)时就是个版本表如果它只能表示所有记录经过物化后的最新快照(直接一个数据库表)那就是个普通表。
Regular Joins
Regular Joins是flink这么多join类型中最普通的任意一侧的数据流有变更或者新增都会影响到join结果。Regular joins是通过把双流的输入数据都保存在flink的状态中存在state过度膨胀的隐患所以我们在使用时要合理设置table状态的TTL(table.exec.state.ttl)这要结合具体的业务场景否则会影响join结果的正确性。
有两种join类型内连接(INNER Equi-JOIN)和外连接(OUTER Equi-JOIN)两者都只支持等值连接且至少一个连接条件。
Interval Joins
Interval join要求至少有一个等值谓词连接和一个时间约束条件这个时间属性定义了流的时间范围且作为WATERMARK ltime rtime ltime rtime AND ltime rtime INTERVAL ‘10’ MINUTE ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime INTERVAL ‘5’ SECOND 与Regular join一样是双流但是它加上了时间区间的概念可以清理状态中较旧的数据而不会影响join结果的正确性通过InnerJoin算子实现水位线来控制join的数据区间以及清理数据所以两个输入流都要定义WATERMARK否则会变回Regular joinWATERMARK可以定义为event-time或process-time只支持append-only的输入流当尝试使用cdc作为输入源Retract时出报错
Exception in thread main org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesnt support consuming update and delete changes which is produced by node TableSourceScan(table[[ tb_order]], fields[order_id, price, currency, order_time])Temporal Joins
与时态表的join通过上述时态表的描述可得知可以关联得到记录的历史版本或只能得到最新版本flink sql遵循SQL:2011的标准语法 SELECT [column_list]FROM table1 [AS alias1][LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS alias2]ON table1.column-name1 table2.column-name1从使用形式划分可以分为3种Event Time Temporal Join、Processing Time Temporal Join、Temporal Table Function Join
Event Time Temporal Join
Event Time temporal join的是一个版本表意味着可以根据主表的事件时间关联到当时维表的具体版本。 Temporal table join currently only supports ‘FOR SYSTEM_TIME AS OF’ left table’s time attribute field Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found. Temporal table’s primary key [currency] must be included in the equivalence condition of temporal join 左表必需定义事件时间右表除了定义事件时间外还需要定义主键即版本表水位线替触发join所以两侧流都需要设置正常的水位线右表的主键必需作为等值谓词连接与regular join相比结果不会受到右侧流影响即输出结果以左流为主右流只是补充了左流的信息效果与left join相似与interval join相比不需要定义时间窗口即可以关联到更久之前的维度因为版本表会保存全量维度最新版本以及上一个水位线之后的变更一个水位线之前的版本数据将会被清理因为随着水位线的推移这些数据将不会再被用到
Processing Time Temporal Join Processing-time temporal join is not supported yet 从flink 1.14开始已经不再支持这种方式可以使用temporal table function语法替换
右表定义process-time属性总是能关联到最新的右表记录其实这和lookup join差不多只不过右表的记录是以HashMap全部保存着最新的版本在状态中。这种方式的强大之处在于可以直接对接不能变成flink动态表的外部表例如hbase
与regular joins相比右表的数据变化不会影响到之前已经join出的结果与interval joins相比不需要定义时间窗口且所有旧版本数据都不会保存在状态中
Temporal Table Function Join Join key must be the same as temporal table’s primary key create table if not exists tb_order (order_id int,price int,currency string,order_time timestamp(3),proc_time AS PROCTIME(),primary key(order_id) not enforced)WITH (connectormysql-cdc,table-nametb_order,)create table if not exists tb_currency (currency string,rate int,update_time timestamp(3),proc_time AS PROCTIME(),)WITH (connectormysql-cdc,table-nametb_currency,)TemporalTableFunction rate tEnv.from(tb_currency).createTemporalTableFunction(proc_time, currency);tEnv.createTemporarySystemFunction(rate,rate);select * from tb_order o , LATERAL TABLE(rate(o.order_time)) c where o.currencyc.currency上面例子实现了process time temporal join两建表语句都不指定事件时间且tb_currency无需指定primary key即非版本表但是在定义TemporalTableFunction可以指定任意字段为主键所以如果建表语句指定了事件时间且TemporalTableFunction也使用事件时间那么相当于间接创建了版本表。
先要定义table funtion指定一个时间属性(event-time或process-time)和主键TemporalTableFunction定义的主键必须作为等值谓词连接除了可以和版本表join还能和普通的表示最新版本的表/视图join即它包含了event-time/processing-time temporal join两种
Lookup Join
通过查询外部存储系统的数据以丰富流的属性这种join方式要求流表必须有一个processing time属性外部数据表的connector要实现LookupTableSource接口 CREATE TEMPORARY TABLE Customers (id INT,name STRING,country STRING,zip STRING) WITH (connector jdbc,table-name customers);SELECT o.order_id, o.total, c.country, c.zipFROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id c.id;外部表的数据变化不会影响到已经join出的结果上面所有join都是双流而lookup join是单流