Fork/Join 框架及其使用

  • 2019 年 10 月 7 日
  • 筆記

fork/join框架是ExecutorService接口的一种具体实现,会将任务分发给线程池中的工作线程,更好地利用多处理器带来的好处,提供程序性能。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。

fork/join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。

fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。

使用fork/join框架的第一步是编写执行一部分工作的代码,类似的伪代码如下:

if (当前这个任务工作量足够小)直接完成这个任务else把当前任务分解成两个部分  调用这两个部分并等待结果

此代包装在ForkJoinTask的子类中。不过,通常是RecursiveTask(会返回一个结果),或RecursiveAction。

fork分解出新任务,join汇集任务结果,其大致过程如下:

Fork/Join实际用例

fork/join的核心思想就是分而治之,将一个大任务拆分成一个一个的小任务。fork/join的使用需要定义一个任务类去实现RecursiveTask或RecursiveAction,重写compute()方法,在compute()方法中定义任务拆分的逻辑,然后借助ForkJoinPool提交任务去执行,fork/join框架会根据compute()方法中定义的拆分逻辑对任务进行具体的拆分,如果有返回值,可以借助ForkJoinTask获取返回值。假设现在有很多网络请求需要并发的去执行,然后汇总结果,使用fork/join的代码实现如下:

public class ForkJoinTest {// 测试数据  static ArrayList<String> urls =  new ArrayList<String>() {  {  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  add("http://www.baidu.com");  add("http://www.sina.com");  }  };// 核心ForkJoinPool  static ForkJoinPool firkJoinPool =  new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);  public static void main(String args[]) throws ExecutionException, InterruptedException {  Job job = new Job(urls, 0, urls.size());  ForkJoinTask<String> forkJoinTask = firkJoinPool.submit(job);  String result = forkJoinTask.get();  System.out.println(result);  }  public static String doRequest(String url) {  // 模拟网络请求  return "Kody ... test ... " + url + "n";  }// 自定义任务  static class Job extends RecursiveTask<String> {List<String> urls;  int start;int end;public Job(List<String> urls, int start, int end) {  this.urls = urls;  this.start = start;  this.end = end;}/** 核心任务的计算,任务拆分逻辑实现 */  @Override  protected String compute() {// 计算任务的大小  int count = end - start;// 如果任务拆分的足够小,则执行任务  if (count <= 5) {  // 直接执行  String result = "";  for (int i = start; i < end; i++) {  String response = doRequest(urls.get(i));  result += response;  }  return result;} else { // 如果任务较大,继续拆分任务int x = (start + end) / 2;  Job job1 = new Job(urls, start, x);  job1.fork();  Job job2 = new Job(urls, x, end);  job2.fork();// 汇总fork出的子任务结果  return String.join("", job1.join(), job2.join());}  }  }  }