注册电气工程师考试科目,seo搜索引擎优化什么意思,网站缓存实例,网页游戏网站排行目录
一. 前言
二. 并发和并行
2.1. 并发
2.2. 并行
2.3. 分治法
三. ForkJoin 并行处理框架的理论
3.1. ForkJoin 框架概述
3.2. ForkJoin 框架原理
3.3. 工作窃取算法
四. ForkJoin 并行处理框架的实现
4.1. ForkJoinPool 类
4.2. ForkJoinWorkerThread 类
4.3.…目录
一. 前言
二. 并发和并行
2.1. 并发
2.2. 并行
2.3. 分治法
三. ForkJoin 并行处理框架的理论
3.1. ForkJoin 框架概述
3.2. ForkJoin 框架原理
3.3. 工作窃取算法
四. ForkJoin 并行处理框架的实现
4.1. ForkJoinPool 类
4.2. ForkJoinWorkerThread 类
4.3. ForkJoinTask 类
4.4. ForkJoin 示例
五. 总结 一. 前言 在 JDK 中提供了这样一种功能它能够将复杂的逻辑拆分成一个个简单的逻辑来并行执行待每个并行执行的逻辑执行完成后再将各个结果进行汇总得出最终的结果数据。有点像Hadoop 中的 MapReduce。 ForkJoin 是由 JDK1.7 之后提供的多线程并发处理框架。ForkJoin 框架的基本思想是分而治之。什么是分而治之分而治之就是将一个复杂的计算按照设定的阈值分解成多个计算然后将各个计算结果进行汇总。相应的ForkJoin 将复杂的计算当做一个任务而分解的多个计算则是当做一个个子任务来并行执行。
二. 并发和并行
并发和并行在本质上还是有所区别的。
2.1. 并发 并发指的是在同一时刻只有一个线程能够获取到 CPU 执行任务而多个线程被快速的轮换执行这就使得在宏观上具有多个线程同时执行的效果并发不是真正的同时执行并发可以使用下图表示 2.2. 并行 并行指的是无论何时多个线程都是在多个 CPU 核心上同时执行的是真正的同时执行。 2.3. 分治法 把一个规模大的问题划分为规模较小的子问题然后分而治之最后合并子问题的解得到原问题的解。在分治法中子问题一般是相互独立的因此经常通过递归调用算法来求解子问题。 步骤可分为1. 分割原问题2. 求解子问题3. 合并子问题的解为原问题的解。我们可以使用如下伪代码来表示这个步骤
if (任务很小{直接计算得到结果
} else {分拆成N个子任务调用子任务的fork()进行计算调用子任务的join()合并计算结果
}
典型应用有二分搜索、大整数乘法、Strassen矩阵乘法、棋盘覆盖、合并排序、快速排序、线性时间选择、汉诺塔。
三. ForkJoin 并行处理框架的理论
3.1. ForkJoin 框架概述 Java 1.7 引入了一种新的并发框架 —— Fork/Join Framework主要用于实现“分而治之”的算法特别是分治之后递归调用的函数。 ForkJoin 框架的本质是一个用于并行执行任务的框架能够把一个大任务分割成若干个小任务最终汇总每个小任务结果后得到大任务的计算结果。在 Java 中ForkJoin 框架与 ThreadPool 共存并不是要替换 ThreadPool。 其实在 Java 8 中引入的并行流计算内部就是采用的 ForkJoinPool 来实现的。例如下面使用并行流实现打印数组元组的程序
public class SumArray {public static void main(String[] args) {ListInteger numberList Arrays.asList(1,2,3,4,5,6,7,8,9);numberList.parallelStream().forEach(System.out::println);}
}
说到这里可能有读者会问可以使用线程池的 ThreadPoolExecutor 来实现啊为什么要使用ForkJoinPool 接下来我们就来回答这个问题。
3.2. ForkJoin 框架原理 ForkJoin 框架是从 JDK1.7 中引入的新特性它同 ThreadPoolExecutor 一样也实现了Executor 和 ExecutorService 接口。它使用了一个无限队列来保存需要执行的任务而线程的数量则是通过构造函数传入如果没有向构造函数中传入指定的线程数量那么当前计算机可用的 CPU 数量会被设置为线程数量作为默认值。 ForkJoinPool 主要使用分治法Divide-and-Conquer Algorithm来解决问题。典型的应用比如快速排序算法。这里的要点在于ForkJoinPool 能够使用相对较少的线程来处理大量的任务。 比如要对1000万个数据进行排序那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推对于500万的数据也会做出同样的分割处理到最后会设置一个阈值来规定当数据规模到多少时停止这样的分割处理。 比如当元素的数量小于10时会停止分割转而使用插入排序对它们进行排序。那么到最后所有的任务加起来会有大概200万个。问题的关键在于对于一个任务而言只有当它所有的子任务完成之后它才能够被执行。 所以当使用 ThreadPoolExecutor 时使用分治法会存在问题因为 ThreadPoolExecutor 中的线程无法向任务队列中再添加一个任务并在等待该任务完成之后再继续执行。而使用 ForkJoinPool就能够解决这个问题它就能够让其中的线程创建新的任务并挂起当前的任务此时线程就能够从队列中选择子任务执行。
那么使用 ThreadPoolExecutor 或者 ForkJoinPool性能上会有什么差异呢 首先使用 ForkJoinPool 能够使用数量有限的线程来完成非常多的具有父子关系的任务比如使用4个线程来完成超过200万个任务。但是使用 ThreadPoolExecutor 时是不可能完成的因为 ThreadPoolExecutor 中的 Thread 无法选择优先执行子任务需要完成200万个具有父子关系的任务时也需要200万个线程很显然这是不可行的也是很不合理的
3.3. 工作窃取算法 假如我们需要做一个比较大的任务我们可以把这个任务分割为若干互不依赖的子任务为了减少线程间的竞争于是把这些子任务分别放到不同的队列里并为每个队列创建一个单独的线程来执行队列里的任务线程和队列一一对应比如 A 线程负责处理 A 队列里的任务。但是有的线程会先把自己队列里的任务干完而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着不如去帮其他线程干活于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列所以为了减少窃取任务线程和被窃取任务线程之间的竞争通常会使用双端队列被窃取任务线程永远从双端队列的头部拿任务执行而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点 充分利用线程进行并行计算并减少了线程间的竞争。
工作窃取算法的缺点 在某些情况下还是存在竞争比如双端队列里只有一个任务时。并且该算法会消耗更多的系统资源比如创建多个线程和多个双端队列。
四. ForkJoin 并行处理框架的实现
ForkJoin 框架中一些重要的类如下所示 Fork/Join 框架类图 4.1. ForkJoinPool 类 既然任务是被逐渐的细化的那就需要把这些任务存在一个池子里面这个池子就是ForkJoinPool它与其它的 ExecutorService 区别主要在于它使用“工作窃取”。由类图可以看出ForkJoinPool 类实现了线程池的 Executor接口。我们还可以使用 Executors.newWorkStealPool() 方法来创建 ForkJoinPool。
ForkJoinPool 中提供了如下提交任务的方法
public void execute(ForkJoinTask? task)
public void execute(Runnable task)
public T T invoke(ForkJoinTaskT task)
public T ListFutureT invokeAll(Collection? extends CallableT tasks)
public T ForkJoinTaskT submit(ForkJoinTaskT task)
public T ForkJoinTaskT submit(CallableT task)
public T ForkJoinTaskT submit(Runnable task, T result)
public ForkJoinTask? submit(Runnable task)
4.2. ForkJoinWorkerThread 类
实现 ForkJoin 框架中的线程。
4.3. ForkJoinTaskV 类 ForkJoinTask 封装了数据及其相应的计算并且支持细粒度的数据并行。ForkJoinTask 比线程要轻量ForkJoinPool 中少量工作线程能够运行大量的 ForkJoinTask。ForkJoinTask 类中主要包括两个方法 fork() 和 join()分别实现任务的分拆与合并。 fork() 方法类似于 Thread.start()但是它并不立即执行任务而是将任务放入工作队列中。跟Thread.join() 方法不同ForkJoinTask 的 join() 方法并不简单的阻塞线程而是利用工作线程运行其他任务当一个工作线程中调用 join()它将处理其他任务直到注意到目标子任务已经完成。
我们可以使用下图来表示这个过程 ForkJoinTask有3个子类
RecursiveAction无返回值的ForkJoinTask并实现 Runnable。RecursiveTask有返回值的ForkJoinTask并实现 Callable。CountedCompleter完成任务后将触发其他任务。
4.4. ForkJoin 示例
package com.lm.concurrency.example;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;public class ForkJoinTaskExample extends RecursiveTaskInteger {public static final int threshold 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start start;this.end end;}Overrideprotected Integer compute() {int sum 0;// 如果任务足够小就计算任务boolean canCompute (end - start) threshold;if (canCompute) {for (int i start; i end; i) {sum i;}} else {// 如果任务大于阈值就分裂成两个子任务计算int middle (start end) / 2;ForkJoinTaskExample leftTask new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask new ForkJoinTaskExample(middle 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待任务执行结束合并其结果int leftResult leftTask.join();int rightResult rightTask.join();// 合并子任务sum leftResult rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkjoinPool new ForkJoinPool();// 生成一个计算任务计算1234ForkJoinTaskExample task new ForkJoinTaskExample(1, 100);// 执行一个任务FutureInteger result forkjoinPool.submit(task);try {System.out.println(result: result.get());} catch (Exception e) {System.out.println(e);}}
}
五. 总结
Fork/Join 框架局限性
对于 Fork/Join 框架而言当一个任务正在等待它使用 Join 操作创建的子任务结束时执行这个任务的工作线程查找其他未被执行的任务并开始执行这些未被执行的任务通过这种方式线程充分利用它们的运行时间来提高应用程序的性能。为了实现这个目标Fork/Join 框架执行的任务有一些局限性。
任务只能使用 Fork 和 Join 操作来进行同步机制如果使用了其他同步机制则在同步操作时工作线程就不能执行其他任务了。比如在 Fork/Join 框架中使任务进行了睡眠那么在睡眠期间内正在执行这个任务的工作线程将不会执行其他任务了。在 Fork/Join 框架中所拆分的任务不应该去执行 IO 操作比如读写数据文件。任务不能抛出检查异常必须通过必要的代码来处理这些异常。