传媒公司网站源码php,h5游戏代理,江西网站设计哪家强,中国设计网站排行榜前十名【黑马点评优化】2-Canel实现多级缓存#xff08;RedisCaffeine#xff09;同步 0 背景1 配置MySQL1.1 开启MySQL的binlog功能1.1.1 找到mysql配置文件my.ini的位置1.1.2 开启binlog 1.2 创建canal用户 2 下载配置canal2.1 canal 1.1.5下载2.2 配置canal2.3 启动canal2.4 测试… 【黑马点评优化】2-Canel实现多级缓存RedisCaffeine同步 0 背景1 配置MySQL1.1 开启MySQL的binlog功能1.1.1 找到mysql配置文件my.ini的位置1.1.2 开启binlog 1.2 创建canal用户 2 下载配置canal2.1 canal 1.1.5下载2.2 配置canal2.3 启动canal2.4 测试 3 canal实现双写一致3.1 Redis操作封装3.2 编写监听器3.3 修改ShopServiceImpl.java3.4 测试 参考资料 github地址如下https://github.com/xianghua-2/hm-dianping 0 背景
【黑马点评优化】之使用CaffeineRedis实现应用级二层缓存_caffeine redis二级缓存-CSDN博客
当时使用RedisCaffeine实现对商铺信息的应用层两级缓存。文章提到了两级缓存RedisCaffeine可以解决缓存雪等问题也可以提高接口的性能但是可能会出现缓存一致性问题。如果数据频繁的变更可能会导致Redis和Caffeine数据不一致的问题。
为此使用Canel来解决这一问题。
MySQL工作原理如下
一句话总结详细工作原理可以查看下面的介绍
模拟MySQL从库读取binlog实现数据变更监听。它支持数据过滤、转换并能将变更数据推送到不同的下游系统如消息队列和其他数据库。
我的相关软件版本 mysql8.0.36 redis6.2.6 canal:1.1.5
1 配置MySQL
1.1 开启MySQL的binlog功能
1.1.1 找到mysql配置文件my.ini的位置
开始的时候找不到mysql配置文件my.ini的位置
通过下列方法找到
先连接mysql
mysql -h localhost -u root -p123456 注意有密码的话才-p123456
输入以下命令
show variables like datadir;
这样就找到了配置文件所在的位置
1.1.2 开启binlog
my.ini的最后几行加上以下内容
[mysqld]
log-binmysql-bin
binlog-formatROW
server-id11.2 创建canal用户
DROP USER IF EXISTS canal%;
CREATE USER canal% IDENTIFIED BY canal;
GRANT ALL PRIVILEGES ON *.* TO canal%;
FLUSH PRIVILEGES;
但是由于mysql8.0之后 如果只设置了 % 的访问权限 会导致localhost无法访问 所以 我们需要把当前权限更新为 localhost 再执行一遍 继续执行下述命令
select mysql;
update user set host localhost where user canal and host%;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%;
FLUSH PRIVILEGES; 查看所有用户访问权限结果如下
-- 查看所有用户访问权限SELECT DISTINCT CONCAT(User: ,user,,host,;) AS query FROM mysql.user;自MySQL 8.0.3开始身份验证插件默认使用caching_sha2_password
解决修改canal用户对应的身份验证插件为mysql_native_password
因此接着执行下列命令
ALTER USER canal% IDENTIFIED WITH mysql_native_password BY password;
ALTER USER canallocalhost IDENTIFIED WITH mysql_native_password BY password;
之后我们再次查看canal用户对应的身份验证插件如下即修改成功 2 下载配置canal
2.1 canal 1.1.5下载
在这里我们选择1.1.15版本的canal因为canal1.1.15版本以后的canal不兼容Java 1.8
Release v1.1.5 · alibaba/canal (github.com)
选择canal.deployer-1.1.5.tar.gz下载之后解压到自己的软件目录下。 2.2 配置canal
进入解压后的Canal目录找到conf目录下的example实例通常情况下你可以通过修改conf/example/instance.properties文件来配置Canal连接到MySQL的参数主要配置项包括
canal.instance.master.addressMySQL服务器地址和端口。 canal.instance.dbUsername和canal.instance.dbPassword用于连接MySQL的用户名和密码。 canal.instance.connectionCharset数据库的字符集通常为UTF-8。 canal.instance.tsdb.enable是否启用表结构历史记录功能建议开启。 2.3 启动canal
当安装好canal的时候,在window中启动bat的时候有些问题,JDK17版本已经不持’PermSize128m’
因此删除start.bat中的这一行 之后双击bin/start.bat运行即可。
结果如下 2.4 测试
pom.xml中导入依赖 dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.7/version/dependencydependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.protocol/artifactIdversion1.1.7/version/dependency之后编写测试类如下 import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.jetbrains.annotations.NotNull;public class CanalTest {public static void main(String args[]) {// 创建链接CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), example, , );int batchSize 1000;int emptyCount 0;try {connector.connect();connector.subscribe(.*\\..*);connector.rollback();int totalEmtryCount 1200;while (emptyCount totalEmtryCount) {Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {emptyCount;System.out.println(empty count : emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {emptyCount 0;// System.out.printf(message[batchId%s,size%s] \n, batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println(empty too many times, exit);} finally {connector.disconnect();}}private static void printEntry(NotNull ListEntry entrys) {for (Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {continue;}RowChange rowChage null;try {rowChage RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(),e);}EventType eventType rowChage.getEventType();System.out.println(String.format( binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println(------- before);printColumn(rowData.getBeforeColumnsList());System.out.println(------- after);printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(NotNull ListColumn columns) {for (Column column : columns) {System.out.println(column.getName() : column.getValue() update column.getUpdated());}}
}运行后能看到监听到了数据库的变化 3 canal实现双写一致
导入依赖 dependencygroupIdtop.javatool/groupIdartifactIdcanal-spring-boot-starter/artifactIdversion1.2.1-RELEASE/version/dependency新建com.hmdp.cache.handler包
3.1 Redis操作封装
新建ShopRedisHandler类
package com.hmdp.cache.handler;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import com.hmdp.service.IShopService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;Component
public class ShopRedisHandler implements InitializingBean {Autowiredprivate StringRedisTemplate redisTemplate;Autowiredprivate IShopService shopService;private static final ObjectMapper MAPPER new ObjectMapper();Resourceprivate CacheString, Object shopCache;// 缓存预热Overridepublic void afterPropertiesSet() throws Exception {// 初始化缓存// 1.查询商品信息ListShop shopList shopService.list();// 2.放入缓存for (Shop shop : shopList) {// 2.1.item序列化为JSONString json MAPPER.writeValueAsString(shop);// 2.2 存入caffeindString key CACHE_SHOP_KEY shop.getId();shopCache.put(key, shop);// 2.2.存入redisredisTemplate.opsForValue().set(key, json);}// // 3.查询商品库存信息
// ListItemStock stockList stockService.list();
// // 4.放入缓存
// for (ItemStock stock : stockList) {
// // 2.1.item序列化为JSON
// String json MAPPER.writeValueAsString(stock);
// // 2.2.存入redis
// redisTemplate.opsForValue().set(item:stock:id: stock.getId(), json);
// }}public void saveShop(Shop shop) {try {String json MAPPER.writeValueAsString(shop);String key CACHE_SHOP_KEY shop.getId();redisTemplate.opsForValue().set(key shop.getId(), json);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public void deleteShopById(Long id) {String key CACHE_SHOP_KEY id;redisTemplate.delete(key id);}
}3.2 编写监听器
新建ShopHandler类
通过实现EntryHandlerT接口编写监听器监听Canal消息。注意两点
实现类通过CanalTable(tb_item)指定监听的表信息EntryHandler的泛型是与表对应的实体类
package com.hmdp.cache.handler;import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;import javax.annotation.Resource;import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;CanalTable(value tb_shop)
Component
public class ShopHandler implements EntryHandlerShop{Autowiredprivate ShopRedisHandler redisHandler;Resourceprivate CacheString, Object shopCache;Overridepublic void insert(Shop shop) {// 写数据到JVM进程缓存String key CACHE_SHOP_KEY shop.getId();shopCache.put(key , shop);// 写数据到redisredisHandler.saveShop(shop);}Overridepublic void update(Shop before, Shop after) {// 写数据到JVM进程缓存String key CACHE_SHOP_KEY after.getId();shopCache.put(key, after);// 写数据到redisredisHandler.saveShop(after);}Overridepublic void delete(Shop shop) {// 删除数据到JVM进程缓存String key CACHE_SHOP_KEY shop.getId();shopCache.invalidate(key);// 删除数据到redisredisHandler.deleteShopById(shop.getId());}
}
3.3 修改ShopServiceImpl.java
修改ShopServiceImpl中的update方法。注释掉删除缓存的操作。
缓存更新由监听器完成。 Overridepublic Result update(Shop shop) {Long id shop.getId();if(id null){return Result.fail(店铺id不能为空);}//1.更新数据库updateById(shop);// TODO 现在不再需要删除缓存了由canal监听数据库的变化然后更新缓存//2.删除缓存
// stringRedisTemplate.delete(CACHE_SHOP_KEY id);return Result.ok();}
3.4 测试
接下来可以自行调断点测试也可以运行项目然后更改数据库查看终端输出。
参考资料
mysql 8.0找不到my.ini配置文件解决方案_mysql80没有my.ini-CSDN博客
Docker整合canal 踩坑实录_com.alibaba.otter.canal.parse.exception.canalparse-CSDN博客
Canal 1.1.5 启动报错caching_sha2_password Auth failed_canal启动出现canal.deployer-1.1.5.jar:na-CSDN博客
Canal启动和运行出现的问题_canal启动闪退-CSDN博客