怀化网站排名优化,包头企业网站制作,各国网站域名,大连金州房价一、Datax
1.1 DataX 3.0概述 DataX3.0是一个异构数据源离线同步工具#xff0c;可以方便的对各种异构数据源进行高效的数据同步。 其github地址为#xff1a;
https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/mast…
一、Datax
1.1 DataX 3.0概述 DataX3.0是一个异构数据源离线同步工具可以方便的对各种异构数据源进行高效的数据同步。 其github地址为
https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md
GitCode - 开发者的代码家园https://gitcode.com/alibaba/datax/overview
1.2 DataX3.0框架设计
DataX将复杂的网状的同步链路变成了星型数据链路DataX自身作为中间传输载体负责连接各种数据源解决了异构数据源同步问题。Datax采用的是 DataX本身作为离线数据同步框架采用Framework plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件纳入到整个同步框架中 ReaderReader为数据采集模块负责采集数据源的数据将数据发送给Framework。WriterWriter为数据写入模块负责不断向Framework取数据并将数据写入到目的端。FrameworkFramework用于连接reader和writer作为两者的数据传输通道并处理缓冲流控并发数据转换等核心技术问题。
1.3 DataX3.0核心架构 DataX 3.0 开源版本支持单机多线程模式完成同步作业运行。基于DataX作业生命周期的时序图从整体架构设计角度来阐述DataX各个模块相互关系。 1.3.1 核心模块介绍 DataX完成单个数据同步的作业我们称之为JobDataX接受到一个Job之后将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。DataXJob启动后会根据不同的源端切分策略将Job切分成多个小的Task(子任务)以便于并发执行。Task便是DataX作业的最小单元每一个Task都会负责一部分数据的同步工作。切分多个Task之后DataX Job会调用Scheduler模块根据配置的并发数据量将拆分成的Task重新组合组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task默认单个任务组的并发数量为5。每一个Task都由TaskGroup负责启动Task启动后会固定启动Reader—Channel—Writer的线程来完成任务同步工作。DataX作业运行起来之后 Job监控并等待多个TaskGroup模块任务完成等待所有TaskGroup任务完成后Job成功退出。否则异常退出进程退出值非0。 1.3.2 DataX调度流程 用户提交了一个DataX作业并且配置了DataX Channel并发数为20个需求是将一个100张分表的mysql数据同步到starrocks里面 则DataX的调度决策思路是 DataXJob根据分库分表切分成了100个Task。根据20个并发DataX计算共需要分配4个TaskGroup。4个TaskGroup平分切分好的100个Task每一个TaskGroup负责以5个并发共计运行25个Task。 二、StarRocksWriter DataX基于StarRocks开发的StarRocksWriter插件支持MySQL、Oracle等数据库中的数据导入至 StarRocks。在底层实现上StarRocksWriter内部将各种reader读取的数据进行缓存攒批以csv或 json格式之后采用Stream Load 方式批量导入至 StarRocks。总体的数据流是Source --Reader --DataX channel -- Writer --- StarRocks 官网文章地址
使用 DataX 导入 | StarRocks
三、创建配置文件 为导入作业创建 JSON 格式配置文件 这里列举几种Datax同步脚本。
1同步oracle数据至starrocksoracle2starrocks.json
{job: {setting: {speed: {channel: 1},errorLimit: {record: 0,percentage: 0}},content: [{reader: {name: oraclereader,parameter: {username: root,password: root,connection: [{querySql: [select fid,f_diccode,concat(substr(qhcode,1,2),0000) as partition_no from nannd.test1],jdbcUrl: [jdbc:oracle:thin:192.168.22.115:1521/init]}]}},writer: {name: starrockswriter,parameter: {username: root,password: root,database: ,table: test2,column: [fid,f_diccode,partition_no],preSql: [truncate table des.test2],postSql: [],jdbcUrl: jdbc:mysql://192.168.10.103:9030,loadUrl: [192.168.10.101:8030,192.168.10.102:8030,192.168.10.103:8030],loadProps: {format: json,strip_outer_array: true}}}}]}
}
OracleReader的配置说明见 https://github.com/alibaba/DataX/blob/master/introduction.md
https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md
StarRocksWriter的配置说明见官网
使用 DataX 导入 | StarRocks 2同步mysql库的数据至starrocksmysql2starrocks.json
{job: {setting: {speed: {channel: 1},errorLimit: {record: 0,percentage: 0}},content: [{reader: {name: mysqlreader,parameter: {username: root,password: root,column: [OBJECTID,xmmc,shengmc,shimc,xianmc,],connection: [{table: [init2.test6],jdbcUrl: [jdbc:mysql://192.168.22.156:3306/init2]}]}},writer: {name: starrockswriter,parameter: {username: root,password: root,database: des3,table: test7,column: [OBJECTID,shengmc,shimc,xianmc,],preSql: [],postSql: [],jdbcUrl: ,loadUrl: [192.168.10.101:8030,192.168.10.102:8030,192.168.10.103:8030],loadProps: {format: json,strip_outer_array: true}}}}]}
}
MysqlReader的配置说明见 https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
StarRocksWriter的配置说明见官网
3同步tidb库的数据至starrockstidb2starrocks.json
{job: {setting: {speed: {channel: 1},errorLimit: {record: 0,percentage: 0}},content: [{reader: {name: mysqlreader,parameter: {username: root,password: rootsq2023,connection: [{querySql: [select id,member_id,create_time,update_time,now() as run_dt from test2],jdbcUrl: [jdbc:mysql://192.168.22.143:4000/init1]}]}},writer: {name: starrockswriter,parameter: {username: root,password: root,database: des1,table: test3,column: [id,member_id,create_time,update_time,run_dt],preSql: [],postSql: [],jdbcUrl: ,loadUrl: [192.168.10.101:8030,192.168.10.102:8030,192.168.10.103:8030],loadProps: {format: json,strip_outer_array: true}}}}]}
}ps从tidb数据读取数据采用的read插件还是MysqlReder不赘述。
四、常见问题记录
4.1 常规排查方案 例如针对配置文件job.json启动导入任务设置JVM 调优参数--jvm-Xms6G -Xmx6G以及日志等级--logleveldebug日志等级用来任务失败时打印更详细的作业执行信息
python datax/bin/datax.py --jvm-Xms6G -Xmx6G --logleveldebug datax/job/job.json
4.2 时区问题 如果源数据库与目标数据库时区不同需要命令行中添加 -Duser.timezoneGMTxxx 选项设置源数据库的时区信息。例如源库使用 UTC 时区则启动任务时需添加参数 -Duser.timezoneGMT0
4.3 性能调优
4.3.1 合理拆分任务 合理配置任务参数让DataX任务拆分为多个Task同时提升DataX Channel并发数。以mysqlreader为例就要合理配置splitPk参数如果splitPk不填写包括不提供splitPk或者splitPk值为空DataX会视作使用单通道同步该表数据。
4.3.2 配置堆内存 当提升DataX Job内Channel并发数时内存的占用也会显著增加因为DataX作为数据交换通道在内存中会缓存较多的数据。例如Channel中会有一个Buffer作为临时的数据交换的缓冲区而在部分Reader和Writer的中也会存在一些Buffer为了防止OOM等错误调大JVM的堆内存。调整JVM xms xmx参数的两种方式一种是直接更改datax.py脚本另一种是在启动的时候在命令行添加对应的参数如下xms:初始化堆内存; xmx:堆最大内存
python datax/bin/datax.py --jvm-Xms6G -Xmx6G --logleveldebug datax/job/job.json
ps建议将初始化堆内存与堆最大内存配置一致这样可以让同步数据处理起来更快也可以避免内存的抖动。
4.3.3 任务限速 使用DataX进行数据同步的另一个优势是可以限速进而降低同步过程中对业务库的压力影响。DataX3.0提供了包括通道并发、记录流、字节流三种流控模式可以方便的控制同步作业速度让同步作业在库可以承受的范围内达到最佳的同步速度。以最常用的字节流限速为例 修改datax/conf/core.json限制单个chanel的速度为2M 2*1024*1024 2097152 byte
speed: {byte: 2097152,},
同时修改作业json部分的速度限制例如限制为4M这样任务会用4/22个channel并发进行任务修改 job: {setting: {speed: {byte : 4194304}},...}
以及
speed: {channel: 5,byte: 1048576,record: 10000}
4.3.4 读取StarRocks数据 StarRocks兼容MySQL协议当我们需要将StarRocks中的数据同步至其他数据库时可以使用mysqlreader来直接读取但这种JDBC的方式性能可能不是很好推荐Flink Connector或者Spark Connector来进行处理。
参考文章
第3.5章StarRocks数据导入--DataX StarRocksWriter_datax-starrockswriter-CSDN博客