横店影视城网站建设,建设婚纱摄影网站的重要性,什么是网站静态化,服务类的网站怎么做➢ LinkedBlockingQueue阻塞队列 LinkedBlockingQueue类图 LinkedBlockingQueue 中也有两个 Node 分别用来存放首尾节点#xff0c;并且里面有个初始值为 0 的原子变量 count 用来记录队列元素个数#xff0c;另外里面有两个ReentrantLock的独占锁#xff0c;分别用来控制… ➢ LinkedBlockingQueue阻塞队列 LinkedBlockingQueue类图 LinkedBlockingQueue 中也有两个 Node 分别用来存放首尾节点并且里面有个初始值为 0 的原子变量 count 用来记录队列元素个数另外里面有两个ReentrantLock的独占锁分别用来控制元素入队和出队加锁其中takeLock 用来控制同时只有一个线程可以从队列获取元素其他线程必须等待putLock 控制同时只能有一个线程可以获取锁 去添加元素其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非 公平独占锁所以可以同时又一个线程入队和一个线程出队其实这个是个生产者-消费者模型 /** 通过take取出进行加锁、取出 */ private final ReentrantLock takeLock new ReentrantLock(); /** 等待中的队列等待取出 */ private final Condition notEmpty takeLock.newCondition(); /*通过put放置进行加锁、放置*/ private final ReentrantLock putLock new ReentrantLock(); /** 等待中的队列等待放置 */ private final Condition notFull putLock.newCondition(); /* 记录集合中的个数计数器 */ private final AtomicInteger count new AtomicInteger(0); 队列的容量 //队列初始容量Integer最大值 public static final int MAX_VALUE 0x7fffffff; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity 0) throw new IllegalArgumentException(); this.capacity capacity; //初始化首尾节点 last head new NodeE(null); } 如图默认队列容量为0x7fffffff;用户也可以自己指定容量。 LinkedBlockingQueue方法 ps下面介绍LinkedBlockingQueue用到很多Lock对象。详细可以查找Lock对象的介绍 ✓ 带时间的Offer操作-生产者 在ArrayBlockingQueue中已经简单介绍了Offer()方法LinkedBlocking的Offer 方法类似在此就不过多去 介绍。这次我们从介绍下带时间的Offer方法 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //空元素抛空指针异常 if (e null) throw new NullPointerException(); long nanos unit.toNanos(timeout); int c -1; final ReentrantLock putLock this.putLock; final AtomicInteger count this.count; //获取可被中断锁只有一个线程克获取 putLock.lockInterruptibly(); try { //如果队列满则进入循环 while (count.get() capacity) { //nanos0直接返回 if (nanos 0) return false; //否者调用await进行等待超时则返回01 nanos notFull.awaitNanos(nanos); } //await在超时时间内返回则添加元素2 enqueue(new NodeE(e)); c count.getAndIncrement(); //队列不满则激活其他等待入队线程3 if (c 1 capacity) notFull.signal(); } finally { //释放锁 putLock.unlock(); } //c0说明队列里面有一个元素这时候唤醒出队线程4 if (c 0) signalNotEmpty(); return true; } private void enqueue(NodeE node) { last last.next node; } private void signalNotEmpty() { final ReentrantLock takeLock this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } ✓ 带时间的poll操作-消费者 获取并移除队首元素在指定的时间内去轮询队列看有没有首元素有则返回否者超时后返回null。 public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x null; int c -1; long nanos unit.toNanos(timeout); final AtomicInteger count this.count; final ReentrantLock takeLock this.takeLock; //出队线程获取独占锁 takeLock.lockInterruptibly(); try { //循环直到队列不为空 while (count.get() 0) { //超时直接返回null if (nanos 0) return null; nanos notEmpty.awaitNanos(nanos); } //出队计数器减一 x dequeue(); c count.getAndDecrement(); //如果出队前队列不为空则发送信号激活其他阻塞的出队线程 if (c 1) notEmpty.signal(); } finally { //释放锁 takeLock.unlock(); } //当前队列容量为最大值-1则激活入队线程。 if (c capacity) signalNotFull(); return x; } 首先获取独占锁然后进入循环当当前队列有元素才会退出循环或者超时了直接返回null。 超时前退出循环后就从队列移除元素然后计数器减去一如果减去1 前队列元素大于1 则说明当前移除后队 列还有元素那么就发信号激活其他可能阻塞到当前条件信号的线程。 最后如果减去 1 前队列元素个数最大值那么移除一个后会腾出一个空间来这时候可以激活可能存在的入队阻塞线程。 ✓ put操作-生产者 与带超时时间的poll类似不同在于put时候如果当前队列满了它会一直等待其他线程调用notFull.signal才会被 唤醒。 ✓ take操作-消费者 与带超时时间的poll类似不同在于take时候如果当前队列空了它会一直等待其他线程调用notEmpty.signal()才 会被唤醒。 ✓ size操作-消费者 当前队列元素个数如代码直接使用原子变量count获取。 public int size() { return count.get(); } ✓ peek操作 获取但是不移除当前队列的头元素没有则返回null。 public E peek() { //队列空则返回null if (count.get() 0) return null; final ReentrantLock takeLock this.takeLock; takeLock.lock(); try { NodeE first head.next; if (first null) return null; else return first.item; } finally { takeLock.unlock(); } } ✓ remove操作 删除队列里面的一个元素有则删除返回true没有则返回false在删除操作时候由于要遍历队列所以加了双重 锁也就是在删除过程中不允许入队也不允许出队操作。 public boolean remove(Object o) { if (o null) return false; //双重加锁 fullyLock(); try { //遍历队列找则删除返回true for (NodeE trail head, p trail.next; p ! null; trail p, p p.next) { if (o.equals(p.item)) { unlink(p, trail); return true; } } //找不到返回false return false; } finally { //解锁 fullyUnlock(); } } void fullyLock() { putLock.lock(); takeLock.lock(); } void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } void unlink(NodeE p, NodeE trail) { p.item null; trail.next p.next; if (last p) last trail; //如果当前队列满删除后也不忘记最快的唤醒等待的线程 if (count.getAndDecrement() capacity) notFull.signal(); } ✓ 开源框架的使用 tomcat中任务队列TaskQueue。 类结构图 可知TaskQueue继承了LinkedBlockingQueue并且泛化类型固定了为Runnalbe.重写了offer,polltake方法。 tomcat 中有个线程池 ThreadPoolExecutor在 NIOEndPoint 中当 acceptor 线程接受到请求后会把任务放入队列然后poller 线程从队列里面获取任务然后就把任务放入线程池执行。这个ThreadPoolExecutor中的的一个参数就是TaskQueue。 先看看ThreadPoolExecutor的参数如果是普通LinkedBlockingQueue是怎么样的执行逻辑 当调用线程池方法 execute() 方法添加一个任务时 l 如果当前运行的线程数量小于 corePoolSize则创建新线程运行该任务 l 如果当前运行的线程数量大于或等于 corePoolSize则将这个任务放入阻塞队列。 l 如果当前队列满了并且当前运行的线程数量小于 maximumPoolSize则创建新线程运行该任务 l 如果当前队列满了并且当前运行的线程数量大于或等于 maximumPoolSize那么线程池将会抛出 RejectedExecutionException异常。 如果线程执行完了当前任务那么会去队列里面获取一个任务来执行如果任务执行完了并且当前线程数大于 corePoolSize那么会根据线程空闲时间keepAliveTime回收一些线程保持线程池corePoolSize个线程。 首先看下线程池中exectue添加任务时候的逻辑 public void execute(Runnable command) { if (command null) throw new NullPointerException(); //当前工作线程个数小于core个数则开新线程执行1 int c ctl.get(); if (workerCountOf(c) corePoolSize) { if (addWorker(command, true)) return; c ctl.get(); } //放入队列2 if (isRunning(c) workQueue.offer(command)) { int recheck ctl.get(); if (! isRunning(recheck) remove(command)) reject(command); else if (workerCountOf(recheck) 0) addWorker(null, false); } //如果队列满了则开新线程但是个数要不超过最大值超过则返回false //然后执行reject handler3 else if (!addWorker(command, false)) reject(command); } 可知当当前工作线程个数为corePoolSize后如果在来任务会把任务添加到队列队列满了或者入队失败了则开启新线程。 然后看看TaskQueue中重写的offer方法的逻辑 public boolean offer(Runnable o) { // 如果parent为null则直接调用父类方法 if (parentnull) return super.offer(o); //如果当前线程池中线程个数达到最大则无条件调用父类方法 if (parent.getPoolSize() parent.getMaximumPoolSize()) return super.offer(o); //如果当前提交的任务小于当前线程池线程数说明线程用不完没必要重新开线程 if (parent.getSubmittedCount()(parent.getPoolSize())) return super.offer(o); //如果当前线程池线程个数core个数但是小于最大个数则开新线程代替放入队列 if (parent.getPoolSize()parent.getMaximumPoolSize()) return false; //到了这里无条件调用父类 return super.offer(o); } 可知parent.getPoolSize()parent.getMaximumPoolSize()普通队列会把当前任务放入队列TaskQueue则是返回false因为这会开启新线程执行任务当然前提是当前线程个数没有达到最大值。 LinkedBlockingQueue安全分析总结 仔细思考下阻塞队列是如何实现并发安全的维护队列链表的先分析下简单的情况就是当队列里面有多个元素时候由于同时只有一个线程通过独占锁putLock实现入队元素并且是操作last节点而同时只有一个出队线程(通过独占锁takeLock实现操作head节点所以不存在并发安全问题。 考虑当队列为空的时候队列状态为 这时候假如一个线程调用了 take 方法,由于队列为空所以 count.get()0 所以当前线程会调用notEmpty.await()把自己挂起并且放入 notEmpty 的条件队列并且释放当前条件变量关联的通过takeLock.lockInterruptibly()获取的独占锁。由于释放了锁所以这时候其他线程调用 take 时候就会通过takeLock.lockInterruptibly()获取独占锁然后同样阻塞到notEmpty.await()同样会被放入notEmpty的条件队列也就说在队列为空的情况下可能会有多个线程因为调用take被放入了notEmpty的条件队列。 这时候如果有一个线程调用了 put 方法那么就会调用 enqueue 操作该操作会在 last 节点后面添加新元素并且设置 last 为新节点。然后 count.getAndIncrement()先获取当前队列元个数为 0 保存到 c然后自增 count 为 1 由于 c0 所以调用 signalNotEmpty 激活notEmpty 的条件队列里面的阻塞时间最长的线程这时候 take 中调用notEmpty.await()的线程会被激活await内部会重新去获取独占锁获取成功则返回否者被放入AQS的阻塞队列如果获取成功那么count.get() 0因为可能多个线程put了所以调用dequeue从队列获取元素这时候一定可以获取到然后调用c count.getAndDecrement() 把当前计数返回后并减去1如果c1 说明当前队列还有其他元素那么就调用 notEmpty.signal()去激活 notEmpty的条件队列里面的其他阻塞线程。 考虑当队列满的时候 当队列满的时候调用 put 方法时候会由于 notFull.await()当前线程被阻塞放入 notFull 管理的条件队列里面同理可能会有多个调用put方法的线程都放到了notFull的条件队列里面。 这时候如果有一个线程调用了take方法,调用dequeue()出队一个元素c count.getAndDecrement()count值减一ccapacity;现在队列有一个空的位置所以调用 signalNotFull()激活 notFull 条件队列里面等待最久的一个线程。 LinkedBlockingQueue简单示例 并发库中的BlockingQueue 是一个比较好玩的类顾名思义就是阻塞队列。该类主要提供了两个方法put()和take()前者将一个对象放到队列中如果队列已经满了就等待直到有空闲节点后者从head取一个对象如果没有对象就等待直到有可取的对象。 下面的例子比较简单一个读线程用于将要处理的文件对象添加到阻塞队列中 另外四个写线程用于取出文件对象为了模拟写操作耗时长的特点特让线程睡眠一段随机长度的时间。另外该Demo也使用到了线程池和原子整型 AtomicIntegerAtomicInteger可以在并发情况下达到原子化更新避免使用了synchronized而且性能非常高。由 于阻塞队列的 put 和 take 操作会阻塞为了使线程退出特在队列中添加了一个“标识”算法中也叫“哨兵”当发现这个哨兵后写线程就退出。 当然线程池也要显式退出了。 package concurrent; import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); } public static void main(String[] args) { // 能容纳100个文件 final BlockingQueueFile queue new LinkedBlockingQueueFile(100); // 线程池 final ExecutorService exec Executors.newFixedThreadPool(5); final File root new File(F:\\JavaLib); // 完成标志 final File exitFile new File(); // 读个数 final AtomicInteger rc new AtomicInteger(); // 写个数 final AtomicInteger wc new AtomicInteger(); // 读线程 Runnable read new Runnable() { public void run() { scanFile(root); scanFile(exitFile); } public void scanFile(File file) { if (file.isDirectory()) { File[] files file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(.java); } }); for (File one : files) scanFile(one); } else { try { int index rc.incrementAndGet(); System.out.println(Read0: index file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四个写线程 for (int index 0; index 4; index) { // write thread final int NO index; Runnable write new Runnable() { String threadName Write NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index wc.incrementAndGet(); File file queue.take(); // 队列已经无对象 if (file exitFile) { // 再次添加标志以让其他线程正常退出 queue.put(exitFile); break; } System.out.println(threadName : index file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); }
}