红河做网站,网站建设公司客户来源渠道,wordpress进不去后台,网站伪静态当遇到并发的客户端请求时#xff0c;为了缓解服务端的处理压力#xff0c;当请求对响应的处理的实时性要求不高时#xff0c;可以实现一个异步的请求消息队列。
一种实现策略是使用redis的zset#xff0c;将消息的到期处理时间作为score#xff0c;然后用多个线程去轮训…当遇到并发的客户端请求时为了缓解服务端的处理压力当请求对响应的处理的实时性要求不高时可以实现一个异步的请求消息队列。
一种实现策略是使用redis的zset将消息的到期处理时间作为score然后用多个线程去轮训获取zset中的任务并进行处理。
需要提前考虑一个问题
如何避免一个任务被多次处理
一种解决方案是当多个线程获取到任务时调用redis的zrem命令将该任务从指定的zset中移除(利用了redis处理命令时是顺序执行的)。
环境
JDK17两个jar包 Jedisfastjson2
代码
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import redis.clients.jedis.Jedis;import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;// 基于Redis实现的延迟队列
public class RedisDelayingQueueT {static class TaskItemT {public String id;public T msg;}// fastjson序列化对象时如果存在泛型需要使用TypeReferenceprivate Type TaskType new TypeReferenceTaskItemT(){}.getType();private Jedis jedis;private String queueKey;public RedisDelayingQueue(Jedis jedis, String queueKey) {this.jedis jedis;this.queueKey queueKey;}// 将任务添加到 zset 中// 分数是延时的时间public void delay(T msg) {TaskItemT task new TaskItemT();task.id UUID.randomUUID().toString();task.msg msg;// 序列化任务String s JSON.toJSONString(task);jedis.zadd(queueKey, System.currentTimeMillis() 5000, s);}public void loop() {while(!Thread.interrupted()) {// 从zset中取出一个任务ListString values jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s values.iterator().next();if(jedis.zrem(queueKey, s) 0) {TaskItemT task JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}public void handleMsg(T msg) {System.out.println(msg);}
}优化
通过上面loop中代码多个线程获取到values时可能会被多个线程同时取到然后再调用zrem命令去竞争的删除该值所以会有很多无用的网络请求发送到redis。更容易想到的方案是将取值然后删除的操作变成原子性的两种实现方案
通过对代码块进行加锁的方式利用redis中lua脚本的原子执行的特点
代码块加锁
这种方案不太好如果两个命令之间发生了网络错误或者延迟将造成其它线程的阻塞 public void synchronizedLoop() {while(!Thread.interrupted()) {synchronized(this) {// 从zset中取出一个任务ListString values jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);if(values.isEmpty()) {try {Thread.sleep(500);} catch(InterruptedException e) {break;}continue;}String s values.iterator().next();if(jedis.zrem(queueKey, s) 0) {TaskItemT task JSON.parseObject(s, TaskType);this.handleMsg(task.msg);}}}}Lua脚本
local key KEYS[1]
local task redis.call(ZPOPMIN, key)
if task and next(task) ! nil thenredis.call(ZREM, key, task[1])return task[1]
elsereturn nil
end通过查阅文档发现ZRANGEBYSCORE从6 版本开始已经过时了所以这里使用ZPOPMIN来获取分数最小的value可以达到相同的效果。
通过Jedis的eval函数调用redis执行lua脚本的命令。 public void luaLoop() {while(!Thread.interrupted()) {Object ans jedis.eval(script, 1, queueKey);if(ans ! null) {String task (String) ans;TaskItemT taskItem JSON.parseObject(task, TaskType);this.handleMsg(taskItem.msg);}else{try{Thread.sleep(500);}catch(Exception e) {break;}}}}为什么可以优化
使用lua脚本的方式使得一个线程如果zset中有任务都会成功获取任务而不会多个线程同时拿到同一个任务再去竞争删除减少了无效的网络IO
测试程序
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;public class Main {public static void main(String[] args) {JedisPool jedisPool new JedisPool(url-of-redis, 6379, username, pass);Jedis jedis jedisPool.getResource();RedisDelayingQueueString queue new RedisDelayingQueue(jedis, q-demo);// 创建一个线程充当生产者并向redis中存10个异步任务Thread producer new Thread() {public void run() {for (int i 0; i 10; i) {queue.delay(codehole i);}}};// 创建一个线程充当消费者不断从redis中取任务并执行Thread consumer new Thread() {public void run() {queue.luaLoop();}};producer.start();consumer.start();try {// 等待生产者线程执行结束producer.join();Thread.sleep(6000);consumer.interrupt();consumer.join();}catch(InterruptedException e) {e.printStackTrace();}}
}一些问题
这个问题是关于Jedis的问题因为我通过上面的方式发起redis请求实际上是存在并发问题的如果将上述代码中的延时去掉这个问题发生的概率将大大发生主要是因为Jedis不是线程安全的换句话说通过JedisPool获取redis连接的实例并发访问是是通过同一个socket发送数据的。
这里使用时最好是每个线程都用有一个Jedis的实例避免数据竞争问题.这里只是用了两个线程所以简单手动使用两个redis实例如果有多个消费者存在的情况下还是每个线程单独持有一个Jedis才能解决问题。 private Jedis readJedis;private Jedis writeJedis;总结
本篇文章记录了使用zset实现一个简单异步队列的过程然后对于第一次实现存在的一个问题使用lua或者锁的方式优化网络IO。使用锁的方式会降低程序的并发度所以一般使用lua脚本的方式来实现。