福州哪家企业网站建设设计最高端,电商网站开发平台用什么人开发,怎么进入自己网站主机地址,网页版微信文件存储路径文章目录 一. 滚动策略#xff1a;sink后文件切分(暂不关注)1. 切分分区目录下的文件2. 小文件合并 二. 分区提交1. 分区提交触发器 #xff08;什么时候创建分区#xff09;1.1. 逻辑说明1.2. 举例说明 2. 分区时间提取器 (用于partition-time情况下partition commit策略)2… 文章目录 一. 滚动策略sink后文件切分(暂不关注)1. 切分分区目录下的文件2. 小文件合并 二. 分区提交1. 分区提交触发器 什么时候创建分区1.1. 逻辑说明1.2. 举例说明 2. 分区时间提取器 (用于partition-time情况下partition commit策略)2.1. 逻辑说明2.2. 举例说明 3. 分区提交策略 分区创建后怎么告知下游或系统3.1. 逻辑说明3.2. 举例说明 4. Sink Parallelism 三. 完整示例1. 官网partition-time2. 实际测试kafka-hive 本文概述 flink支持动态写数据到文件系统提供了分块写数据以及动态分区接下来看flink是如何分块写数据以及如何配置动态分区的建立。 文件系统连接器支持流写入是基于 Flink 的 文件系统 写入文件的。
我们可以直接编写 SQL将流数据插入到非分区表。 如果是分区表可以配置分区操作相关的属性。具体参考分区提交。 一. 滚动策略sink后文件切分(暂不关注)
1. 切分分区目录下的文件
分区目录下的数据被分割到 part 文件中。每个分区对应的 sink 的 subtask 都至少会为该分区生成一个 part 文件。 该策略基于大小和指定的文件可被打开的最大 timeout 时长来滚动 part 文件。
键默认值类型描述sink.rolling-policy.file-size128MBMemorySize当part达到设定值时文件开始滚动。sink.rolling-policy.rollover-interval30 minDuration滚动前part 文件处于打开状态的最大时长默认值30分钟以避免产生大量小文件。 检查频率是由 sink.rolling-policy.check-interval 属性控制的。sink.rolling-policy.check-interval1 minDuration周期检查文件打开时长。
根据描述默认情况下Flink采取了如上默认值的滚动策略。 todocheckpoint 也会影响part文件的生成 对于 bulk formats 数据 (parquet、orc、avro)滚动策略与 checkpoint 间隔pending 状态的文件会在下个 checkpoint 完成控制了 part 文件的大小和个数。 2. 小文件合并 todo: checkpoint的间隔会影响文件产生的效率 file sink 支持文件合并允许应用程序使用较小的 checkpoint 间隔但不产生大量小文件。
键默认值类型描述auto-compactionfalseBoolean在流式 sink 中自动合并功能。数据首先会被写入临时文件。当 checkpoint 完成后该检查点产生的临时文件会被合并。这些临时文件在合并前不可见。compaction.file-size(无)MemorySize合并目标文件大小默认值为滚动文件大小。
如果启用文件合并功能会根据目标文件大小将多个小文件合并成大文件。
在生产环境中使用文件合并功能时需要注意 只有 checkpoint 内部的文件才会被合并至少生成的文件个数与 checkpoint 个数相同。合并前文件是不可见的那么文件的可见时间是checkpoint 间隔时长 合并时长。如果合并时间过长将导致反压延长 checkpoint 所需时间。 二. 分区提交
sink动态写分区包括如下两个操作 Trigger-提交分区的时机通过什么来识别分区watermark或处理时间什么时候提交分区Policy-提交分区后通知下游写_SUCCESShive metadata 中新增分区或自定义合并小文件等。 注意 分区提交仅在(什么是)动态分区插入模式下才有效。 1. 分区提交触发器 什么时候创建分区
1.1. 逻辑说明
Flink 提供了两种类型分区提交触发器 第一种根据分区的处理时间没有根据字段吗。基于分区创建时间这里指的是什么和当前系统时间来触发分区。 这种触发器更具通用性但不是很精确。例如数据延迟或故障将导致过早提交分区。第二种根据从分区字段提取的时间以及 watermark。 这需要 job 支持 watermark 生成分区是根据时间来切割的例如按小时或按天分区。 感知分区的几种情况 不管分区数据是否完整而只想让下游尽快感知到分区(不推荐) ‘sink.partition-commit.trigger’‘process-time’ (默认值) ‘sink.partition-commit.delay’‘0s’ (默认值) 一旦数据进入分区将立即提交分区。注意这个分区可能会被提交多次提交多次产生的影响ing浪费多余的资源。 如果想让下游只有在分区数据完整时才感知到分区并且 job 中有 watermark 生成也能从分区字段的值中提取到时间 ‘sink.partition-commit.trigger’‘partition-time’ ‘sink.partition-commit.delay’‘1h’ (根据分区类型指定如果是按小时分区可配置为 ‘1h’) 该方式是最精准地提交分区的方式尽力确保提交分区的数据完整。 如果想让下游系统只有在数据完整时才感知到分区但是没有 watermark或者无法从分区字段的值中提取时间 ‘sink.partition-commit.trigger’‘process-time’ (默认值) ‘sink.partition-commit.delay’‘1h’ (根据分区类型指定如果是按小时分区可配置为 ‘1h’) 该方式尽量精确地提交分区但是数据延迟或者故障将导致过早提交分区。 延迟数据的处理延迟的记录会被写入到已经提交的对应分区中且会再次触发该分区的提交。 如下参数
确定何时提交分区这里只关注process-time trigger下的两个参数 sink.partition-commit.trigger: 默认值process-time 描述 基于机器时间 ‘process-time’不需要分区时间提取器也不需要 watermark 生成器。一旦 “当前系统时间” 超过了 分区创建系统时间(比如flink消费到一条数据触发了分区创建操作对应的时间) 和 sink.partition-commit.delay 之和立即提交分区。基于提取的分区时间‘partition-time’。需要 watermark 生成。一旦 watermark 超过了 “分区创建系统时间” 和 ‘sink.partition-commit.delay’ 之和立即提交分区。 sink.partition-commit.delay 默认值0s 描述 该延迟时间之前分区不会被提交。如果是按天分区可以设置为 ‘1 d’如果是按小时分区应设置为 ‘1 h’当然也可以设置分钟例如 30min。 1.2. 举例说明
--默认值可以不配置
sink.partition-commit.triggerprocess-time
--当来第一条数据时记录为时刻1先创建hive分区文件夹当时间超过 时刻11h 时分区提交
--分区未提交时文件为.data开头的临时文件分区提交时会从cp中同步数据到临时文件中并命名为正式文件。
sink.partition-commit.delay1h 2. 分区时间提取器 (用于partition-time情况下partition commit策略)
2.1. 逻辑说明
时间提取器从分区字段值中提取时间。 partition.time-extractor.kind 默认值default 描述从分区字段中提取时间的时间提取器。 支持default 和 custom。在默认情况下可以配置 timestamp-pattern/formatter。对于custom应指定提取器类。 partition.time-extractor.timestamp-pattern 默认值无 描述分区格式的数据拼接。 默认支持第一个字段按 ‘yyyy-MM-dd hh:mm:ss’ 这种模式提取。 如果需要从一个分区字段 ‘dt’ 提取 timestamp可以配置成‘$dt’。如果需要从多个分区字段中提取分区比如 ‘year’、‘month’、‘day’ 和 ‘hour’ 提取 timestamp可以配置成: $year-$month-$day $hour:00:00。如果需要从两个分区字段 dt 和 hour 提取 timestamp可以配置成$dt$hour:00:00。 partition.time-extractor.timestamp-formatter 默认值yyyy-MM-dd HH:mm:ss 描述分区格式的规定。具体数值由partition.time-extractor.timestamp-pattern设置。默认yyyy-MM-dd HH:mm:ss。 2.2. 举例说明
-- year、month 和 day三个字段组成分区
-- 可不填default为默认值即从分区字段中获取
partition.time-extractor.kind default
--具体动态分区名怎么由字段拼接
partition.time-extractor.timestamp-pattern $year$month$day
--分区名格式
partition.time-extractor.timestamp-formatter yyyyMMdd3. 分区提交策略 分区创建后怎么告知下游或系统
3.1. 逻辑说明
分区提交策略定义了提交分区时的具体操作。 metadata 存储metastore仅 hive 表支持该策略该策略下文件系统通过目录层次结构来管理分区。(todo:通过hive更新表元数据?)success 文件该策略下会在分区对应的目录下生成一个名为 _SUCCESS 的空文件。 sink.partition-commit.policy.kind 默认值无 描述分区提交策略通知下游某个分区已经写完毕可以被读取了。 metastore向 metadata 增加分区。仅 hive 支持 metastore 策略文件系统通过目录结构管理分区success-file在目录中增加 ‘_success’ 文件 上述两个策略可以同时定‘metastore,success-file’。custom通过指定的类来创建提交策略。 支持同时指定多个提交略‘metastore,success-file’。 sink.partition-commit.success-file.name 默认值: _SUCCESS 描述使用success-file 分区提交策略时的文件名默认值是 ‘_SUCCESS’。 sink.partition-commit.policy.class 默认值无 描述 custom下才用 实现PartitionCommitPolicy 接口的分区提交策略类。只有在 custom 提交策略下才使用该类。 可以自定义提交策略如下 public class AnalysisCommitPolicy implements PartitionCommitPolicy {private HiveShell hiveShell;Overridepublic void commit(Context context) throws Exception {if (hiveShell null) {hiveShell createHiveShell(context.catalogName());}hiveShell.execute(String.format(ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s %s) location %s,context.tableName(),context.partitionKeys().get(0),context.partitionValues().get(0),context.partitionPath()));hiveShell.execute(String.format(ANALYZE TABLE %s PARTITION (%s %s) COMPUTE STATISTICS FOR COLUMNS,context.tableName(),context.partitionKeys().get(0),context.partitionValues().get(0)));}
}todo如上通过hive语句来添加分区 3.2. 举例说明 sink.partition-commit.policy.kindsuccess-file
sink.partition-commit.success-file.name_SUCCESS_gao 4. Sink Parallelism
在流模式和批模式下向外部文件系统包括 hive写文件时的 parallelism 可以通过相应的 table 配置项指定。 默认情况下该 sink parallelism 与上游 chained operator 的 parallelism 一样。 比如kafka作为source源分区为5设置并行度为5在同一个chained中写分区时hive sink的并行度自动设为5。 当配置了跟上游的 chained operator 不一样的 parallelism 时写文件和合并文件的算子如果开启的话会使用指定的 sink parallelism。
键默认值类型描述sink.parallelism(无)Integer将文件写入外部文件系统的 parallelism。这个值应该大于0否则抛异常。
注意 目前当且仅当上游的 changelog 模式为 INSERT-ONLY 时才支持配置 sink parallelism。否则程序将会抛出异常。 三. 完整示例
1. 官网partition-time
以下示例展示了如何使用文件系统连接器编写流式查询语句将数据从 Kafka 写入文件系统然后运行批式查询语句读取数据。 CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,log_ts TIMESTAMP(3),WATERMARK FOR log_ts AS log_ts - INTERVAL 5 SECOND
) WITH (...);CREATE TABLE fs_table (user_id STRING,order_amount DOUBLE,dt STRING,hour STRING
) PARTITIONED BY (dt, hour) WITH (connectorfilesystem,path...,formatparquet,sink.partition-commit.delay1 h,sink.partition-commit.policy.kindsuccess-file
);-- 流式 sql插入文件系统表
INSERT INTO fs_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, yyyy-MM-dd),DATE_FORMAT(log_ts, HH)
FROM kafka_table;-- 批式 sql使用分区修剪进行选择
SELECT * FROM fs_table WHERE dt2020-05-20 and hour12;如果 watermark 被定义在 TIMESTAMP_LTZ 类型的列上并且使用 partition-time 模式进行提交sink.partition-commit.watermark-time-zone 这个属性需要设置成会话时区否则分区提交可能会延迟若干个小时。 CREATE TABLE kafka_table (user_id STRING,order_amount DOUBLE,ts BIGINT, -- 以毫秒为单位的时间ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL 5 SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
) WITH (...);CREATE TABLE fs_table (user_id STRING,order_amount DOUBLE,dt STRING,hour STRING
) PARTITIONED BY (dt, hour) WITH (connectorfilesystem,path...,formatparquet,partition.time-extractor.timestamp-pattern$dt $hour:00:00,sink.partition-commit.delay1 h,sink.partition-commit.triggerpartition-time,sink.partition-commit.watermark-time-zoneAsia/Shanghai, -- 假设用户配置的时区为 Asia/Shanghaisink.partition-commit.policy.kindsuccess-file
);-- 流式 sql插入文件系统表
INSERT INTO fs_table
SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, yyyy-MM-dd),DATE_FORMAT(ts_ltz, HH)
FROM kafka_table;-- 批式 sql使用分区修剪进行选择
SELECT * FROM fs_table WHERE dt2020-05-20 and hour12;2. 实际测试kafka-hive
-- SET table.sql-dialecthive;
CREATE CATALOG myhive WITH (type hive,default-database data_base,hive-conf-dir /usr/bin/hadoop/software/hive/conf
);CREATE TABLE source_kafka (pv string,uv string,p_day_id string
) WITH (connector kafka-x,topic hive_kafka,properties.bootstrap.servers xxx:9092,properties.group.id luna_g,scan.startup.mode earliest-offset,json.timestamp-format.standard SQL,json.ignore-parse-errors true,format json,scan.parallelism 1);-- 通过sql hint来指定表的行为
-- 1. 分区名称策略
-- partition.time-extractor.timestamp-pattern$p_day_id 分区数据组成
-- partition.time-extractor.timestamp-formatter yyyyMMdd :分区格式-- 2. 分区提交策略
-- sink.partition-commit.delay5min分区提交延迟分区时间 延迟 与 process_time做对比--3. 通知下游策略
-- sink.partition-commit.policy.kindmetastore,success-file通知下游策略
-- sink.partition-commit.success-file.name_SUCCESS_gao 成功文件名称insert into myhive.logsget.dws_thjl_pv_uv_d_xky_bak /* OPTIONS(partition.time-extractor.timestamp-pattern$p_day_id:00:00,sink.partition-commit.policy.kindmetastore,success-file,sink.partition-commit.success-file.name_SUCCESS_gao111) */select * from source_kafka;