Fork/Join 框架及其使用
- 2019 年 10 月 7 日
- 筆記
fork/join框架是ExecutorService接口的一种具体实现,会将任务分发给线程池中的工作线程,更好地利用多处理器带来的好处,提供程序性能。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。
fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。
fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。
使用fork/join框架的第一步是编写执行一部分工作的代码,类似的伪代码如下:
if (当前这个任务工作量足够小)直接完成这个任务else把当前任务分解成两个部分 调用这两个部分并等待结果
此代包装在ForkJoinTask的子类中。不过,通常是RecursiveTask(会返回一个结果),或RecursiveAction。
fork分解出新任务,join汇集任务结果,其大致过程如下:

Fork/Join实际用例
fork/join的核心思想就是分而治之,将一个大任务拆分成一个一个的小任务。fork/join的使用需要定义一个任务类去实现RecursiveTask或RecursiveAction,重写compute()方法,在compute()方法中定义任务拆分的逻辑,然后借助ForkJoinPool提交任务去执行,fork/join框架会根据compute()方法中定义的拆分逻辑对任务进行具体的拆分,如果有返回值,可以借助ForkJoinTask获取返回值。假设现在有很多网络请求需要并发的去执行,然后汇总结果,使用fork/join的代码实现如下:
public class ForkJoinTest {// 测试数据 static ArrayList<String> urls = new ArrayList<String>() { { add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); add("http://www.baidu.com"); add("http://www.sina.com"); } };// 核心ForkJoinPool static ForkJoinPool firkJoinPool = new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); public static void main(String args[]) throws ExecutionException, InterruptedException { Job job = new Job(urls, 0, urls.size()); ForkJoinTask<String> forkJoinTask = firkJoinPool.submit(job); String result = forkJoinTask.get(); System.out.println(result); } public static String doRequest(String url) { // 模拟网络请求 return "Kody ... test ... " + url + "n"; }// 自定义任务 static class Job extends RecursiveTask<String> {List<String> urls; int start;int end;public Job(List<String> urls, int start, int end) { this.urls = urls; this.start = start; this.end = end;}/** 核心任务的计算,任务拆分逻辑实现 */ @Override protected String compute() {// 计算任务的大小 int count = end - start;// 如果任务拆分的足够小,则执行任务 if (count <= 5) { // 直接执行 String result = ""; for (int i = start; i < end; i++) { String response = doRequest(urls.get(i)); result += response; } return result;} else { // 如果任务较大,继续拆分任务int x = (start + end) / 2; Job job1 = new Job(urls, start, x); job1.fork(); Job job2 = new Job(urls, x, end); job2.fork();// 汇总fork出的子任务结果 return String.join("", job1.join(), job2.join());} } } }