­

執行緒池的學習和使用

什麼是執行緒池

執行緒池的作用是初始化一些執行緒,當有任務的時候,就從中啟動一個來執行相關任務,執行完後,執行緒資源重新回收到執行緒池中,達到復用的效果,從而減少資源的開銷

創建執行緒池

在JDK中,Executors類已經幫我們封裝了創建執行緒池的方法。

    Executors.newFixedThreadPool();      Executors.newCachedThreadPool();      Executors.newScheduledThreadPool();

但是點進去看的話,

public static ExecutorService newFixedThreadPool(int nThreads) {      return new ThreadPoolExecutor(nThreads, nThreads,                                    0L, TimeUnit.MILLISECONDS,                                    new LinkedBlockingQueue<Runnable>());  }

它的內部實現還是基於ThreadPoolExecutor來實現的。通過阿里程式碼規範插件掃描會提示我們用ThreadPoolExecutor去實現執行緒池。通過查看ThreadPoolExecutor的構造方法

public ThreadPoolExecutor(int corePoolSize,                                int maximumPoolSize,                                long keepAliveTime,                                TimeUnit unit,                                BlockingQueue<Runnable> workQueue,                                ThreadFactory threadFactory,                                RejectedExecutionHandler handler) {                                      ...                                    do something                                    ...                                }

我覺得有以下幾方面的原因。

  1. 可以靈活設置keepAliveTime(當執行緒池中執行緒數大於corePoolSize的數m, 為這m個執行緒設置的最長等待時間 ),節約系統資源。
  2. workQueue:執行緒等待隊列,在Executors中默認的是LinkedBlockingQueue。可以理解是一種無界的數組,當有不斷有執行緒來的時候,可能會撐爆機器記憶體。
  3. 可以設執行緒工廠,裡面添加自己想要的一些元素,只需要實現JDK的ThreadFactory類。
  4. 按照自己的業務設置合適的拒絕策略。策略有以下幾種
    1. AbortPolicy:直接拋出拒絕異常(繼承自RuntimeException),會中斷調用者的處理過程,所以除非有明確需求,一般不推薦
    2. DiscardPolicy:默默丟棄無法載入的任務。
    3. DiscardOldestPolicy:丟棄隊列中最老的,然後再次嘗試提交新任務。
    4. CallerRunsPolicy:在調用者執行緒中(也就是說誰把 r 這個任務甩來的),運行當前被丟棄的任務。只會用調用者所在執行緒來運行任務,也就是說任務不會進入執行緒池。如果執行緒池已經被關閉,則直接丟棄該任務。

使用執行緒池

聲明ThreadFactory

public class NacosSyncThreadFactory implements ThreadFactory {      private final AtomicInteger threadNum = new AtomicInteger(1);      private String threadPrefix = null;      private ThreadGroup threadGroup;        public NacosSyncThreadFactory(String prefix) {          this.threadPrefix = "thread" + "-" + prefix + "-" ;          threadGroup = Thread.currentThread().getThreadGroup();        }        public NacosSyncThreadFactory() {          this("pool");      }        @Override      public Thread newThread(Runnable r) {          String name = threadPrefix + threadNum.incrementAndGet();          Thread thread = new Thread(threadGroup, r, name);          return thread;      }  }

創建執行緒池類

public class MyThreadPool {      private ThreadFactory threadFactory;      private int threadNum;      private BlockingQueue blockingQueue;      private RejectedExecutionHandler handler;        public MyThreadPool(ThreadFactory threadFactory, int threadNum,                          BlockingQueue blockingQueue,                          RejectedExecutionHandler handler ) {          this.threadFactory = threadFactory;          this.threadNum = threadNum;          this.blockingQueue = blockingQueue;          this.handler = handler;      }        public MyThreadPool() {          this(Executors.defaultThreadFactory(), 10,                  new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy());      }        public ThreadPoolExecutor initThreadPool(ThreadFactory threadFactory, int threadNum, BlockingQueue blockingQueue, RejectedExecutionHandler handler) {          if (handler == null) {              handler = new ThreadPoolExecutor.AbortPolicy();          }              return new ThreadPoolExecutor(1, threadNum, 5, TimeUnit.SECONDS, blockingQueue, threadFactory, handler);      }    }

調用執行緒池

  1. 初始化執行緒池類

      MyThreadPool myThreadPool = new MyThreadPool();        threadPoolExecutor = myThreadPool.initThreadPool(              new NacosSyncThreadFactory("nacos-sync"),              threadNum,              new ArrayBlockingQueue(10),              new ThreadPoolExecutor.DiscardPolicy()        );    }
  2. 創建Callable(FutureTask)

    /**   * 分頁獲取task資訊   * @return   */  private List<Task> getTask(int pageNum) {     IPage<Task> page = new Page(pageNum, 25);     IPage<Task> taskIPage = this.taskService.page(page);     if (null == taskIPage || CollectionUtils.isEmpty(taskIPage.getRecords())) {         return null;     }       return taskIPage.getRecords();    }  // 執行任務  private FutureTask<String> assembleTaskFuture(Task task) {      FutureTask<String> futureTask = new FutureTask(() -> {            // 執行任務          this.doSyncWork(task);          return "success";      });        return futureTask;  }
  3. 執行任務(FutureTask)

    public void zkSync() {      // 獲取數據總數,得到執行緒數      int count = this.taskService.count();      int pageSize = 25;      int num = count / pageSize;      int pageTotal = count % pageSize == 0 ? num : num + 1;      log.info("========總記錄數:{}=====總頁數:{}", count, pageTotal);        for (int i = 1; i <=  pageTotal; i++) {          List<Task> taskList = this.getTask(i);          if (CollectionUtils.isEmpty(taskList)) {              break;          }          List<Integer> collect = taskList.stream().map(task -> task.getId()).collect(Collectors.toList());          taskList.forEach(task -> {              FutureTask<String> futureTask = this.assembleTaskFuture(task);              threadPoolExecutor.execute(futureTask);          });        }        threadPoolExecutor.shutdown();    }