wordpress博客空间,百度快照优化seo,有教做鱼骨图的网站吗,请人做网站收费多少钱什么是阻塞队列
阻塞队里是在普通的队列#xff08;先进先出队列#xff09;基础上#xff0c;做出了扩充
线程安全 标准库中原有的队列 Queue 和其子类#xff0c;默认都是线程不安全的 具有阻塞特性 如果队列为空#xff0c;进行出队列操作#xff0c;此时就会出现阻…什么是阻塞队列
阻塞队里是在普通的队列先进先出队列基础上做出了扩充
线程安全 标准库中原有的队列 Queue 和其子类默认都是线程不安全的 具有阻塞特性 如果队列为空进行出队列操作此时就会出现阻塞。一直阻塞到其他线程往队列里添加元素为止如果队列满了进行入队列操作此时就会出现阻塞。一直阻塞到其他线程从队列里取走元素为止
基于阻塞队列最大的应用场景就是实现“生产者消费者模型”日常开发中常见的编程手法
生产者消费者模型
比如 小猪佩奇一家准备包饺子成员有佩奇猪爸爸和猪妈妈外加一个桌子
佩奇负责擀面皮猪爸爸和猪妈妈负责包饺子桌子用来放你擀好的面皮 每次佩奇擀好一个面皮后就放在桌子上猪爸爸和猪妈妈就用这个面皮包出一个饺子
此时
佩奇就是面皮的生产者——生产者猪爸爸和猪妈妈就是面皮的消费者——消费者桌子就是阻塞队列——阻塞队列
为什么是是阻塞队列而不是普通队列 因为阻塞队列可以很好的协调生产者和消费者
若佩奇擀面皮很快不一会桌子上就满了 阻塞队列佩奇就休息一下等面皮被消耗一些之后继续再擀普通队列不会停放不下了也一直擀 若猪爸爸和猪妈妈包的很快不一会桌子上就空了 阻塞队列猪爸爸和猪妈妈休息一下等到面皮擀出来之后再包普通队列不会停没面皮了也一直包
好处
上述生产者消费者模型在后端开发中经常会涉及到 当下后端开发常见的结构——“分布式系统”不是一台服务器解决所有问题而是分成了多个服务器服务器之间相互调用
主要有两方面的好处
1. 服务器之间解耦合
我们希望见到“低耦合”
模块之间的关联程度/影响程度
通常谈到的“阻塞队列”是代码中的一个数据结构 但是由于这个东西太好用了以至于会把这样的数据结构单独封装成一个服务器程序并且在单独的服务器机器上进行部署 此时这样的饿阻塞队列有了一个新的名字“消息队列”Message QueueMQ
如果是直接调用 编写 A 和 B 代码中会出现很多对方服务器相关的代码并且此时如果 B 服务器挂了A 可能也会直接受到影响再并且如果后续想加入一个 C 服务器此时对 A 的改动就很大 如果是通过阻塞队列 A 之和队列通信B 也只和队列通信A 和 B 互相不知道对方的存在代码中就更没有对方的影子 看起来A 和 B 之间是解耦合了但是 A 和队列B 和队列之间不是引入了新的耦合吗耦合的代码在后续的变更工程中比较复杂容易产生 bug但消息队列是成熟稳定的产品代码是稳定的不会频繁更改。A、B 和队列之间的耦合对我们的影响微乎其微再增加 C 服务器也很方便也不会影响到原有的 A 和 B 服务器 2. “削峰填谷”的效果
通过中间的阻塞队列可以起到削峰填谷的效果在遇到请求量激增突发的情况下可以有效保护下游服务器不会被请求冲垮
阻塞队列的作用就相当与三峡大坝在三峡的防汛作用 A 向队列中写入数据变快了但是 B 仍然可以按照原有的速度来消费数据阻塞队列扛下了这样的压力就像三峡大坝抗住上游的大量水量的压力如果是直接调用A 收到多少请求B 也收到多少那很可能直接就把 B 给搞挂了当 A 不再写入数据的时候但队列中还存有数据可以继续工给 B 问题
为啥一个服务器收到的请求变多就容易挂 一台服务器就是一台“电脑”上面就提供了一些硬件资源包括但不限于 CPU内存硬盘网络带宽…就算你这个及其配置再好硬件资源也是有限的服务器每次收到一个请求处理这个请求的过程就都需要执行一系列的代码在执行这些代码的过程中就需要消耗一定的硬件资源CPU内存硬盘网络带宽…这些请求小号的总的硬件资源的量超过了及其能提供的上限那么此时机器就会出现卡死程序直接崩溃等… 在请求激增的时候A 为啥不会挂队列为啥不会挂反而是 B 更容易挂呢 A 的角色是一个“网关服务器”收到客户端的请求再把请求转发给其他的服务器 这样的服务器里的代码做的工作比较简单单纯的数据转发消耗的硬件资源通常更少处理一个请求消耗的资源更少同样的配置下就能支持更多的请求处理 同理队列其实也是比较简单的程序单位请求消耗的硬件资源也是比较少见的B 这个服务器是真正干活的服务器要真正完成一系列的业务逻辑 这一系列的工作代码量非常庞大消耗的时间很多消耗的系统硬件资源也是更多的 类似的像 MySQL 这样的数据库处理每个请求的时候做的工作就是比较多的消耗的硬件资源也是比较多的因此 MySQL 也是后端系统中容易挂的部分 对应的像 Redis 这种内存数据库处理请求做的工作远远少于 MySQL消耗的资源更少Redis 就比 MySQL 硬朗很多不容易挂
代价
需要更多的机器来部署这样的消息队列小代价A 和 B 之间的通信延迟会变长 对于 A 和 B 之间的调用要求响应时间比较短就不太适合了
每个技术都有优缺点不能无脑吹也不能无脑黑
比如微服务
本质上就是把分布式系统服务拆的更细了每个服务都很小只做一项功能非常适合大公司部门分的很细但需要更多的机器处理请求需要更多的响应时间更复杂的后端结构运维成本水涨船高
Java 自带的阻塞队列
阻塞队列在 Java 标准库中也提供了现成的封装——BlockingQueue BlockingQueue 本质上是一个接口不能直接 new只能 new 一个类因为是继承与 Queue所以 Queue 的一些操作offer、poll 这些在 BlockingQueue 中同样可以使用不过不建议使用因为都不能阻塞BlockingQueue 提供了另外两个专属方法都能阻塞 put——入列take——出队列 BlockingQueueString queue new ArrayBlockingQueue(1000);capacity 指的是容量是一个需要加上的参数 public class Demo10 { public static void main(String[] args) throws InterruptedException { BlockingQueueString queue new ArrayBlockingQueue(3); queue.put(111); System.out.println(put成功); queue.put(111); System.out.println(put成功); }
}
//运行结果
put成功
put成功
put成功只打印了三个说明第四次 put 的时候容量不够阻塞了 public class Demo10 { public static void main(String[] args) throws InterruptedException { BlockingQueueString queue new ArrayBlockingQueue(3); queue.put(111); System.out.println(put 成功); queue.put(111); System.out.println(put 成功); queue.take(); System.out.println(take 成功); queue.take(); System.out.println(take 成功); queue.take(); System.out.println(take 成功); }
}
//运行结果
put 成功
put 成功
take 成功
take 成功由于只有 put 了两次所以也只有两次 take随后阻塞住了 public class Demo11 { public static void main(String[] args) { BlockingQueueInteger queue new ArrayBlockingQueue(1000); Thread t1 new Thread(() - { int i 1; while(true){ try { queue.put(i); System.out.println(生产者元素i); i; Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 new Thread(() - { while(true) { try { Integer i queue.take(); System.out.println(消费者元素i); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); }
}上述程序中一个线程生产一个线程消费实际开发中通常可能是多个线程生产多个线程消费 自己实现一个阻塞队列
普通队列
基于数组的队列 实现一个基础的队列
//此处不考虑泛型参数只是基于 String 进行存储
class MyBlockingQueue { private String[] data null; private int head 0; private int tail 0; private int size 0; public MyBlockingQueue(int capacity) { data new String[capacity]; } public void put(String s) { if(size data.length) { //队列满了 return; } data[tail] s; tail; if(tail data.length){ tail 0; } size; } public String take() { if(size 0) { //队列为空 return null; } String ret data[head]; head; if(head data.length){ head 0; } size--; return ret; }
}阻塞队列
队列为空take 就要阻塞在其他线程 put 的时候唤醒队列未满put 就要阻塞在其他线程 take 的时候唤醒
//此处不考虑泛型参数只是基于 String 进行存储
class MyBlockingQueue { private String[] data null; private int head 0; private int tail 0; private int size 0; private Object locker new Object(); public MyBlockingQueue(int capacity) { data new String[capacity]; } public void put(String s) throws InterruptedException { //加锁的对象可以单独定义一个也可以直接就地使用this synchronized (locker) { if (size data.length) { //队列满了需要阻塞 //return; locker.wait(); } data[tail] s; tail; if (tail data.length) { tail 0; } size; //唤醒 take 的阻塞 locker.notify(); } } public String take() throws InterruptedException { String ret ; synchronized (locker) { if (size 0) { //队列为空,需要阻塞 //return null; locker.wait(); } ret data[head]; head; if (head data.length) { head 0; } size--; //唤醒 put 的阻塞 locker.notify(); } return ret; }
}