自定义FutureTask实现

  • 2019 年 10 月 3 日
  • 筆記

FutureTask

FutureTask是Future的实现,用来异步任务的获取结果,可以启动和取消异步任务,查询异步任务是否计算结束以及获取最终的异步任务的结果。通过get()方法来获取异步任务的结果,但是会阻塞当前线程直至异步任务执行结束。一旦任务执行结束,任务不能重新启动或取消,除非调用runAndReset()方法。

代码示例:

public class ThreadTest {      public static void main(String[] args) throws Exception {        Callable<String> myCallable = new MyCallableThread();      FutureTask<String> futureTask = new FutureTask<>(myCallable);      Thread myCallableThread = new Thread(futureTask);      myCallableThread.setName("MyThread-implements-Callable-test");      myCallableThread.start();      System.out.println("Run by Thread:" + futureTask.get());        //通过线程池执行      ExecutorService executorService = Executors.newCachedThreadPool();      executorService.submit(futureTask);      executorService.shutdown();      System.out.println("Run by ExecutorService:" + futureTask.get());    }  }    class MyCallableThread implements Callable<String> {      @Override    public String call() throws Exception {      return Thread.currentThread().getName();    }  }  

实现一个自己的FutureTask

根据FutureTask核心原理,要实现一个FutureTask必须满足以下方面:

  • 需要泛型定义用以返回结果类型
  • 需要一个callable对象,在构造方法中传入
  • 需要实现runnable接口,在run方法中实现具体结果计算
  • 需要一个公开的get方法来获取结果
  • 如果线程没有执行完,则调用get方法的线程需要进入等待队列
  • 需要一个字段记录线程执行的状态
  • 需要一个等待队列存储等待结果的线程

代码示例:

/**   * 1. 泛型定义   * 2. 构造方法 callable   * 3. 实现了runnable   * 4. get方法返回callable执行结果   * 5. get方法有阻塞的效果(未执行结束的话)   */  public class MyFutureTask<T> implements Runnable {      // 程序执行的结果    private T result;      // 要执行的任务    private Callable<T> callable;      // 任务运行的状态    private volatile int state = NEW;      // 任务运行的状态值    private static final int NEW = 0;    private static final int RUNNING = 1;    private static final int FINISHED = 2;      // 获取结果的线程等待队列    LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>(100);      // 执行当前FutureTask的线程,用CAS进行争抢    AtomicReference<Thread> runner = new AtomicReference<>();      public MyFutureTask(Callable<T> task) {      this.callable = task;    }      @Override    public void run() {      // 判断当前对象的状态,如果是New且抢锁成功就执行      if (state != NEW || !runner.compareAndSet(null, Thread.currentThread())) return;      state = RUNNING;      try {        result = callable.call();      } catch (Exception e) {        e.printStackTrace();      } finally {        state = FINISHED;      }        // 方法执行完,唤醒所有线程      while (true) {        Thread waiterThread = waiters.poll();        if (waiterThread == null) break;        LockSupport.unpark(waiterThread);      }    }      public T get() {      // 如果状态不是FINISHED,则进入等待队列      if (state != FINISHED) {        waiters.offer(Thread.currentThread());      }      while (state != FINISHED) {        LockSupport.park();      }      return result;    }  }    // MyFutureTask 测试  public class FutureTaskTest {    public static void main(String[] args) {        Callable<String> myCallable = new MyCallableThread();      MyFutureTask<String> futureTask = new MyFutureTask<>(myCallable);      Thread myCallableThread = new Thread(futureTask);      myCallableThread.setName("MyFutureTask-test");      myCallableThread.start();      System.out.println("Run by Thread:" + futureTask.get());    }  }    class MyCallableThread implements Callable<String> {      @Override    public String call() throws Exception {      return Thread.currentThread().getName();    }  }