开发区网站开发语言,我是一条龙,佛山网站建设 合优,少儿编程几岁开始学最好基于Stream的消息队列
stream是一种数据类型#xff0c;可以实现一个功能非常完善的消息队列 key#xff1a;队列名称 nomkstream#xff1a;如果队列不存在是否自动创建#xff0c;默认创建 maxlen/minid#xff1a;设置消息队列的最大消息数量 *|ID 唯一id#xff1a;…基于Stream的消息队列
stream是一种数据类型可以实现一个功能非常完善的消息队列 key队列名称 nomkstream如果队列不存在是否自动创建默认创建 maxlen/minid设置消息队列的最大消息数量 *|ID 唯一id时间戳-递增数字 field value消息体键值对 XREAD 命令特点消息可回溯一个消息可以被多个消费者读取可以阻塞读取有消息漏读风险
消费者组 特点小费可回溯消费者争抢消息加快消费速度可以阻塞读取没有漏读风险消息确认机制。
实现
创建一个消费者组
XGROUP CREATE stream.orders g1 0 MKSTREAM //创建队列和消费者组 最终版秒杀代码
Slf4j
Service
public class VoucherOrderServiceImpl extends ServiceImplVoucherOrderMapper, VoucherOrder implements IVoucherOrderService {Resourceprivate ISeckillVoucherService seckillVoucherService;Resourceprivate RedisIdWorker redisIdWorker;Resourceprivate RedissonClient redissonClient;Resourceprivate StringRedisTemplate stringRedisTemplate;private static final DefaultRedisScriptLong SECKILL_SCRIPT;static {SECKILL_SCRIPT new DefaultRedisScript();SECKILL_SCRIPT.setLocation(new ClassPathResource(seckill.lua));SECKILL_SCRIPT.setResultType(Long.class);}private static final ExecutorService SECKILL_ORDER_EXECUTOR Executors.newSingleThreadExecutor();PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {Overridepublic void run() {while (true) {try {// 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1), // 组名和消费者名StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), //创建空的读一个阻塞两秒钟StreamOffset.create(stream.orders, ReadOffset.lastConsumed()) // 消息队列的名字和读取标识);// 2.判断订单信息是否为空if (list null || list.isEmpty()) {// 如果为null说明没有消息继续下一次循环continue;}// 解析数据MapRecordString, Object, Object record list.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge(s1, g1, record.getId());} catch (Exception e) {log.error(处理订单异常, e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0ListMapRecordString, Object, Object list stringRedisTemplate.opsForStream().read(Consumer.from(g1, c1),StreamReadOptions.empty().count(1),StreamOffset.create(stream.orders, ReadOffset.from(0)));// 2.判断订单信息是否为空if (list null || list.isEmpty()) {// 如果为null说明没有异常消息结束循环break;}// 解析数据MapRecordString, Object, Object record list.get(0);MapObject, Object value record.getValue();VoucherOrder voucherOrder BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 3.创建订单createVoucherOrder(voucherOrder);// 4.确认消息 XACKstringRedisTemplate.opsForStream().acknowledge(s1, g1, record.getId());} catch (Exception e) {log.error(处理订单异常, e);}}}}private void createVoucherOrder(VoucherOrder voucherOrder) {Long userId voucherOrder.getUserId();Long voucherId voucherOrder.getVoucherId();// 创建锁对象RLock redisLock redissonClient.getLock(lock:order: userId);// 尝试获取锁boolean isLock redisLock.tryLock();// 判断if (!isLock) {// 获取锁失败直接返回失败或者重试log.error(不允许重复下单);return;}try {// 5.1.查询订单int count query().eq(user_id, userId).eq(voucher_id, voucherId).count();// 5.2.判断是否存在if (count 0) {// 用户已经购买过了log.error(不允许重复下单);return;}// 6.扣减库存boolean success seckillVoucherService.update().setSql(stock stock - 1) // set stock stock - 1.eq(voucher_id, voucherId).gt(stock, 0) // where id ? and stock 0.update();if (!success) {// 扣减失败log.error(库存不足);return;}// 7.创建订单save(voucherOrder);} finally {// 释放锁redisLock.unlock();}}Overridepublic Result seckillVoucher(Long voucherId) {// 获取用户Long userId UserHolder.getUser().getId();// 获取订单idlong orderId redisIdWorker.nextId(order);// 1.执行lua脚本Long result stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(), userId.toString(), String.valueOf(orderId));int r result.intValue();// 2.判断结果是否为0if (r ! 0) {// 2.1.不为0 代表没有购买资格return Result.fail(r 1 ? 库存不足 : 不能重复下单);}// 3.返回订单idreturn Result.ok(orderId);}
}
-- 1.参数列表
-- 1.1.优惠券id
local voucherId ARGV[1]
-- 1.2.用户id
local userId ARGV[2]
-- 1.3.订单id
local orderId ARGV[3]-- 2.数据key
-- 2.1.库存key
local stockKey seckill:stock: .. voucherId
-- 2.2.订单key
local orderKey seckill:order: .. voucherId-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call(get, stockKey)) 0) then-- 3.2.库存不足返回1return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call(sismember, orderKey, userId) 1) then-- 3.3.存在说明是重复下单返回2return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call(incrby, stockKey, -1)
-- 3.5.下单保存用户sadd orderKey userId
redis.call(sadd, orderKey, userId)
-- 3.6.发送消息到队列中 XADD stream.orders * k1 v1 k2 v2 ...
redis.call(xadd, stream.orders, *, userId, userId, voucherId, voucherId, id, orderId)
return 0
探店达人
完善点赞功能同一个用户只能点赞一次再点就取消如果点赞会高亮 Service
public class BlogServiceImpl extends ServiceImplBlogMapper, Blog implements IBlogService {Resourceprivate IUserService userService;Resourceprivate StringRedisTemplate stringRedisTemplate;Resourceprivate IFollowService followService;Overridepublic Result queryHotBlog(Integer current) {// 根据用户查询PageBlog page query().orderByDesc(liked).page(new Page(current, SystemConstants.MAX_PAGE_SIZE));// 获取当前页数据ListBlog records page.getRecords();// 查询用户records.forEach(blog - {this.queryBlogUser(blog);this.isBlogLiked(blog);});return Result.ok(records);}Overridepublic Result queryBlogById(Long id) {// 1.查询blogBlog blog getById(id);if (blog null) {return Result.fail(笔记不存在);}// 2.查询blog有关的用户queryBlogUser(blog);// 3.查询blog是否被点赞isBlogLiked(blog);return Result.ok(blog);}private void isBlogLiked(Blog blog) {// 1.获取登录用户UserDTO user UserHolder.getUser();if (user null) {// 用户未登录无需查询是否点赞return;}Long userId user.getId();// 2.判断当前登录用户是否已经点赞String key blog:liked: blog.getId();Double score stringRedisTemplate.opsForZSet().score(key, userId.toString());blog.setIsLike(score ! null);}Overridepublic Result likeBlog(Long id) {// 1.获取登录用户Long userId UserHolder.getUser().getId();// 2.判断当前登录用户是否已经点赞String key BLOG_LIKED_KEY id;Double score stringRedisTemplate.opsForZSet().score(key, userId.toString());if (score null) {// 3.如果未点赞可以点赞// 3.1.数据库点赞数 1boolean isSuccess update().setSql(liked liked 1).eq(id, id).update();// 3.2.保存用户到Redis的set集合 zadd key value scoreif (isSuccess) {stringRedisTemplate.opsForZSet().add(key, userId.toString(), System.currentTimeMillis());}} else {// 4.如果已点赞取消点赞// 4.1.数据库点赞数 -1boolean isSuccess update().setSql(liked liked - 1).eq(id, id).update();// 4.2.把用户从Redis的set集合移除if (isSuccess) {stringRedisTemplate.opsForZSet().remove(key, userId.toString());}}return Result.ok();}Overridepublic Result queryBlogLikes(Long id) {String key BLOG_LIKED_KEY id;// 1.查询top5的点赞用户 zrange key 0 4SetString top5 stringRedisTemplate.opsForZSet().range(key, 0, 4);if (top5 null || top5.isEmpty()) {return Result.ok(Collections.emptyList());}// 2.解析出其中的用户idListLong ids top5.stream().map(Long::valueOf).collect(Collectors.toList());String idStr StrUtil.join(,, ids);// 3.根据用户id查询用户 WHERE id IN ( 5 , 1 ) ORDER BY FIELD(id, 5, 1)// mybatis plus提供自定义查询ListUserDTO userDTOS userService.query().in(id, ids).last(ORDER BY FIELD(id, idStr )).list().stream().map(user - BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());// 4.返回return Result.ok(userDTOS);}Overridepublic Result saveBlog(Blog blog) {// 1.获取登录用户UserDTO user UserHolder.getUser();blog.setUserId(user.getId());// 2.保存探店笔记boolean isSuccess save(blog);if(!isSuccess){return Result.fail(新增笔记失败!);}// 3.查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id ?ListFollow follows followService.query().eq(follow_user_id, user.getId()).list();// 4.推送笔记id给所有粉丝for (Follow follow : follows) {// 4.1.获取粉丝idLong userId follow.getUserId();// 4.2.推送String key FEED_KEY userId;stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());}// 5.返回idreturn Result.ok(blog.getId());}Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {// 1.获取当前用户Long userId UserHolder.getUser().getId();// 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset countString key FEED_KEY userId;SetZSetOperations.TypedTupleString typedTuples stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);// 3.非空判断if (typedTuples null || typedTuples.isEmpty()) {return Result.ok();}// 4.解析数据blogId、minTime时间戳、offsetListLong ids new ArrayList(typedTuples.size());long minTime 0; // 2int os 1; // 2for (ZSetOperations.TypedTupleString tuple : typedTuples) { // 5 4 4 2 2// 4.1.获取idids.add(Long.valueOf(tuple.getValue()));// 4.2.获取分数(时间戳long time tuple.getScore().longValue();if(time minTime){os;}else{minTime time;os 1;}}// 5.根据id查询blogString idStr StrUtil.join(,, ids);ListBlog blogs query().in(id, ids).last(ORDER BY FIELD(id, idStr )).list();for (Blog blog : blogs) {// 5.1.查询blog有关的用户queryBlogUser(blog);// 5.2.查询blog是否被点赞isBlogLiked(blog);}// 6.封装并返回ScrollResult r new ScrollResult();r.setList(blogs);r.setOffset(os);r.setMinTime(minTime);return Result.ok(r);}private void queryBlogUser(Blog blog) {Long userId blog.getUserId();User user userService.getById(userId);blog.setName(user.getNickName());blog.setIcon(user.getIcon());}
}好友关注
共同关注查交集Set
Service
public class FollowServiceImpl extends ServiceImplFollowMapper, Follow implements IFollowService {Resourceprivate StringRedisTemplate stringRedisTemplate;Resourceprivate IUserService userService;Overridepublic Result follow(Long followUserId, Boolean isFollow) {// 1.获取登录用户Long userId UserHolder.getUser().getId();String key follows: userId;// 1.判断到底是关注还是取关if (isFollow) {// 2.关注新增数据Follow follow new Follow();follow.setUserId(userId);follow.setFollowUserId(followUserId);boolean isSuccess save(follow);if (isSuccess) {// 把关注用户的id放入redis的set集合 sadd userId followerUserIdstringRedisTemplate.opsForSet().add(key, followUserId.toString());}} else {// 3.取关删除 delete from tb_follow where user_id ? and follow_user_id ?boolean isSuccess remove(new QueryWrapperFollow().eq(user_id, userId).eq(follow_user_id, followUserId));if (isSuccess) {// 把关注用户的id从Redis集合中移除stringRedisTemplate.opsForSet().remove(key, followUserId.toString());}}return Result.ok();}Overridepublic Result isFollow(Long followUserId) {// 1.获取登录用户Long userId UserHolder.getUser().getId();// 2.查询是否关注 select count(*) from tb_follow where user_id ? and follow_user_id ?Integer count query().eq(user_id, userId).eq(follow_user_id, followUserId).count();// 3.判断return Result.ok(count 0);}Overridepublic Result followCommons(Long id) {// 1.获取当前用户Long userId UserHolder.getUser().getId();String key follows: userId;// 2.求交集String key2 follows: id;SetString intersect stringRedisTemplate.opsForSet().intersect(key, key2);if (intersect null || intersect.isEmpty()) {// 无交集return Result.ok(Collections.emptyList());}// 3.解析id集合ListLong ids intersect.stream().map(Long::valueOf).collect(Collectors.toList());// 4.查询用户ListUserDTO users userService.listByIds(ids).stream().map(user - BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(users);}
}
关注推送 拉模式读扩散每次都要获取消息然后根据时间戳进行排序延迟 推模式写扩散要把消息写给所有人内存占用高
推拉结合模式读写混合
普通人发消息由于粉丝比较少所以直接推模式塞进去 大V发消息首先要区分普通粉丝和活跃粉丝对于普通粉丝采用拉模式活跃粉丝推模式 本项目采用推模式
List里面只能角标查询而Sorted支持按照score值范围查询支持滚动分页 推送到粉丝收件箱
Overridepublic Result saveBlog(Blog blog) {// 1.获取登录用户UserDTO user UserHolder.getUser();blog.setUserId(user.getId());// 2.保存探店笔记boolean isSuccess save(blog);if(!isSuccess){return Result.fail(新增笔记失败!);}// 3.查询笔记作者的所有粉丝 select * from tb_follow where follow_user_id ?ListFollow follows followService.query().eq(follow_user_id, user.getId()).list();// 4.推送笔记id给所有粉丝for (Follow follow : follows) {// 4.1.获取粉丝idLong userId follow.getUserId();// 4.2.推送String key FEED_KEY userId;stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());}// 5.返回idreturn Result.ok(blog.getId());}
滚动分页
滚动分页查询参数 max第一次查询时为当前时间戳 | 上一次查询的最小时间戳 min0 offset第一次直接0 | 在上一次的结果中与最小值一样的元素的个数 count3查询条数
Data
public class ScrollResult {private List? list;private Long minTime;private Integer offset;
}Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {// 1.获取当前用户Long userId UserHolder.getUser().getId();// 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset countString key FEED_KEY userId;// 滚动分页查询SetZSetOperations.TypedTupleString typedTuples stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);// 3.非空判断if (typedTuples null || typedTuples.isEmpty()) {return Result.ok();}// 4.解析数据blogId、minTime时间戳、offsetListLong ids new ArrayList(typedTuples.size());long minTime 0; // 2// 判断重复int os 1; // 2for (ZSetOperations.TypedTupleString tuple : typedTuples) { // 5 4 4 2 2// 4.1.获取idids.add(Long.valueOf(tuple.getValue()));// 4.2.获取分数(时间戳long time tuple.getScore().longValue();if(time minTime){os;}else{minTime time;os 1;}}// 5.根据id查询blog 不能用listByids要用orderbyString idStr StrUtil.join(,, ids);ListBlog blogs query().in(id, ids).last(ORDER BY FIELD(id, idStr )).list();for (Blog blog : blogs) {// 5.1.查询blog有关的用户queryBlogUser(blog);// 5.2.查询blog是否被点赞isBlogLiked(blog);}// 6.封装并返回ScrollResult r new ScrollResult();r.setList(blogs);r.setOffset(os);r.setMinTime(minTime);return Result.ok(r);}
附近商户 导入店铺数据 Testvoid loadShopData() {// 1.查询店铺信息ListShop list shopService.list();// 2.把店铺分组按照typeId分组typeId一致的放到一个集合MapLong, ListShop map list.stream().collect(Collectors.groupingBy(Shop::getTypeId));// 3.分批完成写入Redisfor (Map.EntryLong, ListShop entry : map.entrySet()) {// 3.1.获取类型idLong typeId entry.getKey();String key SHOP_GEO_KEY typeId;// 3.2.获取同类型的店铺的集合ListShop value entry.getValue();ListRedisGeoCommands.GeoLocationString locations new ArrayList(value.size());// 3.3.写入redis GEOADD key 经度 纬度 memberfor (Shop shop : value) {// stringRedisTemplate.opsForGeo().add(key, new Point(shop.getX(), shop.getY()), shop.getId().toString());locations.add(new RedisGeoCommands.GeoLocation(shop.getId().toString(),new Point(shop.getX(), shop.getY())));}stringRedisTemplate.opsForGeo().add(key, locations);}} public Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {// 1.判断是否需要根据坐标查询if (x null || y null) {// 不需要坐标查询按数据库查询PageShop page query().eq(type_id, typeId).page(new Page(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}// 2.计算分页参数int from (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end current * SystemConstants.DEFAULT_PAGE_SIZE;// 3.查询redis、按照距离排序、分页。结果shopId、distanceString key SHOP_GEO_KEY typeId;GeoResultsRedisGeoCommands.GeoLocationString results stringRedisTemplate.opsForGeo() // GEOSEARCH key BYLONLAT x y BYRADIUS 10 WITHDISTANCE.search(key,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));// 4.解析出idif (results null) {return Result.ok(Collections.emptyList());}ListGeoResultRedisGeoCommands.GeoLocationString list results.getContent();if (list.size() from) {// 没有下一页了结束return Result.ok(Collections.emptyList());}// 4.1.截取 from ~ end的部分ListLong ids new ArrayList(list.size());MapString, Distance distanceMap new HashMap(list.size());list.stream().skip(from).forEach(result - {// 4.2.获取店铺idString shopIdStr result.getContent().getName();ids.add(Long.valueOf(shopIdStr));// 4.3.获取距离Distance distance result.getDistance();distanceMap.put(shopIdStr, distance);});// 5.根据id查询ShopString idStr StrUtil.join(,, ids);ListShop shops query().in(id, ids).last(ORDER BY FIELD(id, idStr )).list();for (Shop shop : shops) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}// 6.返回return Result.ok(shops);}
}
用户签到
位图BitMap Overridepublic Result sign() {// 1.获取当前登录用户Long userId UserHolder.getUser().getId();// 2.获取日期LocalDateTime now LocalDateTime.now();// 3.拼接keyString keySuffix now.format(DateTimeFormatter.ofPattern(:yyyyMM));String key USER_SIGN_KEY userId keySuffix;// 4.获取今天是本月的第几天int dayOfMonth now.getDayOfMonth();// 5.写入Redis SETBIT key offset 1stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1, true);return Result.ok();}
签到统计
Overridepublic Result signCount() {// 1.获取当前登录用户Long userId UserHolder.getUser().getId();// 2.获取日期LocalDateTime now LocalDateTime.now();// 3.拼接keyString keySuffix now.format(DateTimeFormatter.ofPattern(:yyyyMM));String key USER_SIGN_KEY userId keySuffix;// 4.获取今天是本月的第几天int dayOfMonth now.getDayOfMonth();// 5.获取本月截止今天为止的所有的签到记录返回的是一个十进制的数字 BITFIELD sign:5:202203 GET u14 0ListLong result stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0));if (result null || result.isEmpty()) {// 没有任何签到结果return Result.ok(0);}Long num result.get(0);if (num null || num 0) {return Result.ok(0);}// 6.循环遍历int count 0;while (true) {// 6.1.让这个数字与1做与运算得到数字的最后一个bit位 // 判断这个bit位是否为0if ((num 1) 0) {// 如果为0说明未签到结束break;}else {// 如果不为0说明已签到计数器1count;}// 把数字右移一位抛弃最后一个bit位继续下一个bit位num 1;}return Result.ok(count);}
VU统计 Testvoid testHyperLogLog() {String[] values new String[1000];int j 0;for (int i 0; i 1000000; i) {j i % 1000;values[j] user_ i;if(j 999){// 发送到RedisstringRedisTemplate.opsForHyperLogLog().add(hl2, values);}}// 统计数量Long count stringRedisTemplate.opsForHyperLogLog().size(hl2);System.out.println(count count);}