服装网站建设前景分析,怎么寻求网站建设,做高端网站,驻马店阿里巴巴做网站文章目录 一、CDC 入湖1.1、[开启binlog](https://blog.csdn.net/wuxintdrh/article/details/130142601)1.2、创建测试表1.2.1、创建mysql表1.2.2、将 binlog 日志 写入 kafka1、使用 mysql-cdc 监听 binlog2、kafka 作为 sink表3、写入sink 表 1.2.3、将 kakfa 数据写入hudi1、… 文章目录 一、CDC 入湖1.1、[开启binlog](https://blog.csdn.net/wuxintdrh/article/details/130142601)1.2、创建测试表1.2.1、创建mysql表1.2.2、将 binlog 日志 写入 kafka1、使用 mysql-cdc 监听 binlog2、kafka 作为 sink表3、写入sink 表 1.2.3、将 kakfa 数据写入hudi1、kafak 作为 源表flinksql 消费kafka 二、Bulk Insert (离线批量导入)2.1、buck_insert 案例2.2.1、mysql jdbc2.2.2、hudi buck_insert2.2.3、buck insert 写入hudi 表 三、Index Bootstrap 全量接增量3.1、Index Bootstrap 案例 四、Changelog Mode4.1、基本特性4.2、可选配置参数4.3、案例 五、Append Mode5.1、Inline Clustering 只支持 Copy_On_Write 表5.2、Async Clustering5.3、Clustering Plan Strategy 六、Bucket Index6.1、WITH 参数6.2、与 state index 对比七、Rate Limit (限流) 使用版本 hudi-0.12.1
flink-1.15.2一、CDC 入湖
CDC(change data capture) 保证了完整数据变更目前主要有两种方式
1、直接使用 cdc-connector 对接 DB 的 binlog数据导入。优点是不依赖消息队列缺点是对 db server 造成压力。2、对接 cdc format 消费 kafka 数据导入 hudi优点是可扩展性强缺点是依赖 kafka。
接下来我们主要介绍 第二种方式
1.1、开启binlog
1.2、创建测试表
1.2.1、创建mysql表
create database hudi_test;
use hudi_test;-- 建表
create table person(id int auto_increment primary key,name varchar(30),age int
);1.2.2、将 binlog 日志 写入 kafka
mysql-cdc 参考 https://chbxw.blog.csdn.net/article/details/119841434 使用cdc-2.x
1、使用 mysql-cdc 监听 binlog
wget https://maven.aliyun.com/repository/central/com/ververica/flink-connector-mysql-cdc/2.0.0/flink-connector-mysql-cdc-2.0.0.jar
Flink SQL
create database hudi_test;
use hudi_test;create table person_binlog (id bigint not null,name string,age int,primary key (id) not enforced
) with (connector mysql-cdc,hostname chb1,port 3306,username root,password 123456,database-name flinktest,table-name person
);使用mysql-cdc 报错
NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/Thr原因在于sql和非sql connector实现中对于shaded guava的处理不同 使用 flink-sql-connector-mysql-cdc 替代 flink-connector-mysql-cdc 而且2.0.0版本不行提升到2.2.1版本解决问题。
2、kafka 作为 sink表
-- 为了显示更清晰
Flink SQL SET sql-client.execution.result-mode tableau;
[INFO] Session property has been set.Flink SQL SET execution.runtime-mode streaming;
[INFO] Session property has been set.Flink SQL
create table person_binlog_sink_kafka(id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键
) with (connector upsert-kafka -- kafka connector upsert-kafka,topic cdc_mysql_person_sink,properties.zookeeper.connect chb1:2181,properties.bootstrap.servers chb1:9092,key.format json,value.format json
);3、写入sink 表
Flink SQL
insert into person_binlog_sink_kafka
select * from person_binlog;1.2.3、将 kakfa 数据写入hudi
1、kafak 作为 源表flinksql 消费kafka
Flink SQL
create table person_binlog_source_kafka (id bigint not null,name string,age int not null
) with (connector kafka,topic cdc_mysql_person_sink,properties.bootstrap.servers chb1:9092,format json,scan.startup.mode earliest-offset,properties.group.id testGroup
);2、创建hudi目标表
Flink SQL
create table person_binlog_sink_hudi (id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键
) with (connector hudi,path hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi,table.type MERGE_ON_READ,write.option insert
);3、将 kafka 中数据 写入 hudi
Flink SQL
insert into person_binlog_sink_hudi
select * from person_binlog_source_kafka;插入20条数据产生332个小文件 小文件问题
二、Bulk Insert (离线批量导入)
如果数据源来源于其他系统可以使用批量导入数据功能快速的将存量数据导入hudi。
1、消除了序列化和数据合并。由于跳过了重复数据删除用户需要保证数据的唯一性。2、在批处理执行模式下效率更高。默认情况下批处理执行模式将输入记录按分区路径进行排序并写入Hudi避免频繁切换文件句柄导致写性能下降。
Flink SQL
SET execution.runtime-mode streaming; // 默认是流模式
SET execution.checkpointing.interval 0; // 关闭checkpoint batch模式不支持checkpoint3、bulk_insert 的并行度由write.tasks指定。并行度会影响小文件的数量。理论上bulk_insert的并行度是桶的数量(特别是当每个桶写入到最大文件大小时它将切换到新的文件句柄。最后文件的数量 write.bucket_assign.tasks。
参数名是否必选默认值备注write.operationtrueupsert设置为bulk_insert 开启功能write.tasksfalse4bulk_insert 的并行度, 文件数量 write.bucket_assign.taskswrite.bulk_insert.shuffle_inputfalsetrue写入前是否根据输入字段(分区) shuffle。启用此选项将减少小文件的数量但可能存在数据倾斜的风险write.bulk_insert.sort_inputfalsetrue写入前是否根据输入字段(partition字段)对数据进行排序。当一个 write task写多个分区时启用该选项将减少小文件的数量。write.sort.memoryfalse128排序算子 可用的 managed memory 默认128 MB
2.1、buck_insert 案例
2.2.1、mysql jdbc
参考 https://chbxw.blog.csdn.net/article/details/119479967
Flink SQL
create table person (id int not null,name string,age int not null,primary key (id) not enforced
) with (connector jdbc,url jdbc:mysql://chb1:3306/flinktest,username root,password 123456,table-name person
);报错 java.lang.Integer cannot be cast to java.lang.Long, 由于 mysql 中 person的id 是 int 类型 转为 flink 对应的是 int, 但是在flink建表时 字段为 bigint.所以报错。
2.2.2、hudi buck_insert
Flink SQL
create table person_binlog_sink_hudi_buck (id int not null,name string,age int not null,primary key (id) not enforced -- 主键
) with (connector hudi,path hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi_buck,table.type MERGE_ON_READ,write.option bulk_insert -- 配置 buck_insert 模式
);2.2.3、buck insert 写入hudi 表
Flink SQL
insert into person_binlog_sink_hudi_buck
select * from person;一次性的。
三、Index Bootstrap 全量接增量
在上面使用 buck_insert 已经完成全量数据导入接下来 用户可以通过Index Bootstrap 功能实时插入增量数据保证数据不重复。
WITH 参数
参数名是否必选默认值备注index.bootstrap.enabledtruefalse此功能开启Hudi 表中剩余的记录将一次性加载到Flink状态index.partition.regexfalse*优化选择。设置正则表达式以过滤分区。默认情况下所有分区都加载到flink状态
使用方法
CREATE TABLE创建一条与Hudi表对应的语句。 注意这个 table.type 配置必须正确。设置index.bootstrap.enabled true来启用index bootstrap功能在flink-conf.yaml文件中设置Flink checkpoint的容错机制设置配置项execution.checkpointing.tolerable-failed-checkpoints n取决于Flink checkpoint执行时间等待直到第一个checkpoint成功表明index bootstrap完成。在index bootstrap完成后用户可以退出并保存savepoint(或直接使用外部 checkpoint)。重启任务并且设置index.bootstrap.enable 为 false。
注意
索引引导是一个阻塞过程因此在索引引导期间无法完成checkpoint。index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition以及Load record form file观察index bootstrap的进度。第一个成功的checkpoint表明 index bootstrap已完成。 从checkpoint恢复时不需要再次加载索引。
3.1、Index Bootstrap 案例
Flink SQL
create table person_binlog_sink_hudi_boot (id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键
) with (connector hudi,path hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi_buck,table.type MERGE_ON_READ,index.bootstrap.enabledtrue
);index bootstrap表接cdc表
Flink SQL
insert into person_binlog_sink_hudi_boot
select * from person_binlog;四、Changelog Mode
4.1、基本特性
Hudi可以保留消息的所有中间变化(I / -U / U / D)然后通过flink的状态计算消费从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。
4.2、可选配置参数
参数名是否必选默认值备注changelog.enabledfalsefalse默认是关闭的即upsert语义只有合并的消息被确保保留中间的更改可以被合并。 设置为true以支持消费所有的更改
注意 不管格式是否存储了中间更改日志消息批(快照)读取仍然合并所有中间更改。 在设置changelog.enable为true时中间的变更也是 best effort: 异步压缩任务将更新日志记录合并到一条记录中因此如果流源不及时消费则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略比如压缩选项:compress.delta_commits和compression.delta_seconds为读取器保留一些缓冲时间。
4.3、案例 Flink SQL SET sql-client.execution.result-mode tableau; -- table tableau changelog
[INFO] Session property has been set.Flink SQL SET execution.runtime-mode streaming;
[INFO] Session property has been set.Flink SQL
create table person2(id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键
) with (connector hudi,path hdfs://chb3:8020/hudi_db/person2,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4,changelog.enabled true
);-- 插入数据
insert into person2 values (1, chb, 23);
insert into person2 values (1, chb, 24);select * from person2; 创建非changelog表 url 指向person2同一路径 Flink SQL
create table person3(id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键
) with (connector hudi,path hdfs://chb3:8020/hudi_db/person2,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4
);
结果只有最新数据 报错 Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat
拷贝 hadoop-mapreduce-client-core.jar 到 flink lib.
五、Append Mode
从 0.10 开始支持
对于 INSERT 模式
MOR 默认会 apply 小文件策略 会追加写 avro log 文件COW 每次直接写新的 parquet 文件没有小文件策略
Hudi 支持丰富的 Clustering 策略优化 INSERT 模式下的小文件问题。
5.1、Inline Clustering 只支持 Copy_On_Write 表
参数名是否必选默认值备注write.insert.clusterfalsefalse是否在写入时合并小文件COW 表默认 insert 写不合并小文件开启该参数后每次写入会优先合并之前的小文件不会去重吞吐会受影响 用的比较少建议使用 Async Clustering
5.2、Async Clustering
从 0.12 开始支持
WITH 参数
名称Required默认值说明clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan默认关闭clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commitsclustering.schedule.enabled 为 true 时生效clustering.async.enabledfalsefalse是否异步执行 clustering plan默认关闭clustering.tasksfalse4Clustering task 执行并发clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小默认 1GBclustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering默认600MBclustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段clustering.plan.partition.filter.modefalseNONE支持NONE不做限制RECENT_DAYS按时间天回溯SELECTED_PARTITIONS指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效默认 2 天
5.3、Clustering Plan Strategy
支持定制化的 clustering 策略。
名称Required默认值说明clustering.plan.partition.filter.modeFALSENONE支持· NONE不做限制· RECENT_DAYS按时间天回溯· SELECTED_PARTITIONS指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsFALSE2RECENT_DAYS 生效默认 2 天clustering.plan.strategy.cluster.begin.partitionFALSEN/ASELECTED_PARTITIONS 生效指定开始 partition(inclusive)clustering.plan.strategy.cluster.end.partitionFALSEN/ASELECTED_PARTITIONS 生效指定结束 partition(incluseve)clustering.plan.strategy.partition.regex.patternFALSEN/A正则表达式过滤 partitionsclustering.plan.strategy.partition.selectedFALSEN/A显示指定目标 partitions支持逗号 , 分割多个 partition
六、Bucket Index
默认的 flink 流式写入使用 state 存储索引信息primary key 到 fileId 的映射关系。当数据量比较大的时候state的存储开销可能成为瓶颈bucket 索引通过固定的 hash 策略将相同 key 的数据分配到同一个 fileGroup 中避免了索引的存储和查询开销。
6.1、WITH 参数
名称Required默认值说明index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数当前设置后则不可再变更。
6.2、与 state index 对比
1bucket index 没有 state 的存储计算开销性能较好2bucket index 无法扩容 bucketsstate index 则可以依据文件的大小动态扩容3bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制)state index 没有限制
七、Rate Limit (限流)
有许多用户将完整的历史数据集与实时增量数据一起放到消息队列中的用例。然后使用 flink 将队列中的数据从最早的偏移量消费到hudi中。 消费历史数据集具有以下特点:
1)瞬时吞吐量巨大2)严重无序(随机写分区)。
这将导致写入性能下降和吞吐量故障。对于这种情况可以打开速度限制参数以确保流的平滑写入。
名称Required默认值说明write.rate.limitfalse0默认禁止限流
参考 https://hudi.apache.org/cn/docs/hoodie_deltastreamer/#flink-ingestion