线程池的学习和使用

什么是线程池

线程池的作用是初始化一些线程,当有任务的时候,就从中启动一个来执行相关任务,执行完后,线程资源重新回收到线程池中,达到复用的效果,从而减少资源的开销

创建线程池

在JDK中,Executors类已经帮我们封装了创建线程池的方法。

    Executors.newFixedThreadPool();      Executors.newCachedThreadPool();      Executors.newScheduledThreadPool();

但是点进去看的话,

public static ExecutorService newFixedThreadPool(int nThreads) {      return new ThreadPoolExecutor(nThreads, nThreads,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>());  }

它的内部实现还是基于ThreadPoolExecutor来实现的。通过阿里代码规范插件扫描会提示我们用ThreadPoolExecutor去实现线程池。通过查看ThreadPoolExecutor的构造方法

public ThreadPoolExecutor(int corePoolSize,                                int maximumPoolSize,                                long keepAliveTime,                                TimeUnit unit,                                BlockingQueue<Runnable> workQueue,                                ThreadFactory threadFactory,                                RejectedExecutionHandler handler) {                                      ...                                    do something                                    ...                                }

我觉得有以下几方面的原因。

  1. 可以灵活设置keepAliveTime(当线程池中线程数大于corePoolSize的数m, 为这m个线程设置的最长等待时间 ),节约系统资源。
  2. workQueue:线程等待队列,在Executors中默认的是LinkedBlockingQueue。可以理解是一种无界的数组,当有不断有线程来的时候,可能会撑爆机器内存。
  3. 可以设线程工厂,里面添加自己想要的一些元素,只需要实现JDK的ThreadFactory类。
  4. 按照自己的业务设置合适的拒绝策略。策略有以下几种
    1. AbortPolicy:直接抛出拒绝异常(继承自RuntimeException),会中断调用者的处理过程,所以除非有明确需求,一般不推荐
    2. DiscardPolicy:默默丢弃无法加载的任务。
    3. DiscardOldestPolicy:丢弃队列中最老的,然后再次尝试提交新任务。
    4. CallerRunsPolicy:在调用者线程中(也就是说谁把 r 这个任务甩来的),运行当前被丢弃的任务。只会用调用者所在线程来运行任务,也就是说任务不会进入线程池。如果线程池已经被关闭,则直接丢弃该任务。

使用线程池

声明ThreadFactory

public class NacosSyncThreadFactory implements ThreadFactory {      private final AtomicInteger threadNum = new AtomicInteger(1);      private String threadPrefix = null;      private ThreadGroup threadGroup;        public NacosSyncThreadFactory(String prefix) {          this.threadPrefix = "thread" + "-" + prefix + "-" ;          threadGroup = Thread.currentThread().getThreadGroup();        }        public NacosSyncThreadFactory() {          this("pool");      }        @Override      public Thread newThread(Runnable r) {          String name = threadPrefix + threadNum.incrementAndGet();          Thread thread = new Thread(threadGroup, r, name);          return thread;      }  }

创建线程池类

public class MyThreadPool {      private ThreadFactory threadFactory;      private int threadNum;      private BlockingQueue blockingQueue;      private RejectedExecutionHandler handler;        public MyThreadPool(ThreadFactory threadFactory, int threadNum,                          BlockingQueue blockingQueue,                          RejectedExecutionHandler handler ) {          this.threadFactory = threadFactory;          this.threadNum = threadNum;          this.blockingQueue = blockingQueue;          this.handler = handler;      }        public MyThreadPool() {          this(Executors.defaultThreadFactory(), 10,                  new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());      }        public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) {          if (handler == null) {              handler = new ThreadPoolExecutor.AbortPolicy();          }              return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler);      }    }

调用线程池

  1. 初始化线程池类

      MyThreadPool myThreadPool = new MyThreadPool();        threadPoolExecutor = myThreadPool.initThreadPool(              new NacosSyncThreadFactory("nacos-sync"),              threadNum,              new ArrayBlockingQueue(10),              new ThreadPoolExecutor.DiscardPolicy()        );    }
  2. 创建Callable(FutureTask)

    /**   * 分页获取task信息   * @return   */  private List<Task> getTask(int pageNum) {     IPage<Task> page = new Page(pageNum, 25);     IPage<Task> taskIPage = this.taskService.page(page);     if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) {         return null;     }       return taskIPage.getRecords();    }  // 执行任务  private FutureTask<String> assembleTaskFuture(Task task) {      FutureTask<String> futureTask = new FutureTask(() -> {            // 执行任务          this.doSyncWork(task);          return "success";      });        return futureTask;  }
  3. 执行任务(FutureTask)

    public void zkSync() {      // 获取数据总数,得到线程数      int count = this.taskService.count();      int pageSize = 25;      int num = count / pageSize;      int pageTotal = count % pageSize == 0 ? num : num + 1;      log.info("========总记录数:{}=====总页数:{}", count, pageTotal);        for (int i = 1; i <=  pageTotal; i++) {          List<Task> taskList = this.getTask(i);          if (CollectionUtils.isEmpty(taskList)) {              break;          }          List<Integer> collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList());          taskList.forEach(task -> {              FutureTask<String> futureTask = this.assembleTaskFuture(task);              threadPoolExecutor.execute(futureTask);          });        }        threadPoolExecutor.shutdown();    }