工作室 网站 备案,wordpress添加轮播图,上海关键词优化,中国建设信息网官网八大员证查询Canal实时同步Mysql Binlog至 Elasticsearch 文章目录 Canal实时同步Mysql **Binlog**至**Elasticsearch** 一. 环境准备1.环境检查检查Mysql是否开启BinLog开启Mysql BinlogJava环境检查 2.新建测试库和表3.新建Es索引 二.**部署 Canal Server****2.1 解压安装包****2.2 配置 … Canal实时同步Mysql Binlog至 Elasticsearch 文章目录 Canal实时同步Mysql **Binlog**至**Elasticsearch** 一. 环境准备1.环境检查检查Mysql是否开启BinLog开启Mysql BinlogJava环境检查 2.新建测试库和表3.新建Es索引 二.**部署 Canal Server****2.1 解压安装包****2.2 配置 Canal Server****2.3 启动 Canal Server** **三. 部署 Canal Adapter同步到 Elasticsearch****3.1 配置 Adapter****3.2 配置数据映射****3.3 启动 Adapter** **4. 验证同步****4.1 插入测试数据到 MySQL****4.2 查询 Elasticsearch** 一. 环境准备 操作系统LinuxUbuntu 20.04Java 环境JDK 8建议 OpenJDK 11MySQL已启用 BinlogROW 模式并创建 Canal 用户Elasticsearch已部署版本 7.x 或 8.xCanal 二进制包从 Canal Release 下载 canal.deployer-1.1.8.tar.gz 和 canal.adapter-1.1.8.tar.gz 1.环境检查 检查Mysql是否开启BinLog
#root账号执行
SHOW VARIABLES LIKE log_bin;
SHOW VARIABLES LIKE binlog_format;输出如下证明已经打开 创建 Canal 用户并授权
#创建用户
CREATE USER canal% IDENTIFIED WITH mysql_native_password BY Password123;
# 给新创建账户赋予从库权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%;# 刷新权限
FLUSH PRIVILEGES;如果没打开BinLog可以通过如下方法打开 开启Mysql Binlog
修改my.cnf文件加入如下内容
log_binmysql-bin
binlog_formatROW
binlog_expire_logs_seconds172800
expire_logs_days2log_bin:启用二进制日志日志文件会以 mysql-bin 为前缀并依次生成日志文件例如mysql-bin.000001mysql-bin.000002 等。 binlog_format:设置使用的二进制日志格式在 MySQL 8.0 版本中binlog_format 的默认值已经变为 ROW。所以即使你在配置文件中没有明确设置 binlog_formatMySQL 会默认使用 ROW 作为二进制日志格式。在较早的 MySQL 版本中默认值是 STATEMENT。 binlog_expire_logs_seconds172800 和 expire_logs_days2这些配置设置了二进制日志的过期时间默认情况下MySQL 会保留二进制日志直到它们过期或达到日志文件数的限制。在这种情况下日志会在 2 天后过期。 配置好后重启Mysql:
systemctl restart mysqld.serviceJava环境检查
echo $JAVA_HOME2.新建测试库和表 CREATE DATABASE IF NOT EXISTS canal default charset utf8 COLLATE utf8_general_ci;CREATE TABLE test_user (id bigint unsigned NOT NULL AUTO_INCREMENT,name varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 姓名,sex varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 性别,tel varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 电话,PRIMARY KEY (id) USING BTREE
) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COLLATEutf8mb4_general_ci ROW_FORMATDYNAMIC;3.新建Es索引
curl -X PUT http://your es IP:9200/test_user -H Content-Type: application/json -u es账号:es 密码 -d
{mappings: {properties: {id: {type: long},title: {type: text},sex: {type: text},tel: {type: text}}}
}二.部署 Canal Server
2.1 解压安装包
# 创建目录
mkdir -p /opt/canal/server /opt/canal/adapter# 解压 Server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /opt/canal/server# 解压 Adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /opt/canal/adapter2.2 配置 Canal Server
修改配置文件 /opt/canal/server/conf/canal.properties
# tcp bind ip
canal.ip 127.0.0.1
# register ip to zookeeper
canal.register.ip
canal.port 11111
canal.metrics.pull.port 11112# 目标实例名称默认 example
canal.destinations example# 持久化模式默认内存可选 H2/MySQL
canal.instance.tsdb.spring.xml classpath:spring/tsdb/h2-tsdb.xml这里主要修改canal.ip其他保持默认即可。
修改实例配置 /opt/canal/server/conf/example/instance.properties
#被同步的mysql地址填写自己的IP地址
canal.instance.master.address127.0.0.1:3306
#第一步创建的数据库从库权限账号/密码
canal.instance.dbUsernamecanal
canal.instance.dbPasswordPassword123
#数据库连接编码
canal.instance.connectionCharset UTF-8
#Binlog 过滤规则监控所有库表
canal.instance.filter.regex.*\\..*
#指定了 Canal 消费者比如 MQ 客户端读取和写入消息的目标主题保持默认即可
canal.mq.topicexample2.3 启动 Canal Server
cd /opt/canal/server/bin
./startup.sh# 查看日志
tail -f /opt/canal/server/logs/canal/canal.log
tail -f /opt/canal/server/logs/example/example.log可以看到日志没有明显报错且进程已经启动则表示Canal Server已经启动成功。 三. 部署 Canal Adapter同步到 Elasticsearch
3.1 配置 Adapter
修改配置文件 /opt/canal/adapter/conf/application.yml
server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT8default-property-inclusion: non_nullcanal.conf:mode: tcp # 客户端的模式可选tcp kafka rocketMQflatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效zookeeperHosts: # 对应集群模式下的zk地址syncBatchSize: 1000 # 每次同步的批数量retries: 0 # 重试次数, -1为无限重试timeout: # 同步超时时间, 单位毫秒accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111 #配置canal-server的地址canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources: # 源数据库配置defaultDScanal是测试数据库url: jdbc:mysql://yourIP:3306/canal?useUnicodetrueuseSSLtrue #数据库连接,canal是测试用的数据库username: root #数据库账号password: Pass1234 #数据库密码canalAdapters: # 适配器列表- instance: example # canal实例名,和上述Server的配置一样groups: # 分组列表- groupId: g1 # 分组id, 如果是MQ模式将用到该值outerAdapters:- name: logger # 日志打印适配器- name: es8 # ES同步适配器根据自己的es版本来hosts: your IP:9200 # ES连接地址properties:mode: rest # 模式可选transport端口(9300) 或者 rest端口(9200)security.auth: elastic:123456 # 连接es的用户和密码仅rest模式使用cluster.name: elasticsearch # ES集群名称如何获取es集群名称,命令输出的cluster_name就是上面需要配置的集群名字
curl -u elastic:esPass -X GET http://es IP:9200/_cluster/health?pretty3.2 配置数据映射
创建 Elasticsearch 映射文件 /opt/canal/adapter/conf/es8/mytest_user.yml
dataSourceKey: defaultDS # 源数据源的key, 对应上面application配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:_index: test_user # es 的索引名称_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配sql: SELECTtb.id AS _id,tb.name,tb.sex,tb.telFROMtest_user us # sql映射etlCondition: where p.id{} #etl的条件参数commitBatch: 3000 # 提交批大小3.3 启动 Adapter
cd /opt/canal/adapter/bin
./startup.sh#查看日志
tail -f /opt/canal/adapter/logs/adapter/adapter.log会输出很多数据库变更的日志 4. 验证同步
4.1 插入测试数据到 MySQL
#执行sql
INSERT INTO test_user (name, sex, tel) VALUES (Paco, 男, 123456789);4.2 查询 Elasticsearch
curl -u elastic:esPass -X GET http://esIP:9200/test_user/_search?pretty也可以在工具上查看这边是Eage插件: 至此即可验证可同步成功。我们可以修改数据测试看是否能同步。 然后我们测试修改Es的数据: 可以发现数据库并没有变至此Canal单向实时同步Mysql Binlog至Elasticsearch就配置完成了。