教育网站制作哪专业,做暧昧在线网站,企业建站模板下载,海外域名怎么打开1. Maxwell框架
开发公司为Zendesk公司开源#xff0c;用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志#xff0c;并将变更数据以JSON格式发送到Kafka等流处理平台。
1.1 MySQL主从复制
主机每次变更数据都会生成对应的Binlog日志#xff0c;从机可…1. Maxwell框架
开发公司为Zendesk公司开源用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志并将变更数据以JSON格式发送到Kafka等流处理平台。
1.1 MySQL主从复制
主机每次变更数据都会生成对应的Binlog日志从机可以通过IO流的方式将Binlog日志下载到本地可以通过它创造和主机一样的环境或者作为热备。
1.2 安装Maxwell
解压改名启动MySQL Binlog, vim /etc/my.cnf. 增加如下配置 binlog_format 日志类型的三种类型 基于语句:主机执行了什么语句在从机里同样执行一遍。如果使用了random语句会导致主从不一致。但是量级比较低基于行:主机被改动后从机同步一份。不会有主从不一致的问题但是量价比较大需要将每行修改的数据都拿一份。混合模式:一般基于语句但是如果基于语句会导致前后结果产生差异自动转成基于行。
#数据库id
server-id 1
#启动binlog该参数的值会作为binlog的文件名
log-binmysql-bin
#binlog类型maxwell要求为row类型
binlog_formatrow
#启用binlog的数据库需根据实际情况作出修改
binlog-do-dbgmall重启MySQL服务创建Maxwell所需所需的数据库和用户用来存储断点续传所需的数据。
CREATE DATABASE maxwell;
CREATE USER maxwell% IDENTIFIED BY maxwell;
GRANT ALL ON maxwell.* TO maxwell%;//maxwell库的所有权限给maxwell
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO maxwell%;//其他库的查询、复制权限给maxwell修改maxwell配置文件 cp 配置文件将会复制某个文件并且可以改名。
producerkafka
# 目标Kafka集群地址
kafka.bootstrap.servershadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic可静态配置例如:maxwell也可动态配置例如%{database}_%{table}
kafka_topictopic_db
# MySQL相关配置
hosthadoop102
usermaxwell
passwordmaxwell
jdbc_optionsuseSSLfalseserverTimezoneAsia/ShanghaiallowPublicKeyRetrievaltrue# 过滤gmall中的z_log表数据该表是日志数据的备份无须采集
filterexclude:gmall.z_log
# 指定数据按照主键分组进入Kafka不同分区避免数据倾斜
producer_partition_byprimary_key1.3 Maxwell的使用
启动zookeeperkafka启动maxwell, bin/maxwell --config config.properties --daemon启动kafka消费者进程用于消费maxwell添加到kafka的变更数据启动数据生成jar包查看消费者进程是否有新数据。编写Maxwell启停脚本
#!/bin/bashMAXWELL_HOME/opt/module/maxwellstatus_maxwell(){resultps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -lreturn $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho 启动Maxwell$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho Maxwell正在运行fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho 停止Maxwellps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk {print $2} | xargs kill -9elseecho Maxwell未在运行fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac1.4 Bootstrap全量同步
Maxwell获取的数据都是后期变更的数据但没有获取到数据库在开启Binlog日志之前的原始数据。
全量同步命令/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties
2. 数仓数据同步策略
2.1 用户行为数据
数据源Kafka 目的地HDFS 传输方式采用Flume, 其中source为Kafka source, channel为Memmory channel, sink为HDFS sink。
根据官网查找相应参数
Kafka Source type Kafka Source全类名kafka.bootstrap.servers 连接地址kafka.topics topic_logbatchSize: 批次大小batchDurationMillis: 批次间隔2s File Channel type: filedataDirs: 存储路径checkpointDir: 偏移量存储地址keep-alive: 管道满了后生产者间隔多少秒再放数据 HDFS Sink hdfs.rollInterval 文件滚动解决小文件问题每隔多久滚动一次rollSize: 文件大小hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路径hdfs.round false, 不采用系统本地时间
#定义组件
a1.sourcesr1
a1.channelsc1
a1.sinksk1#配置source1
a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize 5000
a1.sources.r1.batchDurationMillis 2000
a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topicstopic_log#配置channel
a1.channels.c1.type file
a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize 2146435071
a1.channels.c1.capacity 1000000
a1.channels.c1.keep-alive 6#配置sink
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix log
a1.sinks.k1.hdfs.round false # 是否获取本地时间a1.sinks.k1.hdfs.rollInterval 10
a1.sinks.k1.hdfs.rollSize 134217728
a1.sinks.k1.hdfs.rollCount 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType CompressedStream
a1.sinks.k1.hdfs.codeC gzip#组装
a1.sources.r1.channels c1
a1.sinks.k1.channel c12.2 零点漂移问题
在HDFS系统存放文件时是按照时间进行分区存放的存放时查看的是header中的timestamp但是由于数据传输过程中也需要一段时间header中的时间并不是数据的实际产生时间这个就是零点漂移问题。
解决办法借助拦截器修改header中的timestamp的值。编写拦截器代码需要在IDEA中创建对应的项目并打包。
导入依赖flume-ng-core和JSON解析依赖fastjson (1.2.62)创建包gmall.interceptor创建类TimeStampInterceptor, 继承Interceptor接口实现intercept(Event event)和intercept(Event events)使用fastjson来解析json文件得到jsonObject对象用来获取时间戳ts。将获取到的时间戳覆盖header中的timestamp, 如果数据格式错误会抛异常使用try-catch来捕获它并过滤掉该条数据。注意此处不能使用for循环来一边遍历一边删除集合数据。
Overridepublic Event intercept(Event event) {//1、获取header和body的数据MapString, String headers event.getHeaders();String log new String(event.getBody(), StandardCharsets.UTF_8);try {//2、将body的数据类型转成jsonObject类型方便获取数据JSONObject jsonObject JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳解决数据漂移问题String ts jsonObject.getString(ts);headers.put(timestamp, ts);return event;} catch (Exception e) {e.printStackTrace();return null;}
Override
public ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()) {Event event iterator.next();if (intercept(event) null) {iterator.remove();//必须使用迭代器删除}}return list;
}打包时注意要带上fastjson依赖需要在maven中添加配置打包插件。依赖中有flume和fastjson但在虚拟机上有flume没有fastjson所以需要排除flume。可以使用provided标签来排除让打包时排除依赖。 compile:在单元测试、编译、运行三种方式都会使用compile表明的依赖;test:在单元测试才会使用test表明的依赖;provided:在编译才会使用test表明的依赖; Flume配置文件中添加拦截器
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全类名建议在IDEA中复制Builder也需要根据自己的代码函数名修改重新生成数据查看是否根据数据本身的时间戳存放到对应的HDFS分区文件中。
3. 业务数据同步
3.1 同步策略
全量同步每天将所有数据同步一份业务数据量小优先考虑全量同步。增量同步每天只将新增和变化进行同步业务数据量大优先考虑增量同步。
3.2 数据同步工具
全量DataX、Sqoop 增量Maxwell、Canal
3.3 DataX
是一个数据同步工具致力于实现包括关系型数据库HDFS、Hive、ODPS、HBase、MySQL等等数据源之间的互传。
架构 reader framework writer运行流程 job: 单个数据同步的作业会启动一个进程。Task: 根据不同数据源的切分策略一个Job会切分为多个TaskTask是DataX作业的最小单元每个Task负责一部分由一个线程执行。 调度策略会根据系统资源设置并发度并发度为线程同时执行的个数任务会按照并发度一组一组执行。
3.4 DataX安装
下载解压DataX安装包bin/datax.py job/job.json测试安装包是否完整MySQL Reader配置文件的书写HDFS Writer配置文件的书写执行datax命令python /opt/module/datax/bin/datax.py -p-Dtargetdir/origin_data/gmall/db/activity_info_full/2022-06-08 /opt/module/datax/job/import/gmall.activity_info.json执行完后可以使用hadoop fs cat 路径名 | zcat来查看压缩文件是否正确