哈尔滨网站建设 博客,做社交网站怎么赚钱,o2o网站建设信息,郑州网站排名哪家好1.Maxwell简介
Maxwell 是一款用Java编写的MySQL变更数据抓取软件#xff0c;它会实时监控Mysql数据库的数据变更操作#xff08;包括insert、update、delete#xff09;#xff0c;并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台
官网地址#xff1a;M…1.Maxwell简介
Maxwell 是一款用Java编写的MySQL变更数据抓取软件它会实时监控Mysql数据库的数据变更操作包括insert、update、delete并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台
官网地址Maxwells Daemon Maxwell输出数据格式 字段说明 字段 解释 database 变更数据所属的数据库 table 表更数据所属的表 type 数据变更类型 ts 数据变更发生的时间 xid 事务id commit 事务提交标志可用于重新组装事务 data 对于insert类型表示插入的数据对于update类型标识修改之后的数据对于delete类型表示删除的数据 old 对于update类型表示修改之前的数据只包含变更字段
2.Maxwell原理
Maxwell的工作原理是实时读取MySQL数据库的二进制日志Binlog从中获取变更数据再将变更数据以JSON格式发送至Kafka等流处理平台
2.1 MySQL二进制日志
二进制日志Binlog是MySQL服务端非常重要的一种日志它会保存MySQL数据库的所有数据变更记录。Binlog的主要作用包括主从复制和数据恢复
2.2 MySQL主从复制
MySQL的主从复制就是用来建立一个和主数据库完全一样的数据库环境这个数据库称为从数据库
2.2.1 应用场景
做数据库的热备主数据库服务器故障后可切换到从数据库继续工作。读写分离主数据库只负责业务数据的写入操作而多个从数据库只负责业务数据的查询工作在读多写少场景下可以提高数据库工作效率
2.2.2 工作原理
Master主库将数据变更记录写到二进制日志(binary log)中Slave从库向mysql master发送dump协议将master主库的binary log events拷贝到它的中继日志(relay logSlave从库读取并回放中继日志中的事件将改变的数据同步到自己的数据库 MySQL的dump协议是一种用于在客户端和服务器之间传输数据的文件格式。它主要用于备份和恢复数据库以及在不同版本的MySQL服务器之间迁移数据。dump协议的文件通常以.sql为扩展名包含了创建表、插入数据等SQL语句 由此可见maxwell的工作原理就是就是将自己伪装成slave并遵循MySQL主从复制的协议从master同步数据 3.Maxwell部署
3.1 安装Maxwell
安装包下载地址https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz 注Maxwell-1.30.0及以上版本不再支持JDK1.8 下载完成后解压即可目录结构如下 3.2 配置MySQL
1.首先需要开启MySQL的binlog服务sudo vim /etc/my.cnf
增加以下配置
[mysqld]#数据库id
server-id 1
#启动binlog该参数的值会作为binlog的文件名
log-binmysql-bin
#binlog类型maxwell要求为row类型
binlog_formatrow
#启用binlog的数据库需根据实际情况作出修改
binlog-do-dbgmall 1.有关binlog-do-db如果需要有多个数据库来启用binlog则要增加多条语句配置 也可以使用binlog-ignore-db来设置不需监控的数据库 2.有关server-idserver-id是MySQL服务器的一个唯一标识符它通常在配置文件中设置。这个标识符在复制过程中非常重要因为它帮助确保数据的一致性和正确性 3.有关binlog模式MySQL的binlog模式有 Statement-based基于语句Binlog会记录所有写操作的SQL语句包括insert、update、delete等 优点 节省空间缺点 有可能造成数据不一致例如insert语句中包含now()函数 Row-based基于行Binlog会记录每次写操作后被操作行记录的变化 优点保持数据的绝对一致性缺点占用较大空间mixed混合模式默认是Statement-based如果SQL语句可能导致数据不一致就自动切换到Row-based Maxwell要求Binlog采用Row-based模式 2.重启MySQL服务sudo systemctl restart mysqld
3.3 创建Maxwell所需数据库和用户
1.创建数据库CREATE DATABASE maxwell
2.调整MySQL数据库密码级别
set global validate_password_policy0
set global validate_password_length4 global validate_password_policy用于指定密码的强度验证等级 LOW0只验证密码长度。MEDIUM1验证长度、数字、大小写字母、特殊字符。STRONG2验证长度、数字、大小写字母、特殊字符以及字典文件。 global validate_password_length用于设置密码的最小长度
3.创建Maxwell用户并赋予其必要权限
①CREATE USER maxwell% IDENTIFIED BY maxwell创建用户、 maxwell%用户名是maxwell主机名是%%表示这个用户可以从任何主机连接到数据库服务器 IDENTIFIED BY maxwell密码是maxwell ②GRANT ALL ON maxwell.* TO maxwell%;Maxwell用户可以操作maxwell数据库下的所有表
③GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO maxwell%;
赋予maxwell用户对于所有数据库下所有表的主从复制权限 权限SELECT查询、REPLICATION CLIENT复制客户端和REPLICATION SLAVE复制从服务器 ON *.*指定这些权限适用的数据库和表。星号*表示所有数据库和表 TO maxwell%指定要授予权限的用户 3.4 配置Maxwell
首先修改配置文件名称cp config.properties.example config.properties
然后对该文件进行修改
#Maxwell数据发送目的地可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producerkafka
#目标Kafka集群地址
kafka.bootstrap.servershadoop102:9092,hadoop103:9092
#目标Kafka topic可静态配置例如:maxwell也可动态配置例如%{database}_%{table}
kafka_topictopic_db#MySQL相关配置
hosthadoop102
usermaxwell
passwordmaxwell
jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai
4.Maxwell使用
以将MySQL数据同步到Kafka为例首先启动Kafka集群
4.1 maxwell启停
启动/opt/module/maxwell/bin/maxwell --config /opt/module/maxwell/config.properties --daemon
停止ps -ef | grep maxwell | grep -v grep | grep maxwell | awk {print $2} | xargs kill -9 解析 ps -ef: 这是列出所有正在运行的进程的命令。grep maxwell: 这是搜索包含maxwell字符串的进程。grep -v grep: 这是排除包含grep字符串的进程以避免误杀grep命令本身。awk {print $2}: 这是使用awk命令提取每行输出的第二个字段即进程IDPID。xargs kill -9: 这是将提取到的进程ID作为参数传递给kill命令并使用-9选项强制终止进程。 启停脚本
MAXWELL_HOME/opt/module/maxwellstatus_maxwell(){resultps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -lreturn $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho 启动Maxwell$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho Maxwell正在运行fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho 停止Maxwellps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk {print $2} | xargs kill -9elseecho Maxwell未在运行fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac
4.2 增量数据同步
启动kafka消费者bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_db
然后在mysql中gmall数据库的activity_info表中插入一条数据 可以看到Kafka消费者成功捕获到Maxwell发送来的数据 4.3 历史数据全量同步
Maxwell提供了bootstrap功能来进行历史数据的全量同步
命令格式/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table activity_info --config /opt/module/maxwell/config.properties
执行指令后可以看到所有数据均已同步到Kafka消费者 注意
1第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据是bootstrap开始和结束的标志不包含数据中间的type为bootstrap-insert的数据才包含数据
2一次bootstrap输出的所有记录的ts都相同为bootstrap开始的时间