面試官問:「在項目中用過多線程嗎?」你就把這個案例講給他聽!

在面試當中,有時候會問到你在項目中用過多線程么?

對於普通的應屆生或者工作時間不長的初級開發 ???—— crud仔流下了沒有技術的眼淚。

流下了沒技術的眼淚_流下_眼淚_技術表情

博主這裡整理了項目中用到了多線程的一個簡單的實例,希望能對你有所啟發。

多線程開發實例

應用背景

應用的背景非常簡單,博主做的項目是一個審核類的項目,審核的數據需要推送給第三方監管系統,這只是一個很簡單的對接,但是存在一個問題。

我們需要推送的數據大概三十萬條,但是第三方監管提供的接口只支持單條推送(別問為什麼不支持批量,問就是沒過)。可以估算一下,三十萬條數據,一條數據按3秒算,大概需要250(為什麼恰好會是這個數)個小時。

所以就考慮到引入多線程來進行並發操作,降低數據推送的時間,提高數據推送的實時性。

業務示例

設計要點

防止重複

我們推送給第三方的數據肯定是不能重複推送的,必須要有一個機制保證各個線程推送數據的隔離。

這裡有兩個思路:

    1. 將所有數據取到集合(內存)中,然後進行切割,每個線程推送不同段的數據
    1. 利用 數據庫分頁的方式,每個線程取 [start,limit] 區間的數據推送,我們需要保證start的一致性

這裡採用了第二種方式,因為考慮到可能數據量後續會繼續增加,把所有數據都加載到內存中,可能會有比較大的內存佔用。

失敗機制

我們還得考慮到線程推送數據失敗的情況。

如果是自己的系統,我們可以把多線程調用的方法抽出來加一個事務,一個線程異常,整體回滾。

但是是和第三方的對接,我們都沒法做事務的,所以,我們採用了直接在數據庫記錄失敗狀態的方法,可以在後面用其它方式處理失敗的數據。

線程池選擇

在實際使用中,我們肯定是要用到線程池來管理線程,關於線程池,我們常用 ThreadPoolExecutor提供的線程池服務,SpringBoot中同樣也提供了線程池異步的方式,雖然SprignBoot異步可能更方便一點,但是使用ThreadPoolExecutor更加直觀地控制線程池,所以我們直接使用ThreadPoolExecutor構造方法創建線程池。

大概的技術設計示意圖:

設計示意圖

核心代碼

上面叭叭了一堆,到了show you code的環節了。我將項目里的代碼抽取出來,簡化出了一個示例。

核心代碼如下:

/**
 * @Author 三分惡
 * @Date 2021/3/5
 * @Description
 */
@Service
public class PushProcessServiceImpl implements PushProcessService {
    @Autowired
    private PushUtil pushUtil;
    @Autowired
    private PushProcessMapper pushProcessMapper;

    private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);

    //每個線程每次查詢的條數
    private static final Integer LIMIT = 5000;
    //起的線程數
    private static final Integer THREAD_NUM = 5;
    //創建線程池
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

    @Override
    public void pushData() throws ExecutionException, InterruptedException {
        //計數器,需要保證線程安全
        int count = 0;
        //未推送數據總數
        Integer total = pushProcessMapper.countPushRecordsByState(0);
        logger.info("未推送數據條數:{}", total);
        //計算需要多少輪
        int num = total / (LIMIT * THREAD_NUM) + 1;
        logger.info("要經過的輪數:{}", num);
        //統計總共推送成功的數據條數
        int totalSuccessCount = 0;
        for (int i = 0; i < num; i++) {
            //接收線程返回結果
            List<Future<Integer>> futureList = new ArrayList<>(32);
            //起THREAD_NUM個線程並行查詢更新庫,加鎖
            for (int j = 0; j < THREAD_NUM; j++) {
                synchronized (PushProcessServiceImpl.class) {
                    int start = count * LIMIT;
                    count++;
                    //提交線程,用數據起始位置標識線程
                    Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
                    //先不取值,防止阻塞,放進集合
                    futureList.add(future);
                }
            }
            //統計本輪推送成功數據
            for (Future f : futureList) {
                totalSuccessCount = totalSuccessCount + (int) f.get();
            }
        }
        //更新推送標誌
        pushProcessMapper.updateAllState(1);
        logger.info("推送數據完成,需推送數據:{},推送成功:{}", total, totalSuccessCount);
    }

    /**
     * 推送數據線程類
     */
    class PushDataTask implements Callable<Integer> {
        int start;
        int limit;
        int threadNo;   //線程編號

        PushDataTask(int start, int limit, int threadNo) {
            this.start = start;
            this.limit = limit;
            this.threadNo = threadNo;
        }

        @Override
        public Integer call() throws Exception {
            int count = 0;
            //推送的數據
            List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);
            if (CollectionUtils.isEmpty(pushProcessList)) {
                return count;
            }
            logger.info("線程{}開始推送數據", threadNo);
            for (PushProcess process : pushProcessList) {
                boolean isSuccess = pushUtil.sendRecord(process);
                if (isSuccess) {   //推送成功
                    //更新推送標識
                    pushProcessMapper.updateFlagById(process.getId(), 1);
                    count++;
                } else {  //推送失敗
                    pushProcessMapper.updateFlagById(process.getId(), 2);
                }
            }
            logger.info("線程{}推送成功{}條", threadNo, count);
            return count;
        }
    }
}

代碼很長,我們簡單說一下關鍵的地方:

  • 線程創建:線程內部類選擇了實現Callable接口,這樣方便獲取線程任務執行的結果,在示例里用於統計線程推送成功的數量
 class PushDataTask implements Callable<Integer> {
  • 使用 ThreadPoolExecutor 創建線程池,
  //創建線程池
      ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

主要構造參數如下:

​ – corePoolSize:線程核心參數選擇了5

​ – maximumPoolSize:最大線程數選擇了核心線程數2倍數

​ – keepAliveTime:非核心閑置線程存活時間直接置為0

​ – unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒

​ – workQueue:線程池等待隊列,使用 容量初始為100的 LinkedBlockingQueue阻塞隊列

這裡還有沒寫出來的線程池拒絕策略,採用了默認AbortPolicy:直接丟棄任務,拋出異常。

  • 使用 synchronized 來保證線程安全,保證計數器的增加是有序的
  synchronized (PushProcessServiceImpl.class) {
  • 使用集合來接收線程的運行結果,防止阻塞
List<Future<Integer>> futureList = new ArrayList<>(32);

好了,主要的代碼和簡單的解析就到這裡了。

關於這個簡單的demo,這裡只是簡單地做推送數據處理。考慮一下,這個實例是不是可以用在你項目的某些地方。例如監管系統的數據校驗、審計系統的數據統計、電商系統的數據分析等等,只要是有大量數據處理的地方,都可以把這個例子結合到你的項目里,這樣你就有了多線程開發的經驗。

完整代碼倉庫地址在文章底部👇👇

對線面試官

  • 面試官:小夥子,不錯,你這個整挺好。
  • 老三:那是自然。
  • 面試官:呦,小夥子,挺自信,那我得好好考考你。
  • 老三:放馬過來,但考無妨。

配圖

面試官:先從最簡單的開始,說說什麼是線程吧

要說線程,必先說進程。

進程是程序的⼀次執⾏過程,是系統運⾏程序的基本單位,因此進程是動態的。系統運⾏⼀個程序即是⼀個進程從創建,運⾏到消亡的過程。

線程與進程相似,但線程是⼀個⽐進程更⼩的執⾏單位。⼀個進程在其執⾏的過程中可以產⽣多個線程。與進程不同的是同類的多個線程共享進程的堆和⽅法區資源,但每個線程有⾃⼰的程序計數器、虛擬機棧和本地⽅法棧,所以系統在產⽣⼀個線程,或是在各個線程之間作切換⼯作時,負擔要⽐進程⼩得多,也正因為如此,線程也被稱為輕量級進程。

面試官:說說Java里怎麼創建線程吧

Java里創建線程主要有三種方式:

  • 繼承 Thread類:Thread 類本質上是實現了 Runnable 接口的一個實例,代表一個線程的實例。啟動線程的唯一方法就是通過 Thread 類的 start()實例方法。start()方法是一個 native 方法,它將啟動一個新線程,並執行 run()方法。

  • 實現 Runnable接口:如果自己的類已經 extends 另一個類,就無法直接 extends Thread,此時,可以實現一個Runnable 接口。

  • 實現 Callable接口:實現Callable接口,重寫call()方法,可以返回一個 Future類型的返回值。我在上面的例子里就是用到了這種方式。

面試官:說說線程的生命周期和狀態

在Java中,線程共有六種狀態:

狀態 說明
NEW 初始狀態:線程被創建,但還沒有調用start()方法
RUNNABLE 運行狀態:Java線程將操作系統中的就緒和運行兩種狀態籠統的稱作「運行」
BLOCKED 阻塞狀態:表示線程阻塞於鎖
WAITING 等待狀態:表示線程進入等待狀態,進入該狀態表示當前線程需要等待其他線程做出一些特定動作(通知或中斷)
TIME_WAITING 超時等待狀態:該狀態不同於 WAITIND,它是可以在指定的時間自行返回的
TERMINATED 終止狀態:表示當前線程已經執行完畢

線程在自身的生命周期中, 並不是固定地處於某個狀態,而是隨着代碼的執行在不同的狀態之間進行切換,Java線程狀態變化如圖示:

Java線程狀態變化

面試官:我看你提到了線程阻塞,那你再說說線程死鎖吧

線程死鎖描述的是這樣⼀種情況:多個線程同時被阻塞,它們中的⼀個或者全部都在等待某個資源被釋放。由於線程被⽆限期地阻塞,因此程序不可能正常終⽌。

如下圖所示,線程 A 持有資源 2,線程 B 持有資源 1,他們同時都想申請對⽅的資源,所以這兩個線程就會互相等待⽽進⼊死鎖狀態。

image-20210306115418949

產生死鎖必須滿足四個條件:

  1. 互斥條件:該資源任意⼀個時刻只由⼀個線程占⽤。

  2. 請求與保持條件:⼀個進程因請求資源⽽阻塞時,對已獲得的資源保持不放。

  3. 不剝奪條件:線程已獲得的資源在末使⽤完之前不能被其他線程強⾏剝奪,只有⾃⼰使⽤完畢後才釋放資源。

  4. 循環等待條件:若⼲進程之間形成⼀種頭尾相接的循環等待資源關係。

面試官:怎麼避免死鎖呢?

我上⾯說了產⽣死鎖的四個必要條件,為了避免死鎖,我們只要破壞產⽣死鎖的四個條件中的其中⼀個就可以了。

  1. 破壞互斥條件 :這個條件我們沒有辦法破壞,因為我們⽤鎖本來就是想讓他們互斥的(臨界資源需要互斥訪問)。

  2. 破壞請求與保持條件 :⼀次性申請所有的資源。

  3. 破壞不剝奪條件 :占⽤部分資源的線程進⼀步申請其他資源時,如果申請不到,可以主動釋放它佔有的資源。

  4. 破壞循環等待條件 :靠按序申請資源來預防。按某⼀順序申請資源,釋放資源則反序釋放。破壞循環等待條件。

面試官:我看你的例子里用到了synchronized,說說 synchronized的用法吧

synchronized 關鍵字最主要的三種使⽤⽅式:

1.修飾實例⽅法: 作⽤於當前對象實例加鎖,進⼊同步代碼前要獲得 當前對象實例的鎖

synchronized void method() {
 //業務代碼
}

2.修飾靜態⽅法: 也就是給當前類加鎖,會作⽤於類的所有對象實例 ,進⼊同步代碼前要獲得當前 class 的鎖。因為靜態成員不屬於任何⼀個實例對象,是類成員( static 表明這是該類的⼀個靜態資源,不管 new 了多少個對象,只有⼀份)。所以,如果⼀個線程 A 調⽤⼀個實例對象的⾮靜態 synchronized ⽅法,⽽線程 B 需要調⽤這個實例對象所屬類的靜態 synchronized ⽅法,是允許的,不會發⽣互斥現象,因為訪問靜態 synchronized ⽅法佔⽤的鎖是當前類的鎖,⽽訪問⾮靜態 synchronized ⽅法佔⽤的鎖是當前實例對象鎖。

synchronized void staic method() {
 //業務代碼
}

3.修飾代碼塊 :指定加鎖對象,對給定對象/類加鎖。 synchronized(this|object) 表示進⼊同步代碼庫前要獲得給定對象的鎖。 synchronized(類.class) 表示進⼊同步代碼前要獲得 當前 class 的鎖

synchronized(this) {
 //業務代碼
}

在我的例子里使用synchronized修飾代碼塊,給PushProcessServiceImpl類加鎖,進⼊同步代碼前要獲得 當前 class 的鎖,防止PushProcessServiceImpl類的對象在控制層調用推送數據的方法。

面試官:除了使用synchronized,還有什麼辦法來加鎖嗎?詳細說一下

可以使用juc包提供的鎖。Lock接口主要相關的類和接口如下。

image-20210306162316895

Lock中的主要方法:

  • lock:用來獲取鎖,如果鎖被其他線程獲取,進入等待狀態。
  • lockInterruptibly:通過這個方法去獲取鎖時,如果線程正在等待獲取鎖,則這個線程能夠響應中斷,即中斷線程的等待狀態。
  • tryLock:tryLock方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false。
  • tryLock(long,TimeUnit):與tryLock類似,只不過是有等待時間,在等待時間內獲取到鎖返回true,超時返回false。
  • unlock:釋放鎖。

其它接口和類:

  • ReetrantLock(可重入鎖):實現了Lock接口,可重入鎖,內部定義了公平鎖與非公平鎖。可以完成synchronized 所能完成的所有工作。
  • ReadWriteLock(讀寫鎖):
public interface ReadWriteLock {  
    Lock readLock();       //獲取讀鎖  
    Lock writeLock();      //獲取寫鎖  
}  

一個用來獲取讀鎖,一個用來獲取寫鎖。也就是說將文件的讀寫操作分開,分成2個鎖來分配給線程,從而使得多個線程可以同時進行讀操作。

  • ReetrantReadWriteLock(可重入讀寫鎖):ReetrantReadWriteLock同樣支持公平性選擇,支持重進入,鎖降級。

面試官:說說synchronized和Lock的區別

類別 synchronized Lock
存在層次 Java的關鍵字,在jvm層面上 是一個接口,api級別
鎖的釋放 1、以獲取鎖的線程執行完同步代碼,釋放鎖 2、線程執行發生異常,jvm會讓線程釋放鎖 在finally中必須釋放鎖,不然容易造成線程死鎖
鎖的獲取 假設A線程獲得鎖,B線程等待。如果A線程阻塞,B線程會一直等待 分情況而定,Lock有多個鎖獲取的方式,具體下面會說道,大致就是可以嘗試獲得鎖,線程可以不用一直等待
鎖狀態 無法判斷 可以判斷
鎖類型 可重入 不可中斷 非公平 可重入 可判斷 可公平(兩者皆可)
性能 少量同步 大量同步

面試官:你提到了synchronized基於jvm層面,對這個有了解嗎?

synchronized是利用java提供的原⼦性內置鎖(monitor 對象),每個對象中都內置了⼀個 ObjectMonitor 對象。這種內置的並且使⽤者看不到的鎖也被稱為監視器鎖。

同步語句塊

synchronized 同步語句塊的實現使⽤的是 monitorentermonitorexit 指令,其中monitorenter 指令指向同步代碼塊的開始位置monitorexit 指令則指明同步代碼塊的結束位置。

執⾏monitorenter指令時會嘗試獲取內置鎖,如果對象沒有被鎖定或者已經獲得了鎖,鎖的計數器+1。此時其他競爭鎖的線程則會進⼊等待隊列中。

執⾏monitorexit指令時則會把計數器-1,當計數器值為0時,則鎖釋放,處於等待隊列中的線程再繼續競爭鎖。

synchronized 修飾⽅法

synchronized 修飾的⽅法並沒有 monitorenter 指令和 monitorexit 指令,取得代之的確實是ACC_SYNCHRONIZED 標識,該標識指明了該⽅法是⼀個同步⽅法。JVM 通過該ACC_SYNCHRONIZED 訪問標誌來辨別⼀個⽅法是否聲明為同步⽅法,從⽽執⾏相應的同步調⽤。

當然,二者細節略有不同,但本質上都是獲取原子性內置鎖。

再深入一點,synchronized實際上有兩個隊列waitSet和entryList。

  1. 當多個線程進⼊同步代碼塊時,⾸先進⼊entryList

  2. 有⼀個線程獲取到monitor鎖後,就賦值給當前線程,並且計數器+1

  3. 如果線程調⽤wait⽅法,將釋放鎖,當前線程置為null,計數器-1,同時進⼊waitSet等待被喚醒,調⽤notify或者notifyAll之後⼜會進⼊entryList競爭鎖

  4. 如果線程執⾏完畢,同樣釋放鎖,計數器-1,當前線程置為null

image-20210306140522318

synchronized的優化能說一說嗎?

從JDK1.6版本之後,synchronized本身也在不斷優化鎖的機制,有些情況下他並不會是⼀個很重量級的鎖。優化機制包括⾃適應鎖、⾃旋鎖、鎖消除、鎖粗化、偏向鎖、輕量級鎖。

鎖的狀態從低到⾼依次為⽆鎖->偏向鎖->輕量級鎖->重量級鎖,升級的過程就是從低到⾼。

img

自旋鎖:由於⼤部分時候,鎖被占⽤的時間很短,共享變量的鎖定時間也很短,所有沒有必要掛起線程,⽤戶態和內核態的來回上下⽂切換嚴重影響性能。⾃旋的概念就是讓線程執⾏⼀個忙循環,可以理解為就是啥也不⼲,防⽌從⽤戶態轉⼊內核態,⾃旋鎖可以通過設置-XX:+UseSpining來開啟,⾃旋的默認次數是10次,可以使⽤-XX:PreBlockSpin設置。

自適應鎖:自適應鎖就是自適應的自旋鎖,自旋鎖的時間不是固定時間,而是由前⼀次在同⼀個鎖上的⾃旋時間和鎖的持有者狀態來決定。

鎖消除:鎖消除指的是JVM檢測到⼀些同步的代碼塊,完全不存在數據競爭的場景,也就是不需要加鎖,就會進⾏鎖消除。

鎖粗化:鎖粗化指的是有很多操作都是對同⼀個對象進⾏加鎖,就會把鎖的同步範圍擴展到整個操作序列之外。

偏向鎖:當線程訪問同步塊獲取鎖時,會在對象頭和棧幀中的鎖記錄⾥存儲偏向鎖的線程ID,之後這個線程再次進⼊同步塊時都不需要CAS來加鎖和解鎖了,偏向鎖會永遠偏向第⼀個獲得鎖的線程,如果後續沒有其他線程獲得過這個鎖,持有鎖的線程就永遠不需要進⾏同步,反之,當有其他線程競爭偏向鎖時,持有偏向鎖的線程就會釋放偏向鎖。可以⽤過設置-XX:+UseBiasedLocking開啟偏向鎖。

輕量級鎖:JVM的對象的對象頭中包含有⼀些鎖的標誌位,代碼進⼊同步塊的時候,JVM將會使⽤CAS⽅式來嘗試獲取鎖,如果更新成功則會把對象頭中的狀態位標記為輕量級鎖,如果更新失敗,當前線程就嘗試⾃旋來獲得鎖。

鎖升級的過程非常複雜,簡單點說,偏向鎖就是通過對象頭的偏向線程ID來對⽐,甚⾄都不需要CAS了,⽽輕量級鎖主要就是通過CAS修改對象頭鎖記錄和⾃旋來實現,重量級鎖則是除了擁有鎖的線程其他全部阻塞。

image-20210306142412015

面試官:說一下CAS

CAS(Compare And Swap/Set)比較並交換,CAS 算法的過程是這樣:它包含 3 個參數CAS(V,E,N)。V 表示要更新的變量(內存值),E 表示預期值(舊的),N 表示新值。當且僅當 V 值等於 E 值時,才會將 V 的值設為 N,如果 V 值和 E 值不同,則說明已經有其他線程做了更新,則當前線程什麼都不做。最後,CAS 返回當前 V 的真實值。

CAS是一種樂觀鎖,它總是認為自己可以成功完成操作。當多個線程同時使用 CAS 操作一個變量時,只有一個會勝出,並成功更新,其餘均會失敗。失敗的線程不會被掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許失敗的線程放棄操作。基於這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對當前線程的干擾,並進行恰當的處理。

java.util.concurrent.atomic 包下的類大多是使用 CAS 操作來實現的 (AtomicInteger,AtomicBoolean,AtomicLong)。

面試官:CAS會導致什麼問題?

  1. ABA 問題:

比如說一個線程 one 從內存位置 V 中取出 A,這時候另一個線程 two 也從內存中取出 A,並且 two 進行了一些操作變成了 B,然後 two 又將 V 位置的數據變成 A,這時候線程 one 進行 CAS 操作發現內存中仍然是 A,然後 one 操作成功。儘管線程 one 的 CAS 操作成功,但可能存在潛藏的問題。從 Java1.5 開始 JDK 的 atomic 包里提供了一個類 AtomicStampedReference 來解決 ABA 問題。

  1. 循環時間長開銷大:

對於資源競爭嚴重(線程衝突嚴重)的情況,CAS 自旋的概率會比較大,從而浪費更多的 CPU 資源,效率低於 synchronized。

  1. 只能保證一個共享變量的原子操作:

當對一個共享變量執行操作時,我們可以使用循環 CAS 的方式來保證原子操作,但是對多個共享變量操作時,循環 CAS 就無法保證操作的原子性,這個時候就可以用鎖。

面試官:能說一下說下ReentrantLock原理嗎

ReentrantLock 是基於 Lock 實現的可重入鎖,所有的 Lock 都是基於 AQS 實現的,AQS 和 Condition 各自維護不同的對象,在使用 Lock 和 Condition 時,其實就是兩個隊列的互相移動。它所提供的共享鎖、互斥鎖都是基於對 state 的操作。

面試官:能說一下AQS嗎

AbstractQueuedSynchronizer,抽象的隊列式的同步器,AQS 定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的

ReentrantLock/Semaphore/CountDownLatch。

AQS 核⼼思想是,如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的⼯作線程,並且將共享資源設置為鎖定狀態。如果被請求的共享資源被占⽤,那麼就需要⼀套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制 AQS 是⽤ CLH 隊列鎖實現的,即將暫時獲取不到鎖的線程加⼊到隊列中。

看個 AQS原理圖:

image-20210306144734263

AQS 使⽤⼀個 int 成員變量來表示同步狀態,通過內置的 FIFO 隊列來完成獲取資源線程的排隊⼯作。AQS 使⽤ CAS 對該同步狀態進⾏原⼦操作實現對其值的修改。

private volatile int state;//共享變量,使⽤volatile修飾保證線程可⻅性

狀態信息通過 protected 類型的 getState,setState,compareAndSetState 進⾏操作

//返回同步狀態的當前值
protected final int getState() {
 return state; }
// 設置同步狀態的值
protected final void setState(int newState) {
 state = newState; }
//原⼦地(CAS操作)將同步狀態值設置為給定值update如果當前同步狀態的值等於expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
 return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

嘗試加鎖的時候通過CAS(CompareAndSwap)修改值,如果成功設置為1,並且把當前線程ID賦值,則代表加鎖成功,⼀旦獲取到鎖,其他的線程將會被阻塞進⼊阻塞隊列⾃旋,獲得鎖的線程釋放鎖的時候將會喚醒阻塞隊列中的線程,釋放鎖的時候則會把state重新置為0,同時當前線程ID置為空。

image-20210306145019137

面試官:能說一下Semaphore/CountDownLatch/CyclicBarrier嗎

  • Semaphore(信號量)-允許多個線程同時訪問: synchronized 和 ReentrantLock 都是一次只允許一個線程訪問某個資源,Semaphore(信號量)可以指定多個線程同時訪問某個資源。
  • CountDownLatch(倒計時器): CountDownLatch是一個同步工具類,用來協調多個線程之間的同步。這個工具通常用來控制線程等待,它可以讓某一個線程等待直到倒計時結束,再開始執行。
  • CyclicBarrier(循環柵欄): CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現線程間的技術等待,但是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await()方法告訴 CyclicBarrier 我已經到達了屏障,然後當前線程被阻塞。

volatile原理知道嗎?

相⽐synchronized的加鎖⽅式來解決共享變量的內存可⻅性問題,volatile就是更輕量的選擇,他沒有上下⽂切換的額外開銷成本。使⽤volatile聲明的變量,可以確保值被更新的時候對其他線程⽴刻可⻅。

volatile使⽤內存屏障來保證不會發⽣指令重排,解決了內存可⻅性的問題。

我們知道,線程都是從主內存中讀取共享變量到⼯作內存來操作,完成之後再把結果寫會主內存,但是這樣就會帶來可⻅性問題。舉個例⼦,假設現在我們是兩級緩存的雙核CPU架構,包含L1、L2兩級緩存。

image-20210306150003231

那麼,如果X變量⽤volatile修飾的話,當線程A再次讀取變量X的話,CPU就會根據緩存⼀致性協議強制線程A重新從主內存加載最新的值到⾃⼰的⼯作內存,⽽不是直接⽤緩存中的值。

再來說內存屏障的問題,volatile修飾之後會加⼊不同的內存屏障來保證可⻅性的問題能正確執⾏。這⾥寫的屏障基於書中提供的內容,但是實際上由於CPU架構不同,重排序的策略不同,提供的內存屏障也不⼀樣,⽐如x86平台上,只有StoreLoad⼀種內存屏障。

  1. StoreStore屏障,保證上⾯的普通寫不和volatile寫發⽣重排序

  2. StoreLoad屏障,保證volatile寫與後⾯可能的volatile讀寫不發⽣重排序

  3. LoadLoad屏障,禁⽌volatile讀與後⾯的普通讀重排序

  4. LoadStore屏障,禁⽌volatile讀和後⾯的普通寫重排序

image-20210306150147271

面試官:說說你對Java內存模型(JMM)的理解,為什麼要用JMM

本身隨着CPU和內存的發展速度差異的問題,導致CPU的速度遠快於內存,所以現在的CPU加⼊了⾼速緩存,⾼速緩存⼀般可以分為L1、L2、L3三級緩存。基於上⾯的例⼦我們知道了這導致了緩存⼀致性的問題,所以加⼊了緩存⼀致性協議,同時導致了內存可⻅性的問題,⽽編譯器和CPU的重排序導致了原⼦性和有序性的問題,JMM內存模型正是對多線程操作下的⼀系列規範約束,通過JMM我們才屏蔽了不同硬件和操作系統內存的訪問差異,這樣保證了Java程序在不同的平台下達到⼀致的內存訪問效果,同時也是保證在⾼效並發的時候程序能夠正確執⾏。

image-20210306150619637

面試官:看你用到了線程池,能說說為什麼嗎

  1. 提高線程的利用率,降低資源的消耗。
  2. 提高響應速度,線程的創建時間為T1,執行時間T2,銷毀時間T3,用線程池可以免去T1和T3的時間。
  3. 便於統一管理線程對象
  4. 可控制最大並發數

面試官:能說一下線程池的核心參數嗎?

來看一ThreadPoolExecutor的構造方法:

public ThreadPoolExecutor(int corePoolSize,
                         int maximumPoolSize,
                         long keepAliveTime,
                         TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) 
  • 核⼼線程數corePoolSize :此值是用來初始化線程池中核心線程數,當線程池中線程池數< corePoolSize時,系統默認是添加一個任務才創建一個線程池。可以通過調用prestartAllCoreThreads方法一次性的啟動corePoolSize個數的線程。當線程數 = corePoolSize時,新任務會追加到workQueue中。

  • 允許的最大線程數maximumPoolSize:maximumPoolSize表示允許的最大線程數 = (非核心線程數+核心線程數),當BlockingQueue也滿了,但線程池中總線程數 < maximumPoolSize時候就會再次創建新的線程。

  • 活躍時間keepAliveTime:非核心線程 =(maximumPoolSize – corePoolSize ) ,非核心線程閑置下來不幹活最多存活時間。

  • 保持存活時間unit:線程池中非核心線程保持存活的時間

  • 等待隊列workQueue:線程池 等待隊列,維護着等待執行的Runnable對象。當運行當線程數= corePoolSize時,新的任務會被添加到workQueue中,如果workQueue也滿了則嘗試用非核心線程執行任務

  • 線程工廠 threadFactory:創建一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等等。

  • 拒絕策略RejectedExecutionHandler:corePoolSizeworkQueuemaximumPoolSize都不可用的時候執行的 飽和策略。

面試官:完整說一下線程池的工作流程

  1. 線程池剛創建時,裏面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列裏面有任務,線程池也不會馬上執行它們。

  2. 當調用 execute() 方法添加一個任務時,線程池會做如下判斷:

  • a) 如果正在運行的線程數量小於 corePoolSize,那麼馬上創建線程運行這個任務;

  • b) 如果正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列;

  • c) 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那麼還是要創建非核心線程立刻運行這個任務;

  • d) 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會根據拒絕策略來對應處理。

  1. 當一個線程完成任務時,它會從隊列中取下一個任務來執行。

  2. 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。所以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。

image-20210306152737204

面試官:拒絕策略有哪些

主要有4種拒絕策略:

  1. AbortPolicy:直接丟棄任務,拋出異常,這是默認策略

  2. CallerRunsPolicy:只⽤調⽤者所在的線程來處理任務

  3. DiscardOldestPolicy:丟棄等待隊列中最舊的任務,並執⾏當前任務

  4. DiscardPolicy:直接丟棄任務,也不拋出異常

面試官:說一下你的核心線程數是怎麼選的

線程在Java中屬於稀缺資源,線程池不是越大越好也不是越小越好。任務分為計算密集型、IO密集型、混合型。

  1. 計算密集型一般推薦線程池不要過大,一般是CPU數 + 1,+1是因為可能存在頁缺失(就是可能存在有些數據在硬盤中需要多來一個線程將數據讀入內存)。如果線程池數太大,可能會頻繁的 進行線程上下文切換跟任務調度。獲得當前CPU核心數代碼如下:
Runtime.getRuntime().availableProcessors();
  1. IO密集型:線程數適當大一點,機器的Cpu核心數*2。
  2. 混合型:如果密集型站大頭則拆分的必要性不大,如果IO型佔據不少有必要,Mark 下。

面試官:說一下有哪些常見阻塞隊列

  1. ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。

  2. LinkedBlockingQueue :由鏈表結構組成的有界阻塞隊列。

  3. PriorityBlockingQueue :支持優先級排序的無界阻塞隊列。

  4. DelayQueue:使用優先級隊列實現的無界阻塞隊列。

  5. SynchronousQueue:不存儲元素的阻塞隊列。

  6. LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。

  7. LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列

面試官:說一下有哪幾種常見的線程池吧

在上面我們直接用到了ThreadPoolExecutor的構造方法創建線程池,還有另一種方式,通過Executors 創建線程。

需要注意的是,阿里巴巴Java開發手冊強制禁止使用Executors創建線程

image-20210306153832125

比較典型常見的四種線程池包括:newFixedThreadPool newSingleThreadExecutornewCachedThreadPool

newScheduledThreadPool

FixedThreadPool

  1. 定長的線程池,有核心線程,核心線程的即為最大的線程數量,沒有非核心線程。

  2. 使用的無界的等待隊列是LinkedBlockingQueue。使用時候有堵滿等待隊列的風險。

image-20210306161851855

SingleThreadPool

只有一條線程來執行任務,適用於有順序的任務的應用場景,也是用的界等待隊列

image-20210306161915934

CachedThreadPool

可緩存的線程池,該線程池中沒有核心線程,非核心線程的數量為Integer.max_value,就是無限大,當有需要時創建線程來執行任務,沒有需要時回收線程,適用於耗時少,任務量大的情況。任務隊列用的是SynchronousQueue如果生產多快消費慢,則會導致創建很多線程需注意。

image-20210306161950237

ScheduledThreadPoolExecutor

周期性執行任務的線程池,按照某種特定的計劃執行線程中的任務,有核心線程,但也有非核心線程,非核心線程的大小也為無限大。適用於執行周期性的任務。

看構造函數:調用的還是ThreadPoolExecutor構造函數,區別不同點在於任務隊列是用的DelayedWorkQueue。

image-20210306162020463


  • 面試官:這些題都能回答出來,很好,小夥子,很有精神!
  • 老三:謝謝。那面試官老師,你看這一輪面試……
  • 面試官:雖然你答的很好,但你的項目數據量只有十萬級,不符合我們的要求。所以,面試不能讓你過。

老三上去就是一個左刺拳,再接一個右正蹬……

  • 面試官:啊……年輕人不講武德,來偷襲……

代碼地址://gitee.com/fighter3/thread-demo.git

好了,通過本文,相信你對多線程的應用和原理都有了一定的了解。文章開頭提到的crud仔就是博主本人了,技術水平有限,難免錯漏,歡迎指出,謝謝!

參考:

【1】:使用多線程查詢百萬條用戶數據將漢字轉化成拼音

【2】:講真 這次絕對讓你輕鬆學習線程池

【3】:SpringBoot學習筆記(十七:異步調用)

【4】:JavaGuide編著《JavaGuide面試突擊版》

【5】:艾小仙編著 《我想進大廠面試總結》

【6】:佚名編著 《Java核心知識點整理》

【7】:Java並發基礎知識,我用思維導圖整理好了

【8】:並發編程的鎖機制:synchronized和lock

【9】:詳解synchronized與Lock的區別與使用

【10】:bugstack小傅哥編著《Java面經手冊》