曲靖网站设计公司,北京微信网站开发报价,建设部 招投标网站,百度线上推广在高频小批量写入场景下#xff0c;传统的导入方式存在以下问题#xff1a;
每个导入都会创建一个独立的事务#xff0c;都需要经过 FE 解析 SQL 和生成执行计划#xff0c;影响整体性能每个导入都会生成一个新的版本#xff0c;导致版本数快速增长#xff0c;增加了后台…在高频小批量写入场景下传统的导入方式存在以下问题
每个导入都会创建一个独立的事务都需要经过 FE 解析 SQL 和生成执行计划影响整体性能每个导入都会生成一个新的版本导致版本数快速增长增加了后台compaction的压力
为了解决这些问题Doris 引入了 Group Commit 机制。Group Commit 不是一种新的导入方式而是对现有导入方式的优化扩展主要针对
INSERT INTO tbl VALUES(...) 语句Stream Load 导入
通过将多个小批量导入在后台合并成一个大的事务提交显著提升了高并发小批量写入的性能。同时Group Commit 与 PreparedStatement 结合使用可以获得更高的性能提升。
Group Commit 模式
Group Commit 写入有三种模式分别是 关闭模式off_mode 不开启 Group Commit。 同步模式sync_mode Doris 根据负载和表的 group_commit_interval属性将多个导入在一个事务提交事务提交后导入返回。这适用于高并发写入场景且在导入完成后要求数据立即可见。 异步模式async_mode Doris 首先将数据写入 WAL (Write Ahead Log)然后导入立即返回。Doris 会根据负载和表的group_commit_interval属性异步提交数据提交之后数据可见。为了防止 WAL 占用较大的磁盘空间单次导入数据量较大时会自动切换为sync_mode。这适用于写入延迟敏感以及高频写入的场景。 WAL的数量可以通过FE http接口查看具体可见这里也可以在BE的metrics中搜索关键词wal查看。
Group Commit 使用方式
假如表的结构为
CREATE TABLE dt (id int(11) NOT NULL,name varchar(50) NULL,score int(11) NULL
) ENGINEOLAP
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (replication_num 1
);使用 JDBC
当用户使用 JDBC insert into values方式写入时为了减少 SQL 解析和生成规划的开销我们在 FE 端支持了 MySQL 协议的 PreparedStatement 特性。当使用 PreparedStatement 时SQL 和其导入规划将被缓存到 Session 级别的内存缓存中后续的导入直接使用缓存对象降低了 FE 的 CPU 压力。下面是在 JDBC 中使用 PreparedStatement 的例子
1. 设置 JDBC URL 并在 Server 端开启 Prepared Statement
url jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmtstrueuseLocalSessionStatetruerewriteBatchedStatementstruecachePrepStmtstrueprepStmtCacheSqlLimit99999prepStmtCacheSize5002. 配置 group_commit session 变量有如下两种方式
通过 JDBC url 设置增加sessionVariablesgroup_commitasync_mode
url jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmtstrueuseLocalSessionStatetruerewriteBatchedStatementstruecachePrepStmtstrueprepStmtCacheSqlLimit99999prepStmtCacheSize500sessionVariablesgroup_commitasync_mode通过执行 SQL 设置
try (Statement statement conn.createStatement()) {statement.execute(SET group_commit async_mode;);
}3. 使用 PreparedStatement
private static final String JDBC_DRIVER com.mysql.jdbc.Driver;
private static final String URL_PATTERN jdbc:mysql://%s:%d/%s?useServerPrepStmtstrueuseLocalSessionStatetruerewriteBatchedStatementstruecachePrepStmtstrueprepStmtCacheSqlLimit99999prepStmtCacheSize50$sessionVariablesgroup_commitasync_mode;
private static final String HOST 127.0.0.1;
private static final int PORT 9087;
private static final String DB db;
private static final String TBL dt;
private static final String USER root;
private static final String PASSWD ;
private static final int INSERT_BATCH_SIZE 10;private static void groupCommitInsertBatch() throws Exception {Class.forName(JDBC_DRIVER);// add rewriteBatchedStatementstrue and cachePrepStmtstrue in JDBC url// set session variables by sessionVariablesgroup_commitasync_mode in JDBC urltry (Connection conn DriverManager.getConnection(String.format(URL_PATTERN, HOST, PORT, DB), USER, PASSWD)) {String query insert into TBL values(?, ?, ?);try (PreparedStatement stmt conn.prepareStatement(query)) {for (int j 0; j 5; j) {// 10 rows per insertfor (int i 0; i INSERT_BATCH_SIZE; i) {stmt.setInt(1, i);stmt.setString(2, name i);stmt.setInt(3, i 10);stmt.addBatch();}int[] result stmt.executeBatch();}}} catch (Exception e) {e.printStackTrace();}
}注意由于高频的insert into语句会打印大量的audit log对最终性能有一定影响默认关闭了打印prepared语句的audit log。可以通过设置session variable的方式控制是否打印prepared语句的audit log。
# 配置 session 变量开启打印parpared语句的audit log, 默认为false即关闭打印parpared语句的audit log。
set enable_prepared_stmt_audit_logtrue;关于 JDBC 的更多用法参考使用 Insert 方式同步数据。
使用Golang进行Group Commit
Golang的prepared语句支持有限所以我们可以通过手动客户端攒批的方式提高Group Commit的性能以下为一个示例程序。
package mainimport (database/sqlfmtmath/randstringssyncsync/atomictime_ github.com/go-sql-driver/mysql
)const (host 127.0.0.1port 9038db testuser rootpassword table async_lineitem
)var (threadCount 20batchSize 100
)var totalInsertedRows int64
var rowsInsertedLastSecond int64func main() {dbDSN : fmt.Sprintf(%s:%stcp(%s:%d)/%s?parseTimetrue, user, password, host, port, db)db, err : sql.Open(mysql, dbDSN)if err ! nil {fmt.Printf(Error opening database: %s\n, err)return}defer db.Close()var wg sync.WaitGroupfor i : 0; i threadCount; i {wg.Add(1)go func() {defer wg.Done()groupCommitInsertBatch(db)}()}go logInsertStatistics()wg.Wait()
}func groupCommitInsertBatch(db *sql.DB) {for {valueStrings : make([]string, 0, batchSize)valueArgs : make([]interface{}, 0, batchSize*16)for i : 0; i batchSize; i {valueStrings append(valueStrings, (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?))valueArgs append(valueArgs, rand.Intn(1000))valueArgs append(valueArgs, rand.Intn(1000))valueArgs append(valueArgs, rand.Intn(1000))valueArgs append(valueArgs, rand.Intn(1000))valueArgs append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})valueArgs append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})valueArgs append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})valueArgs append(valueArgs, sql.NullFloat64{Float64: 1.0, Valid: true})valueArgs append(valueArgs, N)valueArgs append(valueArgs, O)valueArgs append(valueArgs, time.Now())valueArgs append(valueArgs, time.Now())valueArgs append(valueArgs, time.Now())valueArgs append(valueArgs, DELIVER IN PERSON)valueArgs append(valueArgs, SHIP)valueArgs append(valueArgs, N/A)}stmt : fmt.Sprintf(INSERT INTO %s VALUES %s,table, strings.Join(valueStrings, ,))_, err : db.Exec(stmt, valueArgs...)if err ! nil {fmt.Printf(Error executing batch: %s\n, err)return}atomic.AddInt64(rowsInsertedLastSecond, int64(batchSize))atomic.AddInt64(totalInsertedRows, int64(batchSize))}
}func logInsertStatistics() {for {time.Sleep(1 * time.Second)fmt.Printf(Total inserted rows: %d\n, totalInsertedRows)fmt.Printf(Rows inserted in the last second: %d\n, rowsInsertedLastSecond)rowsInsertedLastSecond 0}
} INSERT INTO VALUES
异步模式
# 配置 session 变量开启 group commit (默认为 off_mode),开启异步模式
mysql set group_commit async_mode;# 这里返回的 label 是 group_commit 开头的可以区分出是否使用了 group commit
mysql insert into dt values(1, Bob, 90), (2, Alice, 99);
Query OK, 2 rows affected (0.05 sec)
{label:group_commit_a145ce07f1c972fc-bd2c54597052a9ad, status:PREPARE, txnId:181508}# 可以看出这个 label, txn_id 和上一个相同说明是攒到了同一个导入任务中
mysql insert into dt(id, name) values(3, John);
Query OK, 1 row affected (0.01 sec)
{label:group_commit_a145ce07f1c972fc-bd2c54597052a9ad, status:PREPARE, txnId:181508}# 不能立刻查询到
mysql select * from dt;
Empty set (0.01 sec)# 10 秒后可以查询到可以通过表属性 group_commit_interval 控制数据可见延迟。
mysql select * from dt;
--------------------
| id | name | score |
--------------------
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
--------------------
3 rows in set (0.02 sec)同步模式
# 配置 session 变量开启 group commit (默认为 off_mode),开启同步模式
mysql set group_commit sync_mode;# 这里返回的 label 是 group_commit 开头的可以区分出是否谁用了 group commit导入耗时至少是表属性 group_commit_interval。
mysql insert into dt values(4, Bob, 90), (5, Alice, 99);
Query OK, 2 rows affected (10.06 sec)
{label:group_commit_d84ab96c09b60587_ec455a33cb0e9e87, status:PREPARE, txnId:3007, query_id:fc6b94085d704a94-a69bfc9a202e66e2}# 数据可以立刻读出
mysql select * from dt;
--------------------
| id | name | score |
--------------------
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
| 4 | Bob | 90 |
| 5 | Alice | 99 |
--------------------
5 rows in set (0.03 sec)关闭模式
mysql set group_commit off_mode;Stream Load
假如data.csv的内容为
6,Amy,60
7,Ross,98异步模式
# 导入时在 header 中增加group_commit:async_mode配置curl --location-trusted -u {user}:{passwd} -T data.csv -H group_commit:async_mode -H column_separator:, http://{fe_host}:{http_port}/api/db/dt/_stream_load
{TxnId: 7009,Label: group_commit_c84d2099208436ab_96e33fda01eddba8,Comment: ,GroupCommit: true,Status: Success,Message: OK,NumberTotalRows: 2,NumberLoadedRows: 2,NumberFilteredRows: 0,NumberUnselectedRows: 0,LoadBytes: 19,LoadTimeMs: 35,StreamLoadPutTimeMs: 5,ReadDataTimeMs: 0,WriteDataTimeMs: 26
}# 返回的 GroupCommit 为 true说明进入了 group commit 的流程
# 返回的 Label 是 group_commit 开头的是真正消费数据的导入关联的 label同步模式
# 导入时在 header 中增加group_commit:sync_mode配置curl --location-trusted -u {user}:{passwd} -T data.csv -H group_commit:sync_mode -H column_separator:, http://{fe_host}:{http_port}/api/db/dt/_stream_load
{TxnId: 3009,Label: group_commit_d941bf17f6efcc80_ccf4afdde9881293,Comment: ,GroupCommit: true,Status: Success,Message: OK,NumberTotalRows: 2,NumberLoadedRows: 2,NumberFilteredRows: 0,NumberUnselectedRows: 0,LoadBytes: 19,LoadTimeMs: 10044,StreamLoadPutTimeMs: 4,ReadDataTimeMs: 0,WriteDataTimeMs: 10038
}# 返回的 GroupCommit 为 true说明进入了 group commit 的流程
# 返回的 Label 是 group_commit 开头的是真正消费数据的导入关联的 label关于 Stream Load 使用的更多详细语法及最佳实践请参阅 Stream Load。
自动提交条件
当满足时间间隔 (默认为 10 秒) 或数据量 (默认为 64 MB) 其中一个条件时会自动提交数据。这两个参数需要配合使用建议根据实际场景进行调优。
修改提交间隔
默认提交间隔为 10 秒用户可以通过修改表的配置调整
# 修改提交间隔为 2 秒
ALTER TABLE dt SET (group_commit_interval_ms 2000);参数调整建议: 较短的间隔(如2秒): 优点数据可见性延迟更低适合对实时性要求较高的场景缺点提交次数增多版本数增长更快后台compaction压力更大 较长的间隔(如30秒): 优点提交批次更大版本数增长更慢系统开销更小缺点数据可见性延迟更高
建议根据业务对数据可见性延迟的容忍度来设置如果系统压力大可以适当增加间隔。
修改提交数据量
Group Commit 的默认提交数据量为 64 MB用户可以通过修改表的配置调整
# 修改提交数据量为 128MB
ALTER TABLE dt SET (group_commit_data_bytes 134217728);参数调整建议: 较小的阈值(如32MB): 优点内存占用更少适合资源受限的环境缺点提交批次较小吞吐量可能受限 较大的阈值(如256MB): 优点批量提交效率更高系统吞吐量更大缺点占用更多内存
建议根据系统内存资源和数据可靠性要求来权衡。如果内存充足且追求更高吞吐可以适当增加到128MB或更大。
相关系统配置
BE 配置 group_commit_wal_path 描述group commit 存放 WAL 文件的目录 默认值默认在用户配置的storage_root_path的各个目录下创建一个名为wal的目录。配置示例 group_commit_wal_path/data1/storage/wal;/data2/storage/wal;/data3/storage/walgroup_commit_memory_rows_for_max_filter_ratio 描述当 group commit 导入的总行数不高于该值max_filter_ratio 正常工作否则不工作 默认值10000
使用限制 Group Commit 限制条件 INSERT INTO VALUES 语句在以下情况下会退化为非 Group Commit 方式 使用事务写入 (Begin; INSERT INTO VALUES; COMMIT)指定 Label (INSERT INTO dt WITH LABEL {label} VALUES)VALUES 中包含表达式 (INSERT INTO dt VALUES (1 100))列更新写入表不支持轻量级模式更改 Stream Load 在以下情况下会退化为非 Group Commit 方式 使用两阶段提交指定 Label (-H label:my_label)列更新写入表不支持轻量级模式更改 Unique 模型 Group Commit 不保证提交顺序建议使用 Sequence 列来保证数据一致性。 max_filter_ratio 支持 默认导入中filter_ratio 通过失败行数和总行数计算。Group Commit 模式下max_filter_ratio 在总行数不超过 group_commit_memory_rows_for_max_filter_ratio 时有效。 WAL 限制 async_mode 写入会将数据写入 WAL成功后删除失败时通过 WAL 恢复。WAL 文件是单副本存储的如果对应磁盘损坏或文件误删可能导致数据丢失。下线 BE 节点时使用 DECOMMISSION 命令以防数据丢失。async_mode 在以下情况下切换为 sync_mode 导入数据量过大超过 WAL 单目录 80% 空间不知道数据量的 chunked stream load磁盘可用空间不足 重量级 Schema Change 时拒绝 Group Commit 写入客户端需重试。
性能
我们分别测试了使用Stream Load和JDBC在高并发小数据量场景下group commit(使用async mode) 的写入性能。
Stream Load 日志场景测试
机器配置 1 台 FE阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘 3 台 BE阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘 1 台测试客户端阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘 测试版本为Doris-2.1.5
数据集
httplogs 数据集总共 31GB、2.47 亿条
测试工具
doris-streamloader
测试方法
对比 非 group_commit 和 group_commit 的 async_mode 模式下设置不同的单并发数据量和并发数导入 247249096 行数据
测试结果
导入方式单并发数据量并发数耗时 (秒)导入速率 (行/秒)导入吞吐 (MB/秒)group_commit10 KB10330674,7879.8group_commit10 KB30326475,75010.0group_commit100 KB10424582,44776.7group_commit100 KB30366675,54389.0group_commit500 KB101871,318,661173.7group_commit500 KB301831,351,087178.0group_commit1 MB101781,385,148182.5group_commit1 MB301781,385,148182.5group_commit10 MB101771,396,887184.0非group_commit1 MB10282487,53611.5非group_commit10 MB10450549,44268.9非group_commit10 MB301771,396,887184.0
在上面的group_commit测试中BE 的 CPU 使用率在 10-40% 之间。
可以看出group_commit 模式在小数据量并发导入的场景下能有效的提升导入性能同时减少版本数降低系统合并数据的压力。
JDBC
机器配置 1 台 FE阿里云 8 核 CPU、16GB 内存、1 块 100GB ESSD PL1 云磁盘 1 台 BE阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘 1 台测试客户端阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘 测试版本为Doris-2.1.5 关闭打印parpared语句的audit log以提高性能
数据集
tpch sf10 lineitem 表数据集30 个文件总共约 22 GB1.8 亿行
测试工具
DataX
测试方法
通过 txtfilereader 向 mysqlwriter 写入数据配置不同并发数和单个 INSERT 的行数
测试结果
单个 insert 的行数并发数导入速率 (行/秒)导入吞吐 (MB/秒)10010107,17211.4710020140,31714.7910030142,88215.28在上面的测试中FE 的 CPU 使用率在 60-70% 左右BE 的 CPU 使用率在 10-20% 左右。
Insert into sync 模式小批量数据
机器配置 1 台 FE阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘 5 台 BE阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。 1 台测试客户端阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘 测试版本为Doris-2.1.5
数据集 tpch sf10 lineitem 表数据集。 建表语句为
CREATE TABLE IF NOT EXISTS lineitem (L_ORDERKEY INTEGER NOT NULL,L_PARTKEY INTEGER NOT NULL,L_SUPPKEY INTEGER NOT NULL,L_LINENUMBER INTEGER NOT NULL,L_QUANTITY DECIMAL(15,2) NOT NULL,L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,L_DISCOUNT DECIMAL(15,2) NOT NULL,L_TAX DECIMAL(15,2) NOT NULL,L_RETURNFLAG CHAR(1) NOT NULL,L_LINESTATUS CHAR(1) NOT NULL,L_SHIPDATE DATE NOT NULL,L_COMMITDATE DATE NOT NULL,L_RECEIPTDATE DATE NOT NULL,L_SHIPINSTRUCT CHAR(25) NOT NULL,L_SHIPMODE CHAR(10) NOT NULL,L_COMMENT VARCHAR(44) NOT NULL
)
DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 32
PROPERTIES (replication_num 3
);测试工具
Jmeter
需要设置的jmeter参数如下图所示 设置测试前的init语句set group_commitasync_mode以及set enable_nereids_plannerfalse。开启jdbc的prepared statement完整的url为jdbc:mysql://127.0.0.1:9030?useServerPrepStmtstrueuseLocalSessionStatetruerewriteBatchedStatementstruecachePrepStmtstrueprepStmtCacheSqlLimit99999prepStmtCacheSize50sessionVariablesgroup_commitasync_modesessionVariablesenable_nereids_plannerfalse。设置导入类型为prepared update statement。设置导入语句。设置每次需要导入的值注意导入的值与导入值的类型要一一匹配。
测试方法
通过 Jmeter 向Doris写数据。每个并发每次通过insert into写入1行数据。
测试结果 数据单位为行每秒。 以下测试分为30100500并发。
30并发sync模式5个BE3副本性能测试
Group commit internal10ms20ms50ms100ms321.5307.3285.8224.3
100并发sync模式性能测试
Group commit internal10ms20ms50ms100ms1175.21108.71016.3704.5
500并发sync模式性能测试
Group commit internal10ms20ms50ms100ms3289.83686.73280.72609.2
Insert into sync 模式大批量数据
机器配置 1 台 FE阿里云 16 核 CPU、64GB 内存、1 块 500GB ESSD PL1 云磁盘 5 台 BE阿里云 16 核 CPU、64GB 内存、1 块 1TB ESSD PL1 云磁盘。注测试中分别用了1台3台5台BE进行测试。 1 台测试客户端阿里云 16 核 CPU、64GB 内存、1 块 100GB ESSD PL1 云磁盘 测试版本为Doris-2.1.5
数据集 tpch sf10 lineitem 表数据集。 建表语句为
CREATE TABLE IF NOT EXISTS lineitem (L_ORDERKEY INTEGER NOT NULL,L_PARTKEY INTEGER NOT NULL,L_SUPPKEY INTEGER NOT NULL,L_LINENUMBER INTEGER NOT NULL,L_QUANTITY DECIMAL(15,2) NOT NULL,L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,L_DISCOUNT DECIMAL(15,2) NOT NULL,L_TAX DECIMAL(15,2) NOT NULL,L_RETURNFLAG CHAR(1) NOT NULL,L_LINESTATUS CHAR(1) NOT NULL,L_SHIPDATE DATE NOT NULL,L_COMMITDATE DATE NOT NULL,L_RECEIPTDATE DATE NOT NULL,L_SHIPINSTRUCT CHAR(25) NOT NULL,L_SHIPMODE CHAR(10) NOT NULL,L_COMMENT VARCHAR(44) NOT NULL
)
DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 32
PROPERTIES (replication_num 3
);测试工具
Jmeter
测试方法
通过 Jmeter 向Doris写数据。每个并发每次通过insert into写入1000行数据。
测试结果 数据单位为行每秒。 以下测试分为30100500并发。
30并发sync模式性能测试
Group commit internal10ms20ms50ms100ms92.2K85.9K84K83.2K
100并发sync模式性能测试
Group commit internal10ms20ms50ms100ms70.4K70.5K73.2K69.4K
500并发sync模式性能测试
Group commit internal10ms20ms50ms100ms46.3K47.7K47.4K46.5K