当前位置: 首页 > news >正文

服装网站建设前景分析怎么寻求网站建设

服装网站建设前景分析,怎么寻求网站建设,做高端网站,驻马店阿里巴巴做网站文章目录 一、CDC 入湖1.1、[开启binlog](https://blog.csdn.net/wuxintdrh/article/details/130142601)1.2、创建测试表1.2.1、创建mysql表1.2.2、将 binlog 日志 写入 kafka1、使用 mysql-cdc 监听 binlog2、kafka 作为 sink表3、写入sink 表 1.2.3、将 kakfa 数据写入hudi1、… 文章目录 一、CDC 入湖1.1、[开启binlog](https://blog.csdn.net/wuxintdrh/article/details/130142601)1.2、创建测试表1.2.1、创建mysql表1.2.2、将 binlog 日志 写入 kafka1、使用 mysql-cdc 监听 binlog2、kafka 作为 sink表3、写入sink 表 1.2.3、将 kakfa 数据写入hudi1、kafak 作为 源表flinksql 消费kafka 二、Bulk Insert (离线批量导入)2.1、buck_insert 案例2.2.1、mysql jdbc2.2.2、hudi buck_insert2.2.3、buck insert 写入hudi 表 三、Index Bootstrap 全量接增量3.1、Index Bootstrap 案例 四、Changelog Mode4.1、基本特性4.2、可选配置参数4.3、案例 五、Append Mode5.1、Inline Clustering 只支持 Copy_On_Write 表5.2、Async Clustering5.3、Clustering Plan Strategy 六、Bucket Index6.1、WITH 参数6.2、与 state index 对比七、Rate Limit (限流) 使用版本 hudi-0.12.1 flink-1.15.2一、CDC 入湖 CDC(change data capture) 保证了完整数据变更目前主要有两种方式 1、直接使用 cdc-connector 对接 DB 的 binlog数据导入。优点是不依赖消息队列缺点是对 db server 造成压力。2、对接 cdc format 消费 kafka 数据导入 hudi优点是可扩展性强缺点是依赖 kafka。 接下来我们主要介绍 第二种方式 1.1、开启binlog 1.2、创建测试表 1.2.1、创建mysql表 create database hudi_test; use hudi_test;-- 建表 create table person(id int auto_increment primary key,name varchar(30),age int );1.2.2、将 binlog 日志 写入 kafka mysql-cdc 参考 https://chbxw.blog.csdn.net/article/details/119841434 使用cdc-2.x 1、使用 mysql-cdc 监听 binlog wget https://maven.aliyun.com/repository/central/com/ververica/flink-connector-mysql-cdc/2.0.0/flink-connector-mysql-cdc-2.0.0.jar Flink SQL create database hudi_test; use hudi_test;create table person_binlog (id bigint not null,name string,age int,primary key (id) not enforced ) with (connector mysql-cdc,hostname chb1,port 3306,username root,password 123456,database-name flinktest,table-name person );使用mysql-cdc 报错 NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/Thr原因在于sql和非sql connector实现中对于shaded guava的处理不同 使用 flink-sql-connector-mysql-cdc 替代 flink-connector-mysql-cdc 而且2.0.0版本不行提升到2.2.1版本解决问题。 2、kafka 作为 sink表 -- 为了显示更清晰 Flink SQL SET sql-client.execution.result-mode tableau; [INFO] Session property has been set.Flink SQL SET execution.runtime-mode streaming; [INFO] Session property has been set.Flink SQL create table person_binlog_sink_kafka(id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键 ) with (connector upsert-kafka -- kafka connector upsert-kafka,topic cdc_mysql_person_sink,properties.zookeeper.connect chb1:2181,properties.bootstrap.servers chb1:9092,key.format json,value.format json );3、写入sink 表 Flink SQL insert into person_binlog_sink_kafka select * from person_binlog;1.2.3、将 kakfa 数据写入hudi 1、kafak 作为 源表flinksql 消费kafka Flink SQL create table person_binlog_source_kafka (id bigint not null,name string,age int not null ) with (connector kafka,topic cdc_mysql_person_sink,properties.bootstrap.servers chb1:9092,format json,scan.startup.mode earliest-offset,properties.group.id testGroup );2、创建hudi目标表 Flink SQL create table person_binlog_sink_hudi (id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键 ) with (connector hudi,path hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi,table.type MERGE_ON_READ,write.option insert );3、将 kafka 中数据 写入 hudi Flink SQL insert into person_binlog_sink_hudi select * from person_binlog_source_kafka;插入20条数据产生332个小文件 小文件问题 二、Bulk Insert (离线批量导入) 如果数据源来源于其他系统可以使用批量导入数据功能快速的将存量数据导入hudi。 1、消除了序列化和数据合并。由于跳过了重复数据删除用户需要保证数据的唯一性。2、在批处理执行模式下效率更高。默认情况下批处理执行模式将输入记录按分区路径进行排序并写入Hudi避免频繁切换文件句柄导致写性能下降。 Flink SQL SET execution.runtime-mode streaming; // 默认是流模式 SET execution.checkpointing.interval 0; // 关闭checkpoint batch模式不支持checkpoint3、bulk_insert 的并行度由write.tasks指定。并行度会影响小文件的数量。理论上bulk_insert的并行度是桶的数量(特别是当每个桶写入到最大文件大小时它将切换到新的文件句柄。最后文件的数量 write.bucket_assign.tasks。 参数名是否必选默认值备注write.operationtrueupsert设置为bulk_insert 开启功能write.tasksfalse4bulk_insert 的并行度, 文件数量 write.bucket_assign.taskswrite.bulk_insert.shuffle_inputfalsetrue写入前是否根据输入字段(分区) shuffle。启用此选项将减少小文件的数量但可能存在数据倾斜的风险write.bulk_insert.sort_inputfalsetrue写入前是否根据输入字段(partition字段)对数据进行排序。当一个 write task写多个分区时启用该选项将减少小文件的数量。write.sort.memoryfalse128排序算子 可用的 managed memory 默认128 MB 2.1、buck_insert 案例 2.2.1、mysql jdbc 参考 https://chbxw.blog.csdn.net/article/details/119479967 Flink SQL create table person (id int not null,name string,age int not null,primary key (id) not enforced ) with (connector jdbc,url jdbc:mysql://chb1:3306/flinktest,username root,password 123456,table-name person );报错 java.lang.Integer cannot be cast to java.lang.Long, 由于 mysql 中 person的id 是 int 类型 转为 flink 对应的是 int, 但是在flink建表时 字段为 bigint.所以报错。 2.2.2、hudi buck_insert Flink SQL create table person_binlog_sink_hudi_buck (id int not null,name string,age int not null,primary key (id) not enforced -- 主键 ) with (connector hudi,path hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi_buck,table.type MERGE_ON_READ,write.option bulk_insert -- 配置 buck_insert 模式 );2.2.3、buck insert 写入hudi 表 Flink SQL insert into person_binlog_sink_hudi_buck select * from person;一次性的。 三、Index Bootstrap 全量接增量 在上面使用 buck_insert 已经完成全量数据导入接下来 用户可以通过Index Bootstrap 功能实时插入增量数据保证数据不重复。 WITH 参数 参数名是否必选默认值备注index.bootstrap.enabledtruefalse此功能开启Hudi 表中剩余的记录将一次性加载到Flink状态index.partition.regexfalse*优化选择。设置正则表达式以过滤分区。默认情况下所有分区都加载到flink状态 使用方法 CREATE TABLE创建一条与Hudi表对应的语句。 注意这个 table.type 配置必须正确。设置index.bootstrap.enabled true来启用index bootstrap功能在flink-conf.yaml文件中设置Flink checkpoint的容错机制设置配置项execution.checkpointing.tolerable-failed-checkpoints n取决于Flink checkpoint执行时间等待直到第一个checkpoint成功表明index bootstrap完成。在index bootstrap完成后用户可以退出并保存savepoint(或直接使用外部 checkpoint)。重启任务并且设置index.bootstrap.enable 为 false。 注意 索引引导是一个阻塞过程因此在索引引导期间无法完成checkpoint。index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition以及Load record form file观察index bootstrap的进度。第一个成功的checkpoint表明 index bootstrap已完成。 从checkpoint恢复时不需要再次加载索引。 3.1、Index Bootstrap 案例 Flink SQL create table person_binlog_sink_hudi_boot (id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键 ) with (connector hudi,path hdfs://chb3:8020/hudi_db/person_binlog_sink_hudi_buck,table.type MERGE_ON_READ,index.bootstrap.enabledtrue );index bootstrap表接cdc表 Flink SQL insert into person_binlog_sink_hudi_boot select * from person_binlog;四、Changelog Mode 4.1、基本特性 Hudi可以保留消息的所有中间变化(I / -U / U / D)然后通过flink的状态计算消费从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。 4.2、可选配置参数 参数名是否必选默认值备注changelog.enabledfalsefalse默认是关闭的即upsert语义只有合并的消息被确保保留中间的更改可以被合并。 设置为true以支持消费所有的更改 注意 不管格式是否存储了中间更改日志消息批(快照)读取仍然合并所有中间更改。 在设置changelog.enable为true时中间的变更也是 best effort: 异步压缩任务将更新日志记录合并到一条记录中因此如果流源不及时消费则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略比如压缩选项:compress.delta_commits和compression.delta_seconds为读取器保留一些缓冲时间。 4.3、案例 Flink SQL SET sql-client.execution.result-mode tableau; -- table tableau changelog [INFO] Session property has been set.Flink SQL SET execution.runtime-mode streaming; [INFO] Session property has been set.Flink SQL create table person2(id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键 ) with (connector hudi,path hdfs://chb3:8020/hudi_db/person2,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4,changelog.enabled true );-- 插入数据 insert into person2 values (1, chb, 23); insert into person2 values (1, chb, 24);select * from person2; 创建非changelog表 url 指向person2同一路径 Flink SQL create table person3(id bigint not null,name string,age int not null,primary key (id) not enforced -- 主键 ) with (connector hudi,path hdfs://chb3:8020/hudi_db/person2,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4 ); 结果只有最新数据 报错 Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.FileInputFormat 拷贝 hadoop-mapreduce-client-core.jar 到 flink lib. 五、Append Mode 从 0.10 开始支持 对于 INSERT 模式 MOR 默认会 apply 小文件策略 会追加写 avro log 文件COW 每次直接写新的 parquet 文件没有小文件策略 Hudi 支持丰富的 Clustering 策略优化 INSERT 模式下的小文件问题。 5.1、Inline Clustering 只支持 Copy_On_Write 表 参数名是否必选默认值备注write.insert.clusterfalsefalse是否在写入时合并小文件COW 表默认 insert 写不合并小文件开启该参数后每次写入会优先合并之前的小文件不会去重吞吐会受影响 用的比较少建议使用 Async Clustering 5.2、Async Clustering ​ 从 0.12 开始支持 WITH 参数 名称Required默认值说明clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan默认关闭clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commitsclustering.schedule.enabled 为 true 时生效clustering.async.enabledfalsefalse是否异步执行 clustering plan默认关闭clustering.tasksfalse4Clustering task 执行并发clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小默认 1GBclustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering默认600MBclustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段clustering.plan.partition.filter.modefalseNONE支持NONE不做限制RECENT_DAYS按时间天回溯SELECTED_PARTITIONS指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效默认 2 天 5.3、Clustering Plan Strategy ​ 支持定制化的 clustering 策略。 名称Required默认值说明clustering.plan.partition.filter.modeFALSENONE支持· NONE不做限制· RECENT_DAYS按时间天回溯· SELECTED_PARTITIONS指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsFALSE2RECENT_DAYS 生效默认 2 天clustering.plan.strategy.cluster.begin.partitionFALSEN/ASELECTED_PARTITIONS 生效指定开始 partition(inclusive)clustering.plan.strategy.cluster.end.partitionFALSEN/ASELECTED_PARTITIONS 生效指定结束 partition(incluseve)clustering.plan.strategy.partition.regex.patternFALSEN/A正则表达式过滤 partitionsclustering.plan.strategy.partition.selectedFALSEN/A显示指定目标 partitions支持逗号 , 分割多个 partition 六、Bucket Index 默认的 flink 流式写入使用 state 存储索引信息primary key 到 fileId 的映射关系。当数据量比较大的时候state的存储开销可能成为瓶颈bucket 索引通过固定的 hash 策略将相同 key 的数据分配到同一个 fileGroup 中避免了索引的存储和查询开销。 6.1、WITH 参数 名称Required默认值说明index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数当前设置后则不可再变更。 6.2、与 state index 对比 1bucket index 没有 state 的存储计算开销性能较好2bucket index 无法扩容 bucketsstate index 则可以依据文件的大小动态扩容3bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制)state index 没有限制 七、Rate Limit (限流) 有许多用户将完整的历史数据集与实时增量数据一起放到消息队列中的用例。然后使用 flink 将队列中的数据从最早的偏移量消费到hudi中。 消费历史数据集具有以下特点: 1)瞬时吞吐量巨大2)严重无序(随机写分区)。 这将导致写入性能下降和吞吐量故障。对于这种情况可以打开速度限制参数以确保流的平滑写入。 名称Required默认值说明write.rate.limitfalse0默认禁止限流 参考 https://hudi.apache.org/cn/docs/hoodie_deltastreamer/#flink-ingestion
http://www.dnsts.com.cn/news/87160.html

相关文章:

  • 中国化工建设网站做网站能传电影网站多少钱
  • 部门网站建设怎么做好看大方的企业网站源码.net
  • 河北网诚网站建设成都系统软件定制开发
  • 有哪些网站可以做毕业设计外贸网站免费模板
  • 网站建设策划书范文6篇东莞品牌整合营销
  • wordpress改后台ip网站关键词seo费用
  • 口碑营销网站wordpress 自带评论
  • 南京市建设档案馆网站网站 专题建设服务
  • wordpress建娱乐站团支书登录智慧团建网站
  • 郑州专业做微信网站263企业邮箱网页登录入口
  • 网站建设入门基础中关村在线主页
  • 做哪个外贸网站不用交费搜狗网页游戏大厅
  • 域名怎么绑定自己网站淘客推广平台排名
  • 广州市网站开发四川建设集团有限公司网站
  • 制作网站公司定价广州最大网站建设
  • 中国建设人才服务信息网是正规网站seo和sem的区别与联系
  • 南京网站开发公司驻马店百牛网站建设
  • 网站设计公司杭州中国企业排名
  • 公司做网站好不好网站需要实名认证
  • 教做面点的网站济宁网站建设电话
  • 建手机网站软件百度站长工具数据提交
  • 网站采集转载wordpress rss教程
  • app制作网站制作完免费行情软件app网站大全
  • 网站设计的企业云端商城买流量
  • xampp做的网站能搜索吗网上整合营销
  • 网站维护平台龙岩网站建设要多少费用
  • vs连接数据库做网站门户网站开发案例
  • 网站开发 手机 电脑南京口碑最好的装修公司
  • 上海黄金网站设计uniapp商城app整套源码
  • 网站制作哪些分类如何优化wordpress