做网站公司的使命,搜索技巧,黄冈网站建设公司,中国建设银行手机网站开启 canal同步MySQL增量数据到ES
canal 是阿里知名的开源项目#xff0c;主要用途是基于 MySQL 数据库增量日志解析#xff0c;提供增量数据订阅和消费。示使用 canal 将 MySQL 增量数据同步到ES。
一、集群模式 图中 server 对应一个 canal 运行实例 #xff0c;对应一…开启 canal同步MySQL增量数据到ES
canal 是阿里知名的开源项目主要用途是基于 MySQL 数据库增量日志解析提供增量数据订阅和消费。示使用 canal 将 MySQL 增量数据同步到ES。
一、集群模式 图中 server 对应一个 canal 运行实例 对应一个 JVM 。
server 中包含 1…n 个 instance 我们可以将 instance 理解为配置任务。
instance 包含如下模块
eventParser数据源接入模拟 slave 协议和 master 进行交互协议解析。
eventSinkParser 和 Store 链接器进行数据过滤加工分发的工作。
eventStore数据存储。
metaManager增量订阅 消费信息管理器。
真实场景中canal 高可用依赖 zookeeper 笔者将客户端模式可以简单划分为TCP 模式 和 MQ 模式 。
实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ 消费者消费消息顺序执行相关逻辑即可。
顺序消费
对于指定的一个 Topic 所有消息根据 Sharding Key 进行区块分区同一个分区内的消息按照严格的先进先出FIFO原则进行发布和消费。同一分区内的消息保证顺序不同分区之间的消息顺序不做要求。
二、MySQL配置
1、对于自建 MySQL , 需要先开启 Binlog 写入功能配置 binlog-format 为 ROW 模式my.cnf 中配置如下
[mysqld] log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义不要和 canal 的 slaveId 重复
注意针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。
2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY ‘canal’; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal’‘%’; – GRANT ALL PRIVILEGES ON . TO ‘canal’‘%’ ; FLUSH PRIVILEGES;
3、创建数据库商品表 t_product
CREATE TABLE t_product ( id BIGINT ( 20 ) NOT NULL AUTO_INCREMENT, name VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL, price DECIMAL ( 10, 2 ) NOT NULL, status TINYINT ( 4 ) NOT NULL, create_time datetime NOT NULL, update_time datetime NOT NULL, PRIMARY KEY ( id ) ) ENGINE INNODB DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin
三、Elasticsearch配置
使用 Kibana 创建商品索引 。
PUT /t_product { “settings”: { “number_of_shards”: 2, “number_of_replicas”: 1 }, “mappings”: { “properties”: { “id”: { “type”:“keyword” }, “name”: { “type”:“text” }, “price”: { “type”:“double” }, “status”: { “type”:“integer” }, “createTime”: { “type”: “date”, “format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis” }, “updateTime”: { “type”: “date”, “format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis” } } } }
执行完成如图所示 四、RocketMQ 配置
创建主题product-syn-topic canal 会将 Binlog 的变化数据发送到该主题。
五、canal 配置
我们选取 canal 版本 1.1.6 进入 conf 目录。
1、配置 canal.properties
#集群模式 zk地址 canal.zkServers localhost:2181 #本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode rocketMQ #instance 列表 canal.destinations product-syn #conf root dir canal.conf.dir …/conf #全局的spring配置方式的组件文件 生产环境集群化部署 canal.instance.global.spring.xml classpath:spring/default-instance.xml
######以下部分是默认值 展示出来 #Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize 50 #Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout 100 #是否为 flat json格式对象 canal.mq.flatMessage true
2、instance 配置文件
在 conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 instance.properties。
#按需修改成自己的数据库信息 ################################################# … canal.instance.master.address192.168.1.20:3306 #username/password,数据库的用户名和密码 … canal.instance.dbUsername canal canal.instance.dbPassword canal …
#table regex canal.instance.filter.regexmytest.t_product
#mq config canal.mq.topicproduct-syn-topic #针对库名或者表名发送动态topic #canal.mq.dynamicTopicmytest,.,mytest.user,mytest\…,.\… canal.mq.partition0 #hash partition config #canal.mq.partitionsNum3 #库名.表名: 唯一主键多个表之间用逗号分隔 #canal.mq.partitionHashmytest.person:id,mytest.role:id #################################################
3、服务启动
启动两个 canal 服务我们从 zookeeper gui 中查看服务运行情况 。 修改一条 t_product 表记录可以从 RocketMQ 控制台中观测到新的消息。
六、消费者
1、产品索引操作服务
2、消费监听器 消费者逻辑重点有两点
顺序消费监听器
将消息数据转换成 JSON 字符串从 data 节点中获取表最新数据批量操作可能是多条。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。