衡水企业网站制作报价,在深圳市住房和建设局网站,seo优化介绍,做文案的网站有些什么软件Disruptor
1、基本介绍
说到队列#xff0c;除了常见的mq中间件#xff0c;java中也自带线程安全的BlockingQueue#xff0c;但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作#xff0c;性能上会大打折扣。 而Disruptor是一个线程安全、低延迟、吞吐量高的队…Disruptor
1、基本介绍
说到队列除了常见的mq中间件java中也自带线程安全的BlockingQueue但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作性能上会大打折扣。 而Disruptor是一个线程安全、低延迟、吞吐量高的队列并且解决BlockingQueue加锁带来的性能下降问题十分适合单机使用。 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。
2、与BlockingQueue对比
使用CAS代替锁多播模式同一事件可以交给多个消费者处理基于环形数组RingBuffer创建时就固定长度不出现空间新分配情况减少垃圾回收
这是官网与BlockingQueue对比的延迟直方图可以看出BlockingQueue出现延迟的机率比Disruptor高得多。 3、生产者消费者模式
在Disruptor中生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现
引入最新包 dependencygroupIdcom.lmax/groupIdartifactIddisruptor/artifactIdversion4.0.0/version/dependency定义一个商品
Data
public class Goods {private String name;}定义生产者
public class Producer {private final RingBufferGoods ringBuffer;public Producer(RingBufferGoods ringBuffer) {this.ringBuffer ringBuffer;}/*** 生产货品* param goodsName*/public void onData(String goodsName) {long sequence ringBuffer.next();try {Goods goods ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}
定义消费者
Data
public class Consumer implements EventHandlerGoods{private String name;public Consumer(String name){this.name name;}Overridepublic void onEvent(Goods goods, long l, boolean b) {//消费者接收到货品System.out.println(name消费了goods.getName());}Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}Overridepublic void onStart() {EventHandler.super.onStart();}Overridepublic void onShutdown() {EventHandler.super.onShutdown();}Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}一个生产者对一个消费者 public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {DisruptorGoods disruptor new Disruptor(Goods::new,16, // RingBuffer 大小必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE, //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBufferGoods ringBuffer disruptor.getRingBuffer();//单生产者单消费者disruptor.handleEventsWith(new Consumer(Consumer1));disruptor.start();Producer producer new Producer(ringBuffer);while (true){producer.onData(goodsUUID.randomUUID());Thread.sleep(1000);}}
}一个生产者对多个消费者
消费者按顺序消费 public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {DisruptorGoods disruptor new Disruptor(Goods::new,16, // RingBuffer 大小必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI, //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBufferGoods ringBuffer disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer(Consumer1)).then(new Consumer(Consumer2));disruptor.start();Producer producer new Producer(ringBuffer);while (true){producer.onData(goodsUUID.randomUUID());Thread.sleep(1000);}}
}多播模式同一事件可以交给多个消费者处理 只需要将上述代码修改一下即可 //Consumer1、Consumer2、Consumer3先消费Consumer4后消费disruptor.handleEventsWith(new Consumer(Consumer1),new Consumer(Consumer2),new Consumer(Consumer3)).then(new Consumer(Consumer4));多个生产者对多个消费者 public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {DisruptorGoods disruptor new Disruptor(Goods::new,16, // RingBuffer 大小必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI, //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBufferGoods ringBuffer disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer(Consumer1)).then(new Consumer(Consumer2));disruptor.start();Producer producer1 new Producer(ringBuffer);Producer producer2 new Producer(ringBuffer);Producer producer3 new Producer(ringBuffer);while (true){producer1.onData(goodsUUID.randomUUID());producer2.onData(goodsUUID.randomUUID());producer3.onData(goodsUUID.randomUUID());Thread.sleep(1000);}}
}除了上述多播模式中多个消费者各自处理事件一个event事件会同时被多个消费者处理其实还有Disruptor另一种模式多个消费者合作处理一批事件一个event事件会被其中一个消费者处理由Disruptor 的 WorkPool 支持不过在4.0中已经被去除了 看了github的issue作者大概意思说难以维护并且在LMAX公司也不会用到WorkPool所以就去除了。 4、RingBuffer原理
Disruptor内部由环形数组Ring Buffer数组必须为2的n次方。 1、Ring Buffer使用环形数组有效避免线性数组index越界问题而且数组内元素的内存地址是连续的对CPU缓存友好在硬件级别数组中的元素是会被预加载的所以RingBuffer中CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动可以实现数据在数组中的环形存取。 2、在多生产者场景下多个生产者会进行竞争防止读到还未写的元素。引入了一个与Ring Buffer大小相同的bufferavailable Buffer用来判断Ring Buffer某个元素是否已经就绪。 3、为什么available Buffer也做成圈呢这样做是防止把上一轮的数据当成这一轮的数据错误判断Ring Buffer元素可用。 4、为什么Ring Buffer要2的n次方因为会涉及到二进制运算来算出元素位置在源码中可以找到。 5、具体RingBuffer写数据和读数据流程可以参考美团技术博客https://tech.meituan.com/2016/11/18/disruptor.html
5、等待策略
生产者和消费者都可能出现速度过快的情况比如队列满了生产者需要等待消费者消费后才能生产或者消费者消费过快导致队列为空进而需要等待生产者生产。 Disruptor目前一共内置了8种等待策略。 BlockingWaitStrategy用了ReentrantLock的等待唤醒机制实现等待逻辑是默认策略对CPU的消耗最小BusySpinWaitStrategy 持续自旋会消耗大量CPU资源LiteBlockingWaitStrategy 基于BlockingWaitStrategy非重入锁的阻塞等待策略在没有锁竞争的时候会省去唤醒操作TimeoutBlockingWaitStrategy 超时等待策略它会使消费者线程进入阻塞状态在指定的时间内等待新的事件如果等待超时则退出LiteTimeoutBlockingWaitStrategy 基于TimeoutBlockingWaitStrategy在没有锁竞争的时候会省去唤醒操作SleepingWaitStrategy 三段式第一阶段自旋第二阶段执行Thread.yield交出CPU第三阶段睡眠执行时间反复的睡眠YieldingWaitStrategy 二段式第一阶段自旋第二阶段执行Thread.yield交出CPUPhasedBackoffWaitStrategy 四段式第一阶段自旋指定次数第二阶段自旋指定时间第三阶段执行Thread.yield交出CPU第四阶段调用成员变量的waitFor方法这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个
6、结束
Disruptor简单的介绍已经结束了点个赞再走啦~