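Lookup Join is a type of join in streaming queries. It requires one table to have a processing-time attribute and the other table to be backed by a lookup source connector.
Paimon supports Lookup Join on both primary key tables and append tables.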
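To make the processing-time semantics concrete, the following is a toy model in Python, not Flink or Paimon code. It assumes the lookup table can be represented as an in-memory dictionary keyed by primary key; the `Order` class, the `lookup_join` function, and the sample rows are all hypothetical. Each order is enriched with whatever the lookup table holds at the moment the order is processed, so a later update to the table affects only later orders.

```python
from dataclasses import dataclass


@dataclass
class Order:
    order_id: int
    total: int
    customer_id: int


def lookup_join(order, customers):
    """Enrich one order with the customer row as it exists right now."""
    customer = customers.get(order.customer_id)
    if customer is None:
        return None  # inner join: a lookup miss produces no output row
    return (order.order_id, order.total, customer["country"], customer["zip"])


# Lookup table keyed by primary key (hypothetical sample data).
customers = {1: {"country": "DE", "zip": "10115"}}

first = lookup_join(Order(100, 30, 1), customers)

# The lookup table is updated between the two orders.
customers[1] = {"country": "FR", "zip": "75001"}

second = lookup_join(Order(101, 45, 1), customers)
missing = lookup_join(Order(102, 10, 2), customers)

print(first)    # (100, 30, 'DE', '10115')
print(second)   # (101, 45, 'FR', '75001')
print(missing)  # None
```

Order 100 keeps the values it saw when it was processed, even though the customer row changed afterwards.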
a) Preparation
Create a Paimon table and keep it updated in real time.
-- Create a paimon catalog
CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'hdfs://nn:8020/warehouse/path' -- or 'file://tmp/foo/bar'
);

USE CATALOG my_catalog;

-- Create a table in paimon catalog
CREATE TABLE customers (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    country STRING,
    zip STRING
);

-- Launch a streaming job to update customers table
INSERT INTO customers ...

-- Create a temporary left table, like from kafka
CREATE TEMPORARY TABLE Orders (
    order_id INT,
    total INT,
    customer_id INT,
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'csv'
    ...
);

b) Normal Lookup
You can now use customers in a lookup join query.
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
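ON o.customer_id = c.id;

c) Retry Lookup
In Flink 1.16 and later, if a record from Orders (the main table) fails to join because the corresponding customers data (the lookup table) is not ready yet, you can use Flink's delayed retry strategy for lookup.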
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
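ON o.customer_id = c.id;

d) Async Retry Lookup
The problem with synchronous retry is that a record that has not yet returned blocks all subsequent records, which stalls the whole job. You can use async lookup with allow_unordered to avoid this blocking.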
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */
o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
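ON o.customer_id = c.id;

If the main table (Orders) is a CDC stream, Flink SQL ignores allow_unordered, because it is supported only for append streams, and the streaming job may be blocked. As a workaround, you can try Paimon's audit_log system table feature to convert the CDC stream into an append stream.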
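The difference between the blocking retry in c) and the async allow_unordered variant in d) can be illustrated with a small discrete-time simulation in Python. This is a simplified sketch, not how Flink implements retries: the `simulate` function, the integer clock, and the sample data are assumptions made for illustration, and `max_attempts` here counts the first lookup as an attempt.

```python
def simulate(orders, ready_at, blocking, fixed_delay=1, max_attempts=600):
    """Return sorted (emit_time, order_id) pairs for a toy retrying lookup.

    orders:   list of (arrival_time, order_id, customer_id)
    ready_at: customer_id -> time from which the lookup row is visible
    blocking: True models synchronous retry (one record at a time),
              False models async lookup with unordered output
    """
    emitted = []
    operator_free_at = 0  # only used in blocking mode
    for arrival, order_id, customer_id in orders:
        visible_from = ready_at.get(customer_id, float("inf"))
        # In blocking mode a record cannot start before the previous one ends.
        t = max(arrival, operator_free_at) if blocking else arrival
        attempts = 1
        # Retry on lookup miss with a fixed delay, up to max_attempts.
        while visible_from > t and attempts < max_attempts:
            t += fixed_delay
            attempts += 1
        if blocking:
            operator_free_at = t
        if visible_from <= t:  # inner join: give up silently after max_attempts
            emitted.append((t, order_id))
    return sorted(emitted)


# Three orders arrive at time 0; only customer 10 is late (visible at time 5).
orders = [(0, 1, 10), (0, 2, 20), (0, 3, 30)]
ready_at = {10: 5, 20: 0, 30: 0}

sync_out = simulate(orders, ready_at, blocking=True)
async_out = simulate(orders, ready_at, blocking=False)

print(sync_out)   # [(5, 1), (5, 2), (5, 3)]
print(async_out)  # [(0, 2), (0, 3), (5, 1)]
```

In the blocking run, orders 2 and 3 wait until time 5 although their customer rows were available from the start. In the unordered run they are emitted immediately, and only order 1 is delayed.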
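8) Query Service
You can run a Flink streaming job to start a query service for a table. When the query service exists, Flink Lookup Join prefers to fetch data from it, which effectively improves query performance.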
Flink SQL
CALL sys.query_service('database_name.table_name', parallelism);

Flink Action
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.7.0-incubating.jar \
    query_service \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--parallelism <parallelism>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]