当前位置: 首页 > news >正文

frontpage制作个人网站 技巧大气宏伟wordpress企业主题

frontpage制作个人网站 技巧,大气宏伟wordpress企业主题,哪个网站可以做翻译兼职,96微信编辑器官网Paimon的下载及安装#xff0c;并且了解了主键表的引擎以及changelog-producer的含义参考#xff1a; 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) 利用Paimon表做lookup join#xff0c;集成mysql cdc等参考#xff1a; 大数据组件(四)快速入门实时数据…Paimon的下载及安装并且了解了主键表的引擎以及changelog-producer的含义参考 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) 利用Paimon表做lookup join集成mysql cdc等参考 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2) 利用Paimon的Tag兼容HiveBranch管理等参考: 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3) 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2) 今天我们继续快速了解下最近比较火的Apache Paimon 官方文档https://paimon.apache.org/docs/1.0/推荐阅读当流计算邂逅数据湖Paimon 的前生今世 1 利用Paimon做维表join 在流式处理中Lookup Join用来从另一个表Paimon中“查字典”来补充流的信息。一个表需要有时间标记处理时间属性另一个表需要支持快速查找查找源连接器Paimon在Flink中支持对带有主键的表和追加表进行Lookup Join操作 1.1 常规Lookup SET execution.runtime-mode streaming;-- 1、用户维表 CREATE TABLE customers (id INT PRIMARY KEY NOT ENFORCED,name STRING comment 姓名,country STRING comment 城市,zip STRING comment 邮编 ) WITH (connector paimon );-- 插入维表数据 INSERT INTO customers VALUES(1,tom,伦敦,123),(2,hank,纽约,456),(3,小明,北京,789);-- 2、模拟订单表(数据流) drop TEMPORARY TABLE orders_info; CREATE TEMPORARY TABLE orders_info (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME() ) WITH (connector datagen, rows-per-second1, fields.order_id.kindsequence, fields.order_id.start1, fields.order_id.end1000000, fields.total.kindrandom, fields.total.min1, fields.total.max1000, fields.customer_id.kindrandom, fields.customer_id.min1, fields.customer_id.max3 );-- 3、维表join SELECT o.order_id, o.total, c.country, c.zip FROM orders_info AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id c.id;-- 可以看到下面的结果 ---------------------------------------------- | op | order_id | total | country | zip | ---------------------------------------------- | I | 1 | 179 | 纽约 | 456 | | I | 2 | 31 | 北京 | 789 | | I | 3 | 148 | 北京 | 789 | | I | 4 | 774 | 北京 | 789 |1.2 同步重试Lookup 如果用户维表查找表的数据未准备好导致订单表主表的记录无法完成连接你可以考虑使用Flink的延迟重试策略进行查找。这个功能仅适用于Flink 1.16及以上的版本。下面sql的提示hints设置了重试策略 tablec: 指定了应用查找重试策略的目标表的别名在这个例子中是customers表其别名为c。retry-predicatelookup_miss: 定义了触发重试的条件。这里的条件是lookup_miss意味着如果在查找过程中没有找到对应的数据即查找缺失就会触发重试机制。retry-strategyfixed_delay: 设置了重试的策略为固定延迟即每次重试之间会有固定的等待时间。fixed-delay1s: 当采用固定延迟重试策略时这里设定了每次重试前等待的时间长度为1秒。max-attempts600: 设定最大重试次数为600次。结合上面的fixed-delay设置这意味着系统会每隔1秒尝试一次数据查找最多尝试600次。 -- enrich each order with customer information SELECT /* LOOKUP(tablec, retry-predicatelookup_miss, retry-strategyfixed_delay, fixed-delay1s, max-attempts600) */o.order_id, o.total, c.country, c.zip FROM orders AS o JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id c.id;1.3 异步重试Lookup 在同步重试机制下如果处理某一条记录时遇到了问题例如查找失败系统会按照设定的重试策略反复尝试直到成功或者达到最大重试次数。在这期间这条记录后面的其他记录即使没有问题也无法被处理因为整个处理流程被阻塞了。这会导致数据处理的整体效率低下甚至可能造成作业延迟或超时。使用异步加上allow_unordered可以在某些记录在查找时缺失也不会再阻塞其他记录。下面sql配置项解释 output-modeallow_unordered: 设置输出模式允许无序意味着当查找失败时不会阻塞其他记录的处理适合于不需要严格顺序的场景。lookup.asynctrue: 启用异步查找提高效率避免因等待某个查找结果而阻塞其它查找操作。lookup.async-thread-number16: 设置用于异步查找的线程数为16这可以加速查找过程特别是在高并发情况下。 注意如果主表orders是CDC流allow_unordered会被Flink SQL忽略只支持追加流流作业可能会被阻塞。可以尝试使用Paimon的audit_log系统表功能来绕过这个问题将CDC流转为追加流。 SELECT /* LOOKUP(tablec, retry-predicatelookup_miss, output-modeallow_unordered, retry-strategyfixed_delay, fixed-delay1s, max-attempts600) */o.order_id, o.total, c.country, c.zip FROM orders AS o JOIN customers /* OPTIONS(lookup.asynctrue, lookup.async-thread-number16) */ FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id c.id;1.4 max_pt功能 在传统的数据仓库中每个分区通常维护最新的完整数据因此这种分区表只需要连接最新的分区即可。Paimon特别为此场景开发了max_pt功能。 通过max_ptLookup节点能够自动刷新并查询最新分区的数据确保所使用的客户信息始终是最新的而不需要手动干预来确定或切换到最新的数据分区。 这种方法特别适用于需要频繁更新和查询最新数据的场景可以大大提高数据处理的效率和准确性。 -- 1、分区用户维表 drop table if exists customers; CREATE TABLE customers (id INT,name STRING,country STRING,zip STRING,dt STRING,PRIMARY KEY (id, dt) NOT ENFORCED ) PARTITIONED BY (dt);-- 插入数据(维表关联时候只查找2025-02-19分区数据) INSERT INTO customers VALUES (1, Alice, USA, 10001, 2025-02-18), (2, Bob, UK, 20002, 2025-02-18), (3, Charlie, Germany, 30003, 2025-02-18), (1, Alice, USA, 10002, 2025-02-19), -- 更新了 Alice 的邮编 (4, David, France, 40004, 2025-02-19);-- 2、订单信息表解析kafka数据 CREATE TEMPORARY TABLE orders (order_id BIGINT,customer_id INT,total DECIMAL(10, 2),proc_time AS PROCTIME() ) WITH (connector kafka,topic orders_topic,properties.bootstrap.servers localhost:9092,format json,properties.group.id testordersGroup,scan.startup.mode earliest-offset,json.fail-on-missing-field false,json.ignore-parse-errors true );-- 创建topic /opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_topic --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 -- 启动命令行生产者生产数据 /opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_topic --bootstrap-server centos01:9092 {order_id:1001,customer_id:1,total:50.0} {order_id:1002,customer_id:2,total:30.0} {order_id:1004,customer_id:4,total:20.0}-- lookup.dynamic-partitionmax_pt(): 这个选项指示查找节点自动定位并使用最新最大的分区。max_pt()函数帮助系统识别最新的分区确保只查询最新的数据。 -- lookup.dynamic-partition.refresh-interval1 h: 设置了查找节点刷新最新分区的时间间隔为1小时。这意味着系统会每隔1小时检查一次是否有新的分区可用并自动更新到最新分区的数据。SELECT o.order_id, o.total, c.country, c.zip FROM orders AS o JOIN customers /* OPTIONS(lookup.dynamic-partitionmax_pt(), lookup.dynamic-partition.refresh-interval1 h) */ FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id c.id;-------------------------------------------------------------------------------------------------------- | op | order_id | total | country | zip | -------------------------------------------------------------------------------------------------------- | I | 1001 | 50.00 | USA | 10002 | | I | 1004 | 20.00 | France | 40004 | 注 可以运行一个Flink流式作业来启动针对该paimon维表的查询服务。当QueryService存在时Flink查找连接Lookup Join会优先从该服务获取数据这将有效提高查询性能。 可以通过调用sys.query_service系统函数来实现 CALL sys.query_service(paimon_db.customers, 4); -- 设置并行度为4或者通过下面action包开启 FLINK_HOME/bin/flink run \/path/to/paimon-flink-action-1.0.0.jar \query_service \--warehouse warehouse-path \--database database-name \--table table-name \[--parallelism parallelism] \[--catalog_conf paimon-catalog-conf [--catalog_conf paimon-catalog-conf ...]]查询服务能够在内存中缓存频繁访问的数据并以高并发的方式提供这些数据减少了磁盘I/O操作和网络延迟的影响。 2 集成Mysql CDC 可以通过 Flink SQL 或者 Flink DataStream API 将 Flink CDC 数据写入 Paimon 中也可以通过Paimon 提供的 CDC 工具来完成入湖。那这两种方式有什么区别呢 上图是使用 Flink SQL 来完成入湖简单但是当源表添加新列后同步作业不会同步新的列下游 Paimon 表也不会增加新列。 上图是使用 Paimon CDC 工具来同步数据可以看到当源表发生列的新增后流作业会自动新增列的同步并传导到下游的 Paimon 表中完成 Schema Evolution 的同步。 Paimon CDC 工具也提供了整库同步 一个作业同步多张表以低成本的方式同步大量小表作业里同时自动进行 Schema Evolution新表将会被自动进行同步你不用重启作业全自动完成 推荐阅读Flink Paimon 数据 CDC 入湖最佳实践 2.1 MySQL一张表同步到Paimon一张表 2.1.1 环境准备 # mysql-cdc相关jar包的下载地址,同时还需要mysql-connector https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-mysql-cdc/3.1.1/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-common/3.1.1/# 在flink的lib目录添加下面的jar包 [rootcentos01 lib]# ll /opt/apps/flink-1.16.0/lib/ -rw-r--r--. 1 root root 259756 Feb 19 15:13 flink-cdc-common-3.1.1.jar -rw-r--r--. 1 root root 21286022 Feb 19 11:40 flink-cdc-pipeline-connector-mysql-3.1.1.jar -rw-r--r--. 1 root root 388680 Feb 19 10:57 flink-connector-mysql-cdc-3.1.1.jar -rw-r--r--. 1 root root 2475087 Feb 19 10:58 mysql-connector-java-8.0.27.jar ......# 开启MySQL Binlog并重启MySQL [rootcentos01 ~]# vim /etc/my.cnf #添加如下配置信息,开启cdc_db数据库的Binlog #数据库id server-id 1 ##启动binlog该参数的值会作为binlog的文件名 log-binmysql-bin #binlog类型 binlog_formatrow ##启用binlog的数据库需根据实际情况作出修改 binlog-do-dbcdc_db[rootcentos01 ~]# systemctl restart mysqld# 启动hadoop和yarn [rootcentos01 ~]# start-all.sh # 启动Hive [rootcentos01 ~]# nohup hive --service metastore 21 [rootcentos01 ~]# nohup hive --service hiveserver2 21 # yarn-session模式 # http://centos01:8088/cluster中可以看到Flink session cluster的job ID [rootcentos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d # 启动Flink的sql-client [rootcentos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session使用Hive的元数据 Flink SQL CREATE CATALOG paimon_hive_catalog WITH (type paimon,metastore hive,uri thrift://centos01:9083 );Flink SQL USE CATALOG paimon_hive_catalog; Flink SQL create database if not exists paimon_db; Flink SQL use paimon_db;# 设置显示模式 Flink SQL SET sql-client.execution.result-mode tableau; Flink SQL SET execution.runtime-mode batch;2.1.2 案例详解 如果指定的Paimon表不存在将自动创建表。其schema将从所有指定的MySQL表派生如果Paimon 表已存在则其schema将与所有指定MySQL表的schema进行比较仅支持同步具有主键的MySQL表也可MySQL多张表同步到Paimon一张表可以查看官网示例。 -- 准备测试数据 DROP TABLE IF EXISTS user_info; CREATE TABLE user_info (id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 编号,login_name varchar(200) DEFAULT NULL COMMENT 用户名称,name varchar(200) DEFAULT NULL COMMENT 用户姓名,phone_num varchar(200) DEFAULT NULL COMMENT 手机号,birthday date DEFAULT NULL COMMENT 用户生日,gender varchar(1) DEFAULT NULL COMMENT 性别 M男,F女,create_time datetime DEFAULT NULL COMMENT 创建时间,operate_time datetime DEFAULT NULL COMMENT 修改时间,PRIMARY KEY (id) USING BTREE ) ENGINEInnoDB DEFAULT CHARSETutf8 ROW_FORMATDYNAMIC;-- ---------------------------- -- Records of user_info -- ---------------------------- INSERT INTO user_info(login_name, name, phone_num, birthday, gender, create_time, operate_time) VALUES (zhangsan, 张三, 13800001234, 1990-01-01, M, NOW(), NOW()), (lisi, 李四, 13800005678, 1992-02-02, F, NOW(), NOW()), (wangwu, 王五, 13800009012, 1995-03-03, M, NOW(), NOW()), (zhaoliu, 赵六, 13800003456, 1998-04-04, F, NOW(), NOW()), (sunqi, 孙七, 13800007890, 2000-05-05, M, NOW(), NOW());CREATE TABLE orders (order_id bigint(20) NOT NULL AUTO_INCREMENT COMMENT 订单编号,user_id bigint(20) NOT NULL COMMENT 用户编号,order_date datetime DEFAULT NULL COMMENT 订单日期,total_price decimal(10,2) DEFAULT NULL COMMENT 订单总价,PRIMARY KEY (order_id) USING BTREE,KEY idx_user_id (user_id) ) ENGINEInnoDB DEFAULT CHARSETutf8 ROW_FORMATDYNAMIC;INSERT INTO orders(user_id, order_date, total_price) VALUES (1001, 2025-02-18 14:32:10, 199.99), (1002, 2025-02-19 09:15:30, 299.50), (1001, 2025-02-19 12:47:05, 79.99), (1003, 2025-02-19 13:00:00, 499.00), (1004, 2025-02-20 08:20:25, 159.99);# 按照天进行分区 [rootcentos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-table \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table ods_user_info_cdc \--primary-keys pt,id \# 指定分区键分区键是通过函数转换而来--partition_keys pt \--computed_column ptdate_format(operate_time, yyyyMMdd) \--mysql-conf hostnamecentos01 \--mysql-conf usernameroot \--mysql-conf password123456 \--mysql-conf database-namecdc_db \--mysql-conf table-nameuser_info \--catalog-conf metastorehive \--catalog-conf urithrift://centos01:9083 \--table-conf bucket2 \--table-conf changelog-producerinput \--table-conf sink.parallelism2 我们可以进行测试了 -- 此时会自动创建paimon表(无需我们手动建表了) Flink SQL select * from ods_user_info_cdc; ----------------------------------------------------------------------------------------------------------- | id | login_name | name | phone_num | birthday | gender | create_time | operate_time | pt | ----------------------------------------------------------------------------------------------------------- | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 | ------------------------------------------------------------------------------------------------------------- 修改原始表的schema mysql ALTER TABLE user_info ADD COLUMN email varchar(255); mysql INSERT INTO user_info(login_name, name, phone_num, birthday, gender, create_time, operate_time, email) values(hank, 汉克, 18600007890, 2000-05-05, M, NOW(), NOW(), hank163.com);-- 注意我们此时在原始sql-client窗口查询,email字段并没有加上 Flink SQL select * from ods_user_info_cdc; ----------------------------------------------------------------------------------------------------------- | id | login_name | name | phone_num | birthday | gender | create_time | operate_time | pt | ----------------------------------------------------------------------------------------------------------- | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | ------------------------------------------------------------------------------------------------------------- 其实此时的schema已经改变我们查看hdfs上的schema信息会发现email字实际上已经添加 [rootcentos01 ~]# hdfs dfs -cat /user/hive/warehouse/paimon_db.db/ods_user_info_cdc/schema/schema-1-- 因此我们需要另外重新启动一个sql-client窗口进行查询可以发现是正确的 Flink SQL select * from ods_user_info_cdc; ------------------------------------------------------------------------------------------------------------------------- | id | login_name | name | phone_num | birthday | gender | create_time | operate_time | pt | email | ------------------------------------------------------------------------------------------------------------------------- | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | NULL | | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | NULL | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | NULL | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | NULL | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 | NULL | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | hank163.com | -------------------------------------------------------------------------------------------------------------------------注 上面自动创建的paimon表是分区表但是在Hive中不是分区表 -- 报错 0: jdbc:hive2://192.168.42.101:10000 show partitions ods_user_info_cdc ; Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table ods_user_info_cdc is not a partitioned table (state42000,code1)要想在hive中也是分区表需要在同步时候设置参数metastore.partitioned-tabletrue # 同步到另一张paimon表并设置hive分区参数 [rootcentos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-table \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table ods_user_info_partition_cdc \--primary-keys pt,id \--partition_keys pt \--computed_column ptdate_format(operate_time, yyyyMMdd) \--mysql-conf hostnamecentos01 \--mysql-conf usernameroot \--mysql-conf password123456 \--mysql-conf database-namecdc_db \--mysql-conf table-nameuser_info \--catalog-conf metastorehive \--catalog-conf urithrift://centos01:9083 \--table-conf bucket2 \--table-conf changelog-producerinput \--table-conf metastore.partitioned-tabletrue \--table-conf sink.parallelism2 然后可以从hive中读取相关分区了 0: jdbc:hive2://192.168.42.101:10000 show partitions ods_user_info_partition_cdc ; -------------- | partition | -------------- | pt20250219 | ---------------- 读取相关分区数据 0: jdbc:hive2://192.168.42.101:10000 select * from ods_user_info_partition_cdc a where pt20250219; ----------------------------------------------------------------------------------------------------------------------------------------------- | a.id | a.login_name | a.name | a.phone_num | a.birthday | a.gender | a.create_time | a.operate_time | a.pt | a.email | ----------------------------------------------------------------------------------------------------------------------------------------------- | 1 | zhangsan | 张三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | 20250219 | NULL | | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | 20250219 | lisiqq.com | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | 20250219 | NULL | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | 20250219 | NULL | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43.0 | 2025-02-19 18:01:43.0 | 20250219 | hank163.com | | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | 20250219 | NULL | ------------------------------------------------------------------------------------------------------------------------------------------------- 需要注意的是如果我们修改数据原始的hive分区原始数据可能会减少 -- 下面示例我们修改了两条数据 0: jdbc:hive2://192.168.42.101:10000 select * from ods_user_info_partition_cdc a where pt20250219; ----------------------------------------------------------------------------------------------------------------------------------------------- | a.id | a.login_name | a.name | a.phone_num | a.birthday | a.gender | a.create_time | a.operate_time | a.email | a.pt | ----------------------------------------------------------------------------------------------------------------------------------------------- | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | lisiqq.com | 20250219 | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | NULL | 20250219 | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | NULL | 20250219 | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43.0 | 2025-02-19 18:01:43.0 | hank163.com | 20250219 | ----------------------------------------------------------------------------------------------------------------------------------------------- 4 rows selected (0.199 seconds) 0: jdbc:hive2://192.168.42.101:10000 select * from ods_user_info_partition_cdc a where pt20250220; --------------------------------------------------------------------------------------------------------------------------------------------- | a.id | a.login_name | a.name | a.phone_num | a.birthday | a.gender | a.create_time | a.operate_time | a.email | a.pt | --------------------------------------------------------------------------------------------------------------------------------------------- | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18.0 | 2025-02-20 12:01:18.0 | ww163.com | 20250220 | | 1 | zhangsan | 张三三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18.0 | 2025-02-20 18:01:18.0 | NULL | 20250220 | --------------------------------------------------------------------------------------------------------------------------------------------- 2 rows selected (0.189 seconds) 0: jdbc:hive2://192.168.42.101:10000 select * from ods_user_info_partition_cdc a ; ----------------------------------------------------------------------------------------------------------------------------------------------- | a.id | a.login_name | a.name | a.phone_num | a.birthday | a.gender | a.create_time | a.operate_time | a.email | a.pt | ----------------------------------------------------------------------------------------------------------------------------------------------- | 2 | lisi | 李四 | 13800005678 | 1992-02-02 | F | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | lisiqq.com | 20250219 | | 4 | zhaoliu | 赵六 | 13800003456 | 1998-04-04 | F | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | NULL | 20250219 | | 5 | sunqi | 孙七 | 13800007890 | 2000-05-05 | M | 2025-02-19 18:01:18.0 | 2025-02-19 18:01:18.0 | NULL | 20250219 | | 6 | hank | 汉克 | 18600007890 | 2000-05-05 | M | 2025-02-19 18:01:43.0 | 2025-02-19 18:01:43.0 | hank163.com | 20250219 | | 3 | wangwu | 王五 | 13800009012 | 1995-03-03 | M | 2025-02-19 18:01:18.0 | 2025-02-20 12:01:18.0 | ww163.com | 20250220 | | 1 | zhangsan | 张三三 | 13800001234 | 1990-01-01 | M | 2025-02-19 18:01:18.0 | 2025-02-20 18:01:18.0 | NULL | 20250220 | ----------------------------------------------------------------------------------------------------------------------------------------------- 当然我们在建立paimon表的时候也可以指定参数metastore.partitioned-table true Flink SQL drop table if exists flink_p_demo; Flink SQL CREATE TABLE flink_p_demo (dt STRING NOT NULL,name STRING NOT NULL,amount BIGINT,PRIMARY KEY (dt, name) NOT ENFORCED ) PARTITIONED BY (dt) WITH (connector paimon,changelog-producer lookup,metastore.partitioned-table true -- 设置为true ); Flink SQL insert into flink_p_demo values (20240725, apple, 3), (20240726, banana, 5);0: jdbc:hive2://192.168.42.101:10000 show partitions flink_p_demo; -------------- | partition | -------------- | dt20240725 | | dt20240726 | -------------- 0: jdbc:hive2://192.168.42.101:10000 select * from flink_p_demo a where dt 20240726; ------------------------------- | a.name | a.amount | a.dt | ------------------------------- | banana | 5 | 20240726 | -------------------------------2.2 整库同步 通过官方提供的paimon-action的jar包可以很方便的将 MySQL、Kafka、Mongo等中的数据实时摄入到Paimon中使用paimon-flink-action采用mysql-sync-database整库同步并通过–including_tables参数选择要同步的表这种同步模式有效地节省了大量资源开销相比每个表启动一个 Flink 任务而言避免了资源的大量浪费。 [rootcentos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-database \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table-prefix ods_ \--table-suffix _mysql_cdc \--mysql-conf hostnamecentos01 \--mysql-conf usernameroot \--mysql-conf password123456 \--mysql-conf database-namecdc_db \--catalog-conf metastorehive \--catalog-conf urithrift://centos01:9083 \--table-conf bucket2 \--table-conf changelog-producerinput \--table-conf sink.parallelism2 \--including-tables user_info|orders-- 在一个flink任务中同步多个mysql表 -- 在paimon中会自动创建多张表 Flink SQL select * from ods_orders_mysql_cdc; Flink SQL select * from ods_user_info_mysql_cdc;
http://www.dnsts.com.cn/news/194121.html

相关文章:

  • 做cpa联盟必须要有网站吗做网站很忙吗
  • 网站建设程序员提成网络营销方式举个例子
  • 小生意是做网站还是公众号网站权重有时降
  • 北京建站模板制作个人社保缴费怎么网上缴费
  • 网站快照是什么建设网站作用
  • 个人网站制作模板响应式外贸网站建设视频
  • 网站超级链接建设工程质量管理条例2022
  • 怀柔成都网站建设wordpress 1核2g的服务器卡
  • 怎么自己创立网站竞价推广教程
  • 中国建设银行网站查询密码网站内怎样做关键词有效果
  • 百度搜索 网站图片手机网站模板 学校
  • 怎么做快递网站的分点国外网站顶部菜单设计
  • 坂田杨美企业网站建设购物网站怎么做SEO
  • 怎么注册网站电商app排名300
  • 中国建设银行官网站积分抽奖信誉好的永州网站建设
  • 佛山网站定制门户网站开源
  • 做购物网站小图标关注公众号一单一结兼职app
  • 沈阳德泰诺网站建设网上书城网站开发背景
  • 连云港规划建设网站深圳招聘官网
  • 提高网站的用户体验度怒江网站制作
  • 做网站需要美工吗wordpress修改社交标签
  • 天津网站建设如何海外cdn
  • 亚马逊网站建设特点哪些网站可以做淘宝基础销量
  • 网站推广营销怎么做wordpress 固定连接
  • 怎么做动漫照片下载网站萧山网页设计
  • 天津建设银行东丽网站aso优化师
  • 搞一个网站多少钱3合一网站怎么做
  • 百度收录网站入口桓台响应式网站建设
  • 光泽县规划建设局网站霞山网站开发公司
  • 哈尔滨seo整站优化remix做歌网站