Java线程的基本使用

  • 2019 年 12 月 24 日
  • 筆記

首先,这篇文章写的都是一些比较基础的内容,也就是从API层面解释一下我们平时用的比较多的东西,其实我倒是也想写点底层的东西,可是我也不懂啊。虽然比较基础,但可能却比较容易忽略吧

在Java中使用多线程,本质上还是对Thread对象的操作。线程池只是为了方便对线程的管理,避免频繁的创建和销毁线程带来不必要的系统开销,内部通过指定的线程数和阻塞队列实现。

基本使用

创建一个Thread对象的时候一般会传递一个Runnable对象,任务逻辑就写在Runnable的run方法中。感觉这个Runnable的名字取得不太好,如果叫Task是不是会更好一些呢?

new Thread(()-> doXX() ).start();

获取返回值

上面的那种方式使用起来是挺简单,但会遇到一些问题,比如:能获取返回值不?

通过全局变量

像上面这样是没办法获取返回值的,所以我们需要做一些处理,比如,将结果赋值给一个全局变量

private static int result;    public static void main(String[] args) throws InterruptedException {      new Thread(() -> {          System.out.println("处理业务逻辑");          result = 1000;      }).start();      Thread.sleep(1000);      System.out.println(result);  }

result就是一个全局变量,当任务执行完成之后,更新这个值。这其实都不能算是返回值,但有时候也能用:不需要立即知道任务的执行结果,在访问全部变量的时候,只需要获取它的值就好了。比如通过定时任务去更新缓存,不需要关注任务什么时候执行完成,我需要的只是缓存的值,任务执行了就获取最新的值,没有执行就获取旧值。

通过空轮询

那假如我就是想现在获取返回值咋办?因为我要用这个返回值作为下面逻辑的输入。那或许可以通过轮询的方式检测全局变量来达到目的?

while(result == null){  }

除了白白浪费CPU,好像也行啊?但我现在考虑的只是两个线程,如果有多个线程该对全局变量修改该怎么办呢?那用ThreadLocal?算了,就此打住吧

通过简单封装

或许可以封装一下?再封装之前,先考虑几个问题

  1. 任务的逻辑定义在哪里? 如果用Runnable,就无法返回值,所以可以定义一个有返回值的@FunctionalInterface接口,叫 Task
  2. 返回的值存到哪里?怎么返回?Thread没有相关的方法,扩展一下?
public static void main(String[] args) throws InterruptedException {      CallableThread callableThread = new CallableThread(() -> {          try {              Thread.sleep(3000);          } catch (InterruptedException e) {              e.printStackTrace();          }          return "ccccc";      });        callableThread.start();      System.out.println("开始时间 " + LocalDateTime.now());      System.out.println(callableThread.get());      System.out.println("结束时间 " + LocalDateTime.now());  }      class CallableThread<T> extends Thread {      private Task<T> task;        private T result;        private volatile boolean finished = false;        public CallableThread(Task<T> task) {          this.task = task;      }        @Override      public void run() {          synchronized (this) {              result = task.call();              finished = true;              notifyAll();          }      }        public T get() {          synchronized (this) {              while (!finished) {                  try {                      wait();                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }              return result;          }      }  }    @FunctionalInterface  interface Task<T> {      T call();  }

这样貌似也可以,但是不太好。Thread本来只是用于处理和线程相关的事情,现在将它和逻辑(Task)绑定在一起,如果有多个任务想共用一个Thread,那返回值怎么处理?

是否可以将这部分逻辑抽出来,放到一个新类当中?

public static void main(String[] args) throws InterruptedException {      MyRunnable<String> myRunnable = new MyRunnable(() -> {          // 模拟耗时的业务操作          try {              Thread.sleep(3000);          } catch (InterruptedException e) {              e.printStackTrace();          }          return "我是结果";      });      System.out.println("开始时间 " + LocalDateTime.now());      new Thread(myRunnable).start();        System.out.println("result: " + myRunnable.get());      System.out.println("结束时间 " + LocalDateTime.now());  }      class MyRunnable<T> implements Runnable {      private Task<T> task;        private T result;        private volatile boolean finished = false;        public MyRunnable(Task<T> task) {          this.task = task;      }        @Override      public void run() {          synchronized (this) {              result = task.call();              finished = true;              notifyAll();          }      }        public T get() {          synchronized (this) {              while (!finished) {                  try {                      wait();                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }              return result;          }      }  }

这不是和java里面的Future有点像吗?确实有点像

Future模式

Future里面有几个比较核心的概念

  1. Future:抽象出 获取任务返回值获取任务执行状态 等常用方法的接口
  2. Callable:类似于上面的 Task
  3. FutureTask:类似于上面的 MyRunnable

下面看一个例子

public static void main(String[] args) throws ExecutionException, InterruptedException {      FutureTask<String> future = new FutureTask<>(() -> {          Thread.sleep(3000);          System.out.println(System.currentTimeMillis());          return "hehehh";      });      new Thread(future).start();      System.out.println("Start Get Result : " + System.currentTimeMillis());      System.out.println("Get Result : " + future.get() + System.currentTimeMillis());  }

Future

Future接口除了提供获取返回值的接口,还提供了一些其他的接口,根据名字大概也可以猜到什么意思,不过多解释了。实在不行看看源码吧,这样子就很愉快了。

boolean cancel(boolean mayInterruptIfRunning);  boolean isCancelled();  boolean isDone();  V get() throws InterruptedException, ExecutionException;  V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

FutureTask

FutureTask同时实现了RunnableFuture接口,

任务状态

FutureTask中,任务的不同状态通过state变量来表示,状态有以下几种:

/*   * NEW -> COMPLETING -> NORMAL   * NEW -> COMPLETING -> EXCEPTIONAL   * NEW -> CANCELLED   * NEW -> INTERRUPTING -> INTERRUPTED   */    private volatile int state;    private static final int NEW          = 0;  private static final int COMPLETING   = 1;  private static final int NORMAL       = 2;  private static final int EXCEPTIONAL  = 3;  private static final int CANCELLED    = 4;  private static final int INTERRUPTING = 5;  private static final int INTERRUPTED  = 6;
任务执行

因为FutureTask本身也实现了 Runnable 接口,所以核心关注它的run方法,执行逻辑其实比较简单:

  1. 先判断状态,如果不为NEW 或者通过cas更新 runner 失败,则直接返回
  2. 执行Callable#call方法,根据执行结果,设置状态, 如果执行成功:先将state设置成COMPLETING,然后保存返回的结果保存到属性outcome,再将state设置成NORMAL,最后通过LockSupport.unpark(t)解除阻塞的线程; 如果执行失败:先将state设置成COMPLETING,然后异常信息保存到属性outcome,再将state设置成EXCEPTIONAL,最后通过LockSupport.unpark(t)解除阻塞的线程;
如何阻塞

当我们通过FutureTask#get方法获取返回值的时候,会阻塞当前线程,那是通过什么方式阻塞当前线程的?是通过LockSupport阻塞的,这个推荐看看博客吧。我也是看博客的,自己也解释的没人家好,嗯,就是这样的

public V get() throws InterruptedException, ExecutionException {      int s = state;      if (s <= COMPLETING)          s = awaitDone(false, 0L);      return report(s);  }    private int awaitDone(boolean timed, long nanos)      throws InterruptedException {      final long deadline = timed ? System.nanoTime() + nanos : 0L;      WaitNode q = null;      boolean queued = false;      for (;;) {          if (Thread.interrupted()) {              removeWaiter(q);              throw new InterruptedException();          }            int s = state;          // state > COMPLETING ,说明任务要么正常执行,要么异常结束,所以这里可以直接返回          if (s > COMPLETING) {              if (q != null)                  q.thread = null;   // 这应该是help GC吧?              return s;          }          // 如果正在收尾阶段,交出CPU, 等下次循环          else if (s == COMPLETING) // cannot time out yet              Thread.yield();          else if (q == null)              q = new WaitNode();          // 通过UNSAFE 设置 waiters          else if (!queued)              // 将新的`WaitNode`添加到单向链表的头部,waiters即对应头节点              queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                      q.next = waiters, q);          else if (timed) {              nanos = deadline - System.nanoTime();              if (nanos <= 0L) {                  removeWaiter(q);                  return state;              }              LockSupport.parkNanos(this, nanos);          }          else              LockSupport.park(this);   // 阻塞当前线程      }  }

上面我们看到了有一个waiters,这是用来干嘛的呢?它是一个单向链表结构,主要是为了处理多次调用FutureTask#get的情况,每调用一次FutureTask#get就会生成一个WaitNode节点,然后将它添加到单向链表的头部

那什么时候用到这个链表呢?在任务执行完成的时候,会执行finishCompletion方法,主要就是从头节点依次往下遍历,获取节点的thread属性,然后执行LockSupport.unpark(thread)解除阻塞

回调如何处理

相对之前的那种方式来说,FutureTask已经很好用了,直接通过FutureTask#get方法就可以获取返回值了,确实蛮方便的。

不过方便是方便,但假如我想在获取返回值之后执行一些其他的逻辑该怎么处理呢?其实我最直接的想法就是回调了。比如,我们可以对上面的MyRunnable代码再扩展一下,例如

public MyRunnable addListener(Consumer c) {      // 这里是一个例子,肯定不会每次都new一个线程,一般是使用线程池          while (!finished) {              try {                  Thread.sleep(1);              } catch (InterruptedException e) {                  e.printStackTrace();              }          }          c.accept(result);      }).start();      return this;  }

我们给MyRunnable添加了一个addListener方法,接收一个Consumer作为入参,当任务执行完成之后就执行这段逻辑,如下:

public static void main(String[] args) throws InterruptedException {      MyRunnable<String> myRunnable = new MyRunnable(() -> {          // 模拟耗时的业务操作          try {              Thread.sleep(3000);          } catch (InterruptedException e) {              e.printStackTrace();          }          return "我是结果";      });      System.out.println("开始时间 " + LocalDateTime.now());      new Thread(myRunnable).start();        myRunnable.addListener(result -> {          System.out.println("当xxx执行完成之后,线程:" + Thread.currentThread().getName() + " 执行一些其他的任务");          result = result + "   ggggg";          System.out.println(result);      });  }

ListenableFuture

ListenableFutureguava包里面的,对Future进行了增强,ListenableFuture继承了Future,新增了一个添加回调的方法

/**   * @param listener the listener to run when the computation is complete     回调逻辑   * @param executor the executor to run the listener in  回调在哪个线程池执行   */  void addListener(Runnable listener, Executor executor);

ListenableFutureTask继承了FutureTask并且是实现了ListenableFuture接口,看一个简单例子

public static void main(String[] args) throws InterruptedException {      ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {          System.out.println("执行任务开始  " + LocalDateTime.now());          Thread.sleep(3000);          System.out.println("执行任务完成  " + LocalDateTime.now());          return "结果";      });        futureTask.addListener(() -> System.out.println("获取结果之后,输出一条日志"), MoreExecutors.directExecutor());      new Thread(futureTask).start();  }
源码分析

原理就是将所有回调维护在一个单向链表中,也就是ExecutionList,然后通过重写“FutureTask#done`方法,在任务完成之后执行回调逻辑

// 每个回调就相当于是一个RunnableExecutorPair节点,所有RunnableExecutorPair节点构成一条链表,头插链表  private final ExecutionList executionList = new ExecutionList();    // ListenableFutureTask#addListener  public void addListener(Runnable listener, Executor exec) {      executionList.add(listener, exec);  }      // ExecutionList#add  public void add(Runnable runnable, Executor executor) {      // 上锁,因为它的内部属性 executed 可能会被任务逻辑线程更新,即 ListenableFutureTask 实现了 FutureTask 的done方法,然后会在里面更新 executed 的值为true      // 还有一点,如果不加锁,当多个线程同时添加回调的时候,可能会造成节点丢失      synchronized (this) {          // 如果任务还没有执行完成,就将当前节点添加到头节点          if (!executed) {              runnables = new RunnableExecutorPair(runnable, executor, runnables);              return;          }      }        // 如果任务执行完成,就开始执行回调      executeListener(runnable, executor);  }      // ExecutionList#executeListener  private static void executeListener(Runnable runnable, Executor executor) {      try {          // 直接将任务交给线程池          executor.execute(runnable);      } catch (RuntimeException e) {          log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);      }  }    // ExecutionList.RunnableExecutorPair  private static final class RunnableExecutorPair {      final Runnable runnable;      final Executor executor;      @Nullable RunnableExecutorPair next;        RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {          this.runnable = runnable;          this.executor = executor;          this.next = next;      }  }

ListenableFutureTask是怎么知道任务是否执行完成了呢?FutureTask#finishCompletion方法中,解除阻塞的线程之后,还会执行一个done方法,不过该方法在FutureTask没有任何逻辑,可以把它当作是一个模板方法,而ListenableFutureTask实现了该方法,如下:

// ListenableFutureTask#done  protected void done() {      executionList.execute();  }      // ExecutionList#execute  public void execute() {      RunnableExecutorPair list;      synchronized (this) {          if (executed) {              return;          }          // 首先将executed置为true          executed = true;          // runnables代表链表的头节点          list = runnables;          runnables = null; // allow GC to free listeners even if this stays around for a while.      }            RunnableExecutorPair reversedList = null;      // 这其实是一个倒置的过程,因为我们添加节点的时候,是插入到头部的,为了保证回调按照我们添加时的顺序执行,即 先添加先执行,所以做了一个倒置      while (list != null) {          RunnableExecutorPair tmp = list;          list = list.next;          tmp.next = reversedList;          reversedList = tmp;      }        // 遍历链表,依次执行回调逻辑      while (reversedList != null) {          executeListener(reversedList.runnable, reversedList.executor);          reversedList = reversedList.next;      }  }

FutureCallback

通过ListenableFutureTask,我们可以在任务执行完成之后执行一些回调逻辑。可是细心的同学会发现,回调方法无法使用任务的返回值,那假如我就是想先获取值然后再用这个返回值做下一步操作怎么办?还是只能先通过get方法阻塞当前线程吗?其实guava包中也给了我们相关的接口。先看一个例子:

public static void main(String[] args) throws InterruptedException {      ListenableFutureTask futureTask = ListenableFutureTask.create(() -> {          System.out.println("执行任务开始  " + LocalDateTime.now());          Thread.sleep(3000);          System.out.println("执行任务完成  " + LocalDateTime.now());          return "结果";      });        Futures.addCallback(futureTask, new FutureCallback<String>() {          @Override          public void onSuccess(String result) {              System.out.println("执行成功: " + result);          }            @Override          public void onFailure(Throwable t) {              System.out.println("执行失败");          }      });        new Thread(futureTask).start();  }
源码分析

FutureCallback接口里面有两个方法,分别对应任务执行成功逻辑和任务失败逻辑

void onSuccess(@Nullable V result);    void onFailure(Throwable t);

Futures可以堪称是一个门面类,里面封装了一些操作

// Futures#addCallback  public static <V> void addCallback(      ListenableFuture<V> future, FutureCallback<? super V> callback) {          // 这里使用了DirectExecutor线程池,即直接在当前线程执行          addCallback(future, callback, directExecutor());  }    // Futures#addCallback  public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {      Runnable callbackListener =          new Runnable() {              @Override              public void run() {                  final V value;                  try {                      value = getDone(future);                  } catch (ExecutionException e) {                      callback.onFailure(e.getCause());                      return;                  } catch (RuntimeException e) {                      callback.onFailure(e);                      return;                  } catch (Error e) {                      callback.onFailure(e);                      return;                  }                  callback.onSuccess(value);              }          };      // 最终还是将这部分逻辑封装成一个回调,然后在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法      future.addListener(callbackListener, executor);  }    // Futures#getDone  public static <V> V getDone(Future<V> future) throws ExecutionException {      checkState(future.isDone(), "Future was expected to be done: %s", future);      return getUninterruptibly(future);  }  public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {      boolean interrupted = false;      try {        while (true) {          try {            return future.get();          } catch (InterruptedException e) {            interrupted = true;          }        }      } finally {        if (interrupted) {          Thread.currentThread().interrupt();        }      }  }

本质上其实就是将获取返回值的逻辑封装成一个回调,在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法,不过在使用上却方便了好多。

与我们直接通过get方法获取返回值然后再执行其他逻辑还是有区别的,因为我们直接调用Future#get方法会阻塞当前线程,而guava是在回调中执行这部逻辑,类似于一种通知机制,所以不会阻塞当前线程。

ListenableFutureTask

其实Spring里面也有一个ListenableFutureTask,实现上和guava大同小异,也是继承了FutureTask并且实现了自己的ListenableFuture接口,通过重写FutureTask#done方法,在该方法中获取返回值然后执行回调逻辑

public static void main(String[] args) {      ListenableFutureTask future = new ListenableFutureTask(() -> "结果");        future.addCallback(new ListenableFutureCallback() {          @Override          public void onSuccess(Object result) {              System.out.println("callback " + result);          }            @Override          public void onFailure(Throwable ex) {              System.out.println("执行失败 ");          }      });      new Thread(future).start();  }

核心源码

它的Callback是保存在两个Queue里面的:successCallbacksfailureCallbacks,数据结构是LinkedList

private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>();    private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>();

重写的done方法如下,逻辑很简单,就不解释了

protected void done() {      Throwable cause;      try {          T result = get();          this.callbacks.success(result);          return;      }catch (InterruptedException ex) {          Thread.currentThread().interrupt();          return;      }catch (ExecutionException ex) {          cause = ex.getCause();          if (cause == null) {              cause = ex;          }      }      catch (Throwable ex) {          cause = ex;      }      this.callbacks.failure(cause);  }

CompletableFuture

可能是之前的Future功能太少了,所以Java8推出了CompletableFuture,功能强大,除了上面说的那些功能,还有很多其他的功能,反正就是吊炸天。而且从DUBBO 2.7开始异步处理都是通过CompletableFuture来实现。

CompletableFuture ForkJoinPoll

总结

总结下来就发现,那些很好用的API,真的是封装的好啊。所以,设计模式真的很重要啊,老铁。。。。。。