­

位元組跳動二面!面試官直接問我生產環境下如何監控線程池?還好我看了這篇文章!

  • 2021 年 10 月 19 日
  • 筆記

線程池的監控很重要,對於前面章節講的動態參數調整,其實還是得依賴於線程池監控的數據反饋之後才能做出調整的決策。還有就是線程池本身的運行過程對於我們來說像一個黑盒,我們沒辦法了解線程池中的運行狀態時,出現問題沒有辦法及時判斷和預警。

對於監控這類的場景,核心邏輯就是要拿到關鍵指標,然後進行上報,只要能實時拿到這些關鍵指標,就可以輕鬆實現監控以及預警功能。

ThreadPoolExecutor中提供了以下方法來獲取線程池中的指標。

  • getCorePoolSize():獲取核心線程數。
  • getMaximumPoolSize:獲取最大線程數。
  • getQueue():獲取線程池中的阻塞隊列,並通過阻塞隊列中的方法獲取隊列長度、元素個數等。
  • getPoolSize():獲取線程池中的工作線程數(包括核心線程和非核心線程)。
  • getActiveCount():獲取活躍線程數,也就是正在執行任務的線程。
  • getLargestPoolSize():獲取線程池曾經到過的最大工作線程數。
  • getTaskCount():獲取歷史已完成以及正在執行的總的任務數量。

除此之外,ThreadPoolExecutor中還提供了一些未實現的鉤子方法,我們可以通過重寫這些方法來實現更多指標數據的獲取。

  • beforeExecute,在Worker線程執行任務之前會調用的方法。
  • afterExecute,在Worker線程執行任務之後會調用的方法。
  • terminated,當線程池從狀態變更到TERMINATED狀態之前調用的方法。

比如我們可以在beforeExecute方法中記錄當前任務開始執行的時間,再到afterExecute方法來計算任務執行的耗時、最大耗時、最小耗時、平均耗時等。

線程池監控的基本原理

我們可以通過Spring Boot提供的Actuator,自定義一個Endpoint來發佈線程池的指標數據,實現線程池監控功能。當然,除了Endpoint以外,我們還可以通過JMX的方式來暴露線程池的指標信息,不管通過什麼方法,核心思想都是要有一個地方看到這些數據。

了解對於Spring Boot應用監控得讀者應該知道,通過Endpoint發佈指標數據後,可以採用一些主流的開源監控工具來進行採集和展示。如圖10-9所示,假設在Spring Boot應用中發佈一個獲取線程池指標信息的Endpoint,那麼我們可以採用Prometheus定時去抓取目標服務器上的Metric數據,Prometheus會將採集到的數據通過Retrieval分發給TSDB進行存儲。這些數據可以通過Prometheus自帶的UI進行展示,也可以使用Grafana圖表工具通過PromQL語句來查詢Prometheus中採集的數據進行渲染。最後採用AlertManager這個組件來觸發預警功能。

在這裡插入圖片描述

圖10-9 線程池指標監控

圖10-9中所涉及到的工具都是比較程度的開源監控組件,大家可以自行根據官方教程配置即可,而在本章節中要重點講解的就是如何自定義Endpoint發佈線程池的Metric數據。

在Spring Boot應用中發佈線程池信息

對於線程池的監控實現,筆者開發了一個相對較為完整的小程序,主要涉及到幾個功能:

  • 可以通過配置文件來構建線程池。
  • 擴展了ThreadPoolExecutor的實現。
  • 發佈一個自定義的Endpoint。

該小程序包含的類以及功能說明如下:

  • ThreadPoolExecutorForMonitor:擴展ThreadPoolExecutor的實現類。
  • ThreadPoolConfigurationProperties:綁定application.properties的配置屬性。
  • ThreadPoolForMonitorManager:線程池管理類,實現線程池的初始化。
  • ThreadPoolProperties:線程池基本屬性。
  • ResizeLinkedBlockingQueue:這個類是直接複製了LinkedBlockingQueue,提供了setCapacity方法,在前面有講解到,源碼就不貼出來。
  • ThreadPoolEndpoint:自定義Endpoint。

ThreadPoolExecutorForMonitor

繼承了ThreadPoolExecutor,實現了beforeExecuteafterExecute,在原有線程池的基礎上新增了最短執行時間、最長執行時間、平均執行耗時的屬性。

public class ThreadPoolExecutorForMonitor extends ThreadPoolExecutor {

  private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

  private static final String defaultPoolName="Default-Task";

  private static ThreadFactory threadFactory=new MonitorThreadFactory(defaultPoolName);

  public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory,defaultHandler);
  }
  public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,String poolName) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,new MonitorThreadFactory(poolName),defaultHandler);
  }
  public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,String poolName) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory,handler);
  }

  //最短執行時間
  private long minCostTime;
  //最長執行時間
  private long maxCostTime;
  //總的耗時
  private AtomicLong totalCostTime=new AtomicLong();

  private ThreadLocal<Long> startTimeThreadLocal=new ThreadLocal<>();

  @Override
  public void shutdown() {
    super.shutdown();
  }

  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    startTimeThreadLocal.set(System.currentTimeMillis());
    super.beforeExecute(t, r);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    long costTime=System.currentTimeMillis()-startTimeThreadLocal.get();
    startTimeThreadLocal.remove();
    maxCostTime=maxCostTime>costTime?maxCostTime:costTime;
    if(getCompletedTaskCount()==0){
      minCostTime=costTime;
    }
    minCostTime=minCostTime<costTime?minCostTime:costTime;
    totalCostTime.addAndGet(costTime);
    super.afterExecute(r, t);
  }

  public long getMinCostTime() {
    return minCostTime;
  }

  public long getMaxCostTime() {
    return maxCostTime;
  }

  public long getAverageCostTime(){//平均耗時
    if(getCompletedTaskCount()==0||totalCostTime.get()==0){
      return 0;
    }
    return totalCostTime.get()/getCompletedTaskCount();
  }

  @Override
  protected void terminated() {
    super.terminated();
  }

  static class MonitorThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    MonitorThreadFactory(String poolName) {
      SecurityManager s = System.getSecurityManager();
      group = (s != null) ? s.getThreadGroup() :
      Thread.currentThread().getThreadGroup();
      namePrefix = poolName+"-pool-" +
        poolNumber.getAndIncrement() +
        "-thread-";
    }

    public Thread newThread(Runnable r) {
      Thread t = new Thread(group, r,
                            namePrefix + threadNumber.getAndIncrement(),
                            0);
      if (t.isDaemon())
        t.setDaemon(false);
      if (t.getPriority() != Thread.NORM_PRIORITY)
        t.setPriority(Thread.NORM_PRIORITY);
      return t;
    }
  }
}

ThreadPoolConfigurationProperties

提供了獲取application.properties配置文件屬性的功能,

@ConfigurationProperties(prefix = "monitor.threadpool")
@Data
public class ThreadPoolConfigurationProperties {

    private List<ThreadPoolProperties>  executors=new ArrayList<>();

}

線程池的核心屬性聲明。

@Data
public class ThreadPoolProperties {

    private String poolName;
    private int corePoolSize;
    private int maxmumPoolSize=Runtime.getRuntime().availableProcessors();
    private long keepAliveTime=60;
    private TimeUnit unit= TimeUnit.SECONDS;
    private int queueCapacity=Integer.MAX_VALUE;
}

上述配置類要生效,需要通過@EnableConfigurationProperties開啟,我們可以在Main方法上開啟,代碼如下。

@EnableConfigurationProperties(ThreadPoolConfigurationProperties.class)
@SpringBootApplication
public class ThreadPoolApplication {

    public static void main(String[] args) {
        SpringApplication.run(ThreadPoolApplication.class, args);
    }
}

application.properties

配置類創建好之後,我們就可以在application.properties中,通過如下方式來構建線程池。

monitor.threadpool.executors[0].pool-name=first-monitor-thread-pool
monitor.threadpool.executors[0].core-pool-size=4
monitor.threadpool.executors[0].maxmum-pool-size=8
monitor.threadpool.executors[0].queue-capacity=100

monitor.threadpool.executors[1].pool-name=second-monitor-thread-pool
monitor.threadpool.executors[1].core-pool-size=2
monitor.threadpool.executors[1].maxmum-pool-size=4
monitor.threadpool.executors[1].queue-capacity=40

ThreadPoolForMonitorManager

用來實現線程池的管理和初始化,實現線程池的統一管理,初始化的邏輯是根據application.properties中配置的屬性來實現的。

  • 從配置類中獲得線程池的基本配置。
  • 根據配置信息構建ThreadPoolExecutorForMonitor實例。
  • 把實例信息保存到集合中。
@Component
public class ThreadPoolForMonitorManager {

  @Autowired
  ThreadPoolConfigurationProperties poolConfigurationProperties;

  private final ConcurrentMap<String,ThreadPoolExecutorForMonitor> threadPoolExecutorForMonitorConcurrentMap=new ConcurrentHashMap<>();

  @PostConstruct
  public void init(){
    poolConfigurationProperties.getExecutors().forEach(threadPoolProperties -> {
      if(!threadPoolExecutorForMonitorConcurrentMap.containsKey(threadPoolProperties.getPoolName())){
        ThreadPoolExecutorForMonitor executorForMonitor=new ThreadPoolExecutorForMonitor(
          threadPoolProperties.getCorePoolSize(),
          threadPoolProperties.getMaxmumPoolSize(),
          threadPoolProperties.getKeepAliveTime(),
          threadPoolProperties.getUnit(),
          new ResizeLinkedBlockingQueue<>(threadPoolProperties.getQueueCapacity()),
          threadPoolProperties.getPoolName());
        threadPoolExecutorForMonitorConcurrentMap.put(threadPoolProperties.getPoolName(),executorForMonitor);
      }
    });
  }

  public ThreadPoolExecutorForMonitor getThreadPoolExecutor(String poolName){
    ThreadPoolExecutorForMonitor threadPoolExecutorForMonitor=threadPoolExecutorForMonitorConcurrentMap.get(poolName);
    if(threadPoolExecutorForMonitor==null){
      throw new RuntimeException("找不到名字為"+poolName+"的線程池");
    }
    return threadPoolExecutorForMonitor;
  }

  public ConcurrentMap<String,ThreadPoolExecutorForMonitor> getThreadPoolExecutorForMonitorConcurrentMap(){
    return this.threadPoolExecutorForMonitorConcurrentMap;
  }
}

ThreadPoolEndpoint

使用Spring-Boot-Actuator發佈Endpoint,用來暴露當前應用中所有線程池的Metric數據。

讀者如果不清楚在Spring Boot中自定義Endpoint,可以直接去Spring官方文檔中配置,比較簡單。

@Configuration
@Endpoint(id="thread-pool")
public class ThreadPoolEndpoint {
  @Autowired
  private ThreadPoolForMonitorManager threadPoolForMonitorManager;

  @ReadOperation
  public Map<String,Object> threadPoolsMetric(){
    Map<String,Object> metricMap=new HashMap<>();
    List<Map> threadPools=new ArrayList<>();
    threadPoolForMonitorManager.getThreadPoolExecutorForMonitorConcurrentMap().forEach((k,v)->{
      ThreadPoolExecutorForMonitor tpe=(ThreadPoolExecutorForMonitor) v;
      Map<String,Object> poolInfo=new HashMap<>();
      poolInfo.put("thread.pool.name",k);
      poolInfo.put("thread.pool.core.size",tpe.getCorePoolSize());
      poolInfo.put("thread.pool.largest.size",tpe.getLargestPoolSize());
      poolInfo.put("thread.pool.max.size",tpe.getMaximumPoolSize());
      poolInfo.put("thread.pool.thread.count",tpe.getPoolSize());
      poolInfo.put("thread.pool.max.costTime",tpe.getMaxCostTime());
      poolInfo.put("thread.pool.average.costTime",tpe.getAverageCostTime());
      poolInfo.put("thread.pool.min.costTime",tpe.getMinCostTime());
      poolInfo.put("thread.pool.active.count",tpe.getActiveCount());
      poolInfo.put("thread.pool.completed.taskCount",tpe.getCompletedTaskCount());
      poolInfo.put("thread.pool.queue.name",tpe.getQueue().getClass().getName());
      poolInfo.put("thread.pool.rejected.name",tpe.getRejectedExecutionHandler().getClass().getName());
      poolInfo.put("thread.pool.task.count",tpe.getTaskCount());
      threadPools.add(poolInfo);
    });
    metricMap.put("threadPools",threadPools);
    return metricMap;
  }
}

如果需要上述自定義的Endpoint可以被訪問,還需要在application.properties文件中配置如下代碼,意味着thread-pool Endpoint允許被訪問。

management.endpoints.web.exposure.include=thread-pool

TestController

提供使用線程池的方法,用來實現在調用之前和調用之後,通過Endpoint獲取到Metric數據的變化。

@RestController
public class TestController {

  private final String poolName="first-monitor-thread-pool";
  @Autowired
  ThreadPoolForMonitorManager threadPoolForMonitorManager;

  @GetMapping("/execute")
  public String doExecute(){
    ThreadPoolExecutorForMonitor tpe=threadPoolForMonitorManager.getThreadPoolExecutor(poolName);
    for (int i = 0; i < 100; i++) {
      tpe.execute(()->{
        try {
          Thread.sleep(new Random().nextInt(4000));
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      });
    }
    return "success";
  }
}

效果演示

訪問自定義Endpoint: //ip:8080/actuator/thread-pool,就可以看到如下數據。我們可以把這個Endpoint配置到Prometheus中,Prometheus會定時抓取這些指標存儲並展示,從而完成線程池的整體監控。

{
    "threadPools":[
        {
            "thread.pool.queue.name":"com.concurrent.demo.ResizeLinkedBlockingQueue",
            "thread.pool.core.size":2,
            "thread.pool.min.costTime":0,
            "thread.pool.completed.taskCount":0,
            "thread.pool.max.costTime":0,
            "thread.pool.task.count":0,
            "thread.pool.name":"second-monitor-thread-pool",
            "thread.pool.largest.size":0,
            "thread.pool.rejected.name":"java.util.concurrent.ThreadPoolExecutor$AbortPolicy",
            "thread.pool.active.count":0,
            "thread.pool.thread.count":0,
            "thread.pool.average.costTime":0,
            "thread.pool.max.size":4
        },
        {
            "thread.pool.queue.name":"com.concurrent.demo.ResizeLinkedBlockingQueue",
            "thread.pool.core.size":4,
            "thread.pool.min.costTime":65,
            "thread.pool.completed.taskCount":115,
            "thread.pool.max.costTime":3964,
            "thread.pool.task.count":200,
            "thread.pool.name":"first-monitor-thread-pool",
            "thread.pool.largest.size":4,
            "thread.pool.rejected.name":"java.util.concurrent.ThreadPoolExecutor$AbortPolicy",
            "thread.pool.active.count":4,
            "thread.pool.thread.count":4,
            "thread.pool.average.costTime":1955,
            "thread.pool.max.size":8
        }
    ]
}

總結

線程池的整體實現並不算太複雜,但是裏面涉及到的一些思想和理論是可以值得我們去學習和借鑒,如基於阻塞隊列的生產者消費者模型的實現、動態擴容的思想、如何通過AQS來實現安全關閉線程池、降級方案(拒絕策略)、位運算等。實際上越底層的實現,越包含更多技術層面的思想和理論。

線程池在實際使用中,如果是新手,不建議直接用Executors中提供的工廠方法,因為線程池中的參數會影響到內存以及CPU資源的佔用,我們可以自己集成ThreadPoolExecutor這個類,擴展一個自己的實現,也可以自己構造ThreadPoolExecutor實例,這樣能夠更好的了解線程池中核心參數的意義避免不必要的生產問題。
關注[跟着Mic學架構]公眾號,獲取更多精品原創