网站信用认证可以自己做吗,高级程序员培训,网络营销与网络推广的异同,公司做网站需要对于未上传完的文件片占用的磁盘空间#xff0c;则是通过RedisKafka实现动态延时任务的存储与下发执行#xff0c;保证与MySQL的最终一致性
从 “业务背景”、“技术方案”、“为何用RedisKafka”、“如何保证最终一致性” 四个角度来分析#xff1a; #x1f9e9; 一、业…对于未上传完的文件片占用的磁盘空间则是通过RedisKafka实现动态延时任务的存储与下发执行保证与MySQL的最终一致性
从 “业务背景”、“技术方案”、“为何用RedisKafka”、“如何保证最终一致性” 四个角度来分析 一、业务背景
在实现大文件 分片上传、断点续传 功能时有以下问题
用户上传的文件被分成多个小片每个片段会先暂存在服务器硬盘的临时目录中文件合并前这些文件片会一直占据磁盘空间如果用户上传到一半就不继续了无论是误操作还是恶意攻击这些碎片就永远不会被合并清理从而造成 磁盘空间长期占用、资源浪费、甚至导致服务崩溃被攻击压垮。
所以必须有一种机制自动清理这些未完成上传的文件碎片。 二、技术方案总览 核心目标每个文件片上传完后给它设置一个“延时任务”比如如果 5 秒内没有继续上传那就清理这个文件的所有碎片。 技术选型
Redis用来快速缓存上传状态、延时任务高性能读写Kafka作为可靠的异步消息中间件负责延时任务的可靠下发和最终落地执行高可靠性MySQL真正的数据持久化落地低频写、稳定性要求高。 三、为什么用 Redis Kafka
✅ Redis 的作用
用 ZSet有序集合存储所有文件上传任务及其过期时间每次上传文件片都会刷新延时时间例如当前时间 5秒定时调度器比如每秒执行扫描 Redis 中到期的任务如果任务到期未上传完就发消息到 Kafka。 Redis 保证快速更新和调度精度但本身不做复杂业务处理。 ✅ Kafka 的作用
Kafka 消费端监听 Redis 发过来的超时任务收到后执行任务删除对应磁盘碎片更新数据库中“未完成上传空间”等信息Kafka 保证消息可靠投递、可重试、可顺序、具备持久化和幂等保障。 Kafka 确保任务“最终会被执行”即使 Redis/调度器短暂异常消息也不会丢。 四、如何保证与 MySQL 的最终一致性
这是句子中的关键——“最终一致性”的意思是即使中间发生延迟、系统抖动等只要系统恢复MySQL 里的数据最终会被正确更新和 Redis 的状态保持一致。
⚙️ 具体机制
阶段数据存储层行为上传过程中Redis每次上传一个文件片就更新 Redis 中该任务的延迟时间同时更新 Redis 中该用户“未完成上传空间”字段任务过期KafkaRedis 将过期任务发到 Kafka 任务队列任务执行消费端处理1. 删除磁盘碎片2. 将 Redis 中统计的空间同步到 MySQL更新“未完成上传空间”字段3. 删除 Redis 中该任务记录上传完成应用服务合并文件Redis 中清除上传状态同时从 Redis 和数据库中扣除“未完成上传空间”并加到“已完成上传空间”
只要 Kafka 不丢消息消费端一定会最终完成这个任务 → 保证 MySQL 和 Redis 中的数据最终一致。 ✅ 总结 “对于未上传完的文件片占用的磁盘空间则是通过RedisKafka实现动态延时任务的存储与下发执行保证与MySQL的最终一致性” 可以如下理解
未上传完的文件片会临时占用磁盘需要被及时清理每个上传任务会被缓存到 Redis使用 ZSet到期的任务由 Redis 调度器识别后通过 Kafka 发送消息通知执行清理Kafka 消费端负责真正去清理碎片文件 同步更新数据库即便系统某一刻异常只要 Kafka 的消息还在最终一定会将 MySQL 的状态修正正确这样就达到了高性能缓存 高可靠调度 数据最终一致。 ✅ 简历中表达方式 为应对大文件上传中因中断或恶意攻击导致的磁盘碎片堆积问题设计并实现基于 Redis ZSet Kafka 的动态延时清理机制 使用 Redis 高性能缓存上传任务及延时状态超时任务通过 Kafka 异步下发保障任务可靠落地执行任务执行过程更新用户未完成上传空间并持久化至 MySQL最终实现数据库与缓存状态一致。 当然可以下面我通过一个实际例子完整演示如何用 Redis Kafka 来管理未上传完成的文件片所引发的磁盘空间占用问题以及如何保证最终一致性。 背景场景
假设用户 user123 在上传一个视频文件文件大小为 100MB分成了 10个片段每片10MB。系统配置
用户上传空间上限500MB磁盘碎片的保存目录按月存储例如 /tmp/upload/2025-06/user123/abcd1234/延时清理时间为5秒未上传下一个片段就触发清理任务 实际操作流程 第一次上传上传第1个文件片 用户上传第1片10MB服务器接收后将它缓存在 /tmp/upload/2025-06/user123/abcd1234/part1。 系统计算 md5 abcd1234该文件的唯一标识。 系统将任务写入 Redis ZSet 名称zset_upload_tasks_1
key: abcd1234user123
score: 当前时间戳 5秒延时清理时间在 Redis 的另一个 Hash 中记录 user123 - unfinishedSize 10MB任务调度器每秒扫描一次 ZSet发现该任务未超时 → 不处理。 上传第2片第2次上传
用户紧接着上传第2片系统发现已有这个任务 → 刷新 Redis 中的延时任务时间当前时间 5秒。unfinishedSize 累加至 20MB。 用户断开连接上传中断
假设用户上传完第3片共30MB后就关闭浏览器了系统检测不到任何新的上传。 ⏱️ Redis 延时任务触发 5秒过去调度器再次扫描 ZSet 发现 abcd1234user123 的任务已超时。 系统将该任务信息通过 Kafka 发送出去
{type: upload_timeout,userId: user123,fileMd5: abcd1234,size: 30MB,tmpPath: /tmp/upload/2025-06/user123/abcd1234/
}Kafka 消费端处理任务
Kafka 消费者监听到这条消息进行以下操作 删除临时目录 /tmp/upload/2025-06/user123/abcd1234/ 下所有文件片节省磁盘空间。 将 Redis 中 unfinishedSize 30MB 减去该任务对应的大小。 同步更新 MySQL 数据库中 use_space_unfinished use_space_unfinished - 30MB 日志记录该任务已完成确保幂等。 最终效果总结
时间点状态磁盘使用Redis 状态数据库状态上传中正常写入30MBunfinishedSize 30MBunchanged超时后Kafka触发-30MBunfinishedSize 0MBuse_space_unfinished - 30MB 一句话总结 “用户上传文件中断后Redis 中的延时任务自动触发清理逻辑通过 Kafka 下发异步清理任务释放磁盘空间并同步更新 MySQL 中的已用空间确保缓存和持久化层的最终一致性。” 分布式缓存与消息队列协作机制的理解Redis 延时调度能力Kafka 幂等消费处理数据一致性策略最终一致 如何基于 Redis Kafka 管理未上传完成的文件片防止服务器硬盘被恶意占用并确保最终一致性 技术分享稿未上传文件片的清理与一致性保障机制
在开发 SmartDrive 云盘系统 时我们遇到一个潜在的系统稳定性问题如果用户上传文件过程中中断例如恶意攻击、频繁取消上传等未完成的文件分片会持续占用服务器磁盘空间。由于这些文件尚未合并数据库中的用户“已使用空间”不会更新长此以往可能导致服务器磁盘资源被占满影响服务可用性。
为了解决这一问题我们设计并实现了一个基于 Redis Kafka 的动态延时任务系统实现文件片清理、用户空间回收以及数据一致性保障。 实现方案 文件上传分片记录与限时清理机制 用户上传每一个文件片时我们通过 文件MD5 用户ID 构造唯一标识并将该上传任务存入 Redis 的分布式 ZSet有序集合中设置延迟时间如5秒作为 score。若用户持续上传则不断刷新该任务的过期时间若中断延迟任务最终会被触发。 空间占用统计 Redis 中为每个用户维护两个字段use_space_finished已上传完毕和 use_space_unfinished上传中。每次分片上传成功后use_space_unfinished 累加对应片段大小但不立即写入数据库以减少数据库压力。 Kafka 异步任务分发 当调度器检测到某个上传任务超时后会将其作为“上传中断”事件写入 Kafka 任务队列。Kafka 消费者接收到该事件后执行清理逻辑删除临时目录下的文件片、扣减 Redis 和数据库中的 use_space_unfinished同时记录日志确保幂等性。 一致性保障 Redis 负责高频写操作Kafka 消费者在空闲时批量落库保证 Redis 与 MySQL 的最终一致性。通过唯一任务 ID 和幂等机制确保即使任务重复下发也不会出现重复扣减或误删文件。 举例说明
假设用户 user123 上传一个 100MB 的视频被分成 10 片。上传前3片后中断系统记录
Redis 中 use_space_unfinished 30MB临时目录占用磁盘空间为 30MB
当用户5秒内未继续上传调度器触发 Kafka 任务
删除 /tmp/upload/.../user123/md5xyz/ 下所有片段Kafka 消费者更新 Redis 和 MySQL使 use_space_unfinished - 30MB
最终系统磁盘被及时释放数据状态一致避免了无效数据积压。 效果与优势
⚙ 系统可用性提升及时清理无效数据防止磁盘被打爆⚡ 高性能Redis 实现高频写入Kafka 异步处理延时任务系统抗压能力强 安全性好即使遭遇恶意攻击也能快速识别并清理 一致性保障Redis 与 MySQL 数据最终一致确保用户体验无误 这个机制目前已经稳定运行在我们云盘系统的上传链路中极大地提升了系统的健壮性与可维护性。如果大家有类似文件上传、延时处理或一致性问题也可以参考我们这套 Redis Kafka 架构模式。 架构优点、对比其他方案、以及可选替代方案
✅ 为什么选择 Redis Kafka 架构模式
一、Redis 的优势高速缓存 精准调度
Redis 支持毫秒级延时处理通过 ZSet score timestamp 实现适合存储上传任务及调度时间。具备高并发处理能力可支持上传过程中对用户空间的快速写入与实时更新。使用内存存储避免频繁访问数据库缓解数据库压力。
二、Kafka 的优势高吞吐 异步解耦
Kafka 具备高可用、高吞吐、可回溯等特性适合作为任务通知的“缓冲带”。Redis 负责状态判断和触发Kafka 负责任务发出后的“慢处理”避免阻塞请求线程。异步解耦上传请求不需要等待清理逻辑完成极大提升响应速度与系统稳定性。
三、二者组合的优势
Redis 做“调度器”Kafka 做“执行者”既确保调度精度又实现任务缓冲与最终一致性。Redis 失效后 Kafka 仍可保留任务Kafka 消费异常可重试具备很好的鲁棒性和可恢复性。 与其他架构方案对比
架构模式优势缺点✅ Redis Kafka高性能、高解耦、最终一致性强系统设计复杂度略高❌ 直接写数据库简单直观并发高时容易写崩库写放大严重影响主业务❌ 仅用 Redis 实现延时清理写性能好无法可靠落盘易丢失任务需要自行实现幂等与持久化❌ Quartz / ScheduledExecutorService精度较低线程消耗高不适合大规模任务调度任务量大时调度不稳定❌ RabbitMQ / 延迟队列可替代 Kafka但吞吐与可靠性不如 Kafka且不易追踪任务执行状态 其他可选实现方式如果 Redis Kafka 不可用
1. 使用 Redis 延迟队列中间件如 RabbitMQ 的延迟插件、RocketMQ 延迟消息
替代 Kafka适合中小型系统且部署运维成本较低
2. 使用 Redis 定时任务轮询如 ScheduledExecutor Redis
定期扫描 Redis 中即将过期的任务并执行适用于任务量不大、调度精度要求不高的场景
3. 利用 Redisson 的 RDelayedQueue
支持分布式延迟队列适用于 Redis 原生不支持的延迟任务功能配合业务逻辑处理较轻的场景快速上线 总结为什么 Redis Kafka 更好
Redis 解决了高频访问场景下的快速读写 精准延时调度Kafka 解决了异步、解耦、幂等、高吞吐处理的问题两者结合
性能强、可靠性高调度精度好任务追踪容易系统扩展性强
这套架构非常适合大型上传系统中复杂的上传状态追踪、用户空间管控与数据一致性需求能够在面对高并发、突发流量、异常上传行为时保持系统稳定。 Redis ZSet Kafka 在上传文件片时的使用场景中的技术细节解析 背景场景简介
当用户上传大文件时通常会进行分片上传。但如果恶意用户仅上传部分文件片不合并完成这些碎片可能长时间占用磁盘资源导致服务器空间耗尽。
为此系统需监控这些“未合并文件片”的生命周期并在长时间未完成上传的情况下及时清理无效文件片。
这就引出了 Redis Kafka 的组合使用 ✅ Redis ZSet 的作用任务调度器
技术细节
使用 Redis 的有序集合ZSet 存储每个未完成上传任务。Key如 upload_timeout_task_bucket_{n}分桶方案Member任务唯一 ID例如 md5_useridScore当前时间戳 超时时间例如 5 分钟
示例
ZADD upload_timeout_task_bucket_1 1718178000 md5_1234表示用户1234上传的某个文件片在 1718178000约5分钟后仍未完成则视为超时。
扩展细节
使用内存 Map 缓存 md5userid → zset桶编号 映射提高查找性能。每次用户上传一个新分片时就更新对应任务的超时时间延迟5秒或更长时间再次触发判断。如果任务上传完成则从 ZSet 删除避免误清理。 ✅ Kafka 的作用任务下发执行者
技术细节
Redis 的调度器定期扫描 ZSet 中即将到期的任务例如每秒扫描 score ≤ 当前时间戳 的任务。对这些任务使用 Kafka 发送清理事件异步通知后端执行清理动作。
Kafka 消息示例
{taskId: md5_1234,userId: 1234,action: clean_unfinished_chunks,timestamp: 1718178000
}消息被下游服务异步消费删除文件碎片、释放空间并更新数据库与缓存状态。具备 幂等处理能力每条消息带唯一 ID消费者侧使用去重机制防止重复清理。 并发与一致性保障机制
分桶设计
将所有延时任务分散到多个 ZSet例如 10 个桶upload_timeout_task_bucket_0 到 _9按哈希值取模分配。减少单个 ZSet 的 size提升调度器扫描效率和延迟控制精度。
幂等性控制
Kafka 消息处理必须是幂等的即同一条任务消息不管消费几次最终状态一致。可在数据库或 Redis 中记录任务处理状态如 taskId → processed防止重复执行。
与数据库一致性
Redis 在内存中快速处理临时状态Kafka 负责写操作的异步最终一致性通知最终由数据库记录实际使用空间如已上传/未完成空间大小字段。 总结一句话 使用 Redis ZSet 精准调度未完成上传的文件片生命周期Kafka 异步可靠下发清理任务两者协作实现了高并发场景下的磁盘保护、状态可追踪、任务幂等、最终一致性处理有效防止恶意上传攻击保障系统稳定性。 ZSet 分桶
“ZSet 分桶”是一个在高并发或大规模数据处理场景下的性能优化策略。它的核心思想是将原本存储在一个 Redis 有序集合ZSet中的大量任务拆分成多个 ZSet 存储分散访问压力提高查询效率与调度精度。 为什么要“分桶”
当你把所有延迟任务都放在一个 Redis ZSet 里比如叫 upload_timeout_tasks随着时间推移这个集合会变得非常大。ZSet 的查询效率虽然不错但
ZRANGEBYSCORE 查询的是有序数据任务一多扫描就慢每秒调度器都要扫描一次“过期任务”访问压力集中单个 ZSet 容量太大也可能成为 Redis 内存碎片化或阻塞操作的隐患。
为了解决这个问题我们“分桶”。 ZSet 分桶的做法
举个例子
假设我们要存储 100 万个上传文件的延时清理任务不再用一个 ZSet而是
upload_timeout_task_bucket_0
upload_timeout_task_bucket_1
...
upload_timeout_task_bucket_9共 10 个“桶”ZSet。我们把任务“均匀分布”到这些桶中。
如何分布任务
可以根据任务的哈希值取模分桶例如
int bucketIndex (md5 userId).hashCode() % 10;将这个任务放入第 bucketIndex 个桶中。
这样每个 ZSet 只维护一小部分任务大大减少了单个桶的查询开销。 调度器如何配合分桶
原来调度器每秒只扫描一个 ZSet
ZRANGEBYSCORE upload_timeout_tasks 0 currentTime现在变成轮询每个桶
for i in 0..9:ZRANGEBYSCORE upload_timeout_task_bucket_i 0 currentTime这样每次每个桶扫描的数据量变小了调度延迟更小、吞吐量更高也方便并发处理。 ✅ 总结
特性说明 目的降低单个 Redis ZSet 的压力提高调度效率和查询性能⚙️ 方式将任务分散放入多个 ZSet桶中按照哈希取模分配 优势并发性能更强、查询更快、调度更精准、避免单点瓶颈 适用场景上传分片延时清理、定时任务调度、过期资源管理等场景 Kafka 一、Kafka 是什么
Kafka 是一个高吞吐、可持久化的分布式消息队列系统主要特点
特性说明发布-订阅模式生产者发布消息消费者订阅处理消息高吞吐每秒处理百万级消息持久化数据写入磁盘支持消息持久保存可扩展性支持多 Broker 组成集群水平扩展容错性强支持副本机制节点宕机也不会丢消息 二、Kafka 在“文件片延迟清理”中的作用
在我们的系统中用户上传文件是通过分片方式进行的
如果某些分片长时间未上传完成它们就一直占据服务器磁盘空间我们通过 Redis 的 ZSet 创建动态延时任务比如“10分钟后检查是否上传完成”但延时任务并不直接清理磁盘而是将清理请求发送到 Kafka 这个消息队列中Kafka 中的消费者再异步消费任务做清理、更新数据库等工作。 三、流程图示简化
[用户上传文件片] ↓
[Redis ZSet 生成延时任务]上传超时5分钟↓
[调度器扫描 ZSet到期后将任务发送到 Kafka Topic]↓
[Kafka 消费者消费消息]↓
[清理临时分片 更新 Redis/MySQL 空间使用数据]四、Kafka 的使用细节
1. Producer生产者发送消息
当某个文件片超时未合并
// 伪代码发送清理任务
kafkaTemplate.send(file-cleanup-topic, msg);消息内容一般包括
用户 ID文件 MD5分片路径文件大小
2. Consumer消费者消费清理任务
KafkaListener(topics file-cleanup-topic)
public void cleanupHandler(String msgJson) {// 解析消息// 删除磁盘中的分片// 更新 Redis 的未完成空间大小// 更新数据库最终一致性
}3. 幂等性保证
Kafka 提供 消息持久化不怕宕机 消息重复消费你需要在处理逻辑中加入幂等性设计比如 先检查文件是否已清理过或根据“唯一ID”判断是否是同一任务。 ✅ 五、使用 Kafka 的优势
优势说明解耦调度与处理逻辑Redis 延时任务调度只负责“发通知”真正清理由 Kafka 消费者异步处理提升系统性能与可扩展性通过 Kafka 实现异步批量处理避免同步阻塞高可用保障Kafka 的持久化机制确保任务不会丢失支持幂等处理可以防止重复删除、误删等操作 一句话总结 在文件上传场景中为防止未完成分片长期占用磁盘空间我们基于 Redis ZSet 构建延迟任务调度机制并结合 Kafka 进行任务异步下发和消费实现高吞吐、高可用的碎片清理架构同时通过唯一任务 ID 实现幂等处理与最终一致性保障。 系统性地解释整个链路 场景背景
用户上传大文件时会被切分为多个文件片。为防止用户上传未完成就退出或恶意攻击我们使用Redis Kafka架构实现
文件片超时未合并则定时清理延迟任务上传空间计算最终一致性Redis缓存MySQL同步 整体流程图
上传分片 → Redis ZSet 记录延时任务 → 到期 → 发送 Kafka 消息 → Kafka 消费者执行任务↓ ↓
更新 Redis 缓存使用量未完成 清理临时文件 更新 Redis/MySQL 空间使用情况一、如何将任务发送到 Kafka Topic
我们使用 Spring Boot Kafka 的集成方式Spring for Apache Kafka。
示例代码发送任务
Autowired
private KafkaTemplateString, String kafkaTemplate;public void sendCleanupTask(String userId, String fileMd5, long unfinishedSize) {JSONObject task new JSONObject();task.put(userId, userId);task.put(fileMd5, fileMd5);task.put(unfinishedSize, unfinishedSize);kafkaTemplate.send(file-cleanup-topic, task.toJSONString());
}当 Redis 的 ZSet 检测到任务到期即当前时间 score就调用该方法将任务发到 Kafka 的 file-cleanup-topic 主题中。 二、Kafka 消费者如何消费消息
你可以通过 KafkaListener 注解监听某个 Topic
KafkaListener(topics file-cleanup-topic, groupId file-cleaner-group)
public void consumeCleanupTask(String messageJson) {JSONObject task JSONObject.parseObject(messageJson);String userId task.getString(userId);String fileMd5 task.getString(fileMd5);long size task.getLong(unfinishedSize);// 1. 删除 Redis 中对应的文件分片记录redisTemplate.delete(chunk: userId : fileMd5);// 2. 删除磁盘临时分片fileService.deleteTempChunks(userId, fileMd5);// 3. 更新 Redis 中的 use_space_unfinished 字段redisTemplate.opsForHash().increment(user: userId, use_space_unfinished, -size);// 4. 将最终结果异步持久化到数据库userMapper.decreaseUnfinishedSize(userId, size);
}三、Redis 与 MySQL 的数据更新方式
Redis 缓存的设计
我们使用 Hash 存储用户空间信息
Key: user:{userId}
Field: use_space // 已上传完毕的空间
Field: use_space_unfinished // 未上传完毕的空间更新逻辑
每上传一片use_space_unfinished chunkSize清理use_space_unfinished - chunkSize合并use_space_unfinished - totalSize, use_space totalSize
MySQL 最终一致性异步批量同步
UPDATE user_space
SET use_space_unfinished use_space_unfinished - #{size}
WHERE user_id #{userId};这一步是为了防止 Redis 异常丢失数据时系统还能恢复一致性。 ✅ 总结一下完整链路
步骤描述1. 上传分片用户上传某个分片时记录上传大小更新 Redis 中的 use_space_unfinished2. 创建延迟任务使用 Redis ZSet 记录fileMd5userId 上传时间戳3. 定时扫描任务到期后调用 KafkaTemplate.send() 发送清理任务到 Kafka4. Kafka 消费者处理监听 topic执行任务清文件 更新 Redis 更新数据库5. 最终一致性Redis 快速缓存写MySQL 异步持久化确保数据准确 一句话总结 为了防止文件片上传未完成导致磁盘资源被长期占用我们使用 Redis ZSet 实现延时任务调度通过 Kafka 实现任务异步消费。Redis 记录用户未完成空间信息以减轻数据库压力Kafka 消费者在任务触发后清理磁盘分片并更新 Redis 与数据库最终实现空间使用信息的一致性同步和系统高性能处理。