面試官問:「在項目中用過多線程嗎?」你就把這個案例講給他聽!
在面試當中,有時候會問到你在項目中用過多線程么?
對於普通的應屆生或者工作時間不長的初級開發 ???—— crud仔流下了沒有技術的眼淚。
博主這裡整理了項目中用到了多線程的一個簡單的實例,希望能對你有所啟發。
多線程開發實例
應用背景
應用的背景非常簡單,博主做的項目是一個審核類的項目,審核的數據需要推送給第三方監管系統,這只是一個很簡單的對接,但是存在一個問題。
我們需要推送的數據大概三十萬條,但是第三方監管提供的接口只支持單條推送(別問為什麼不支持批量,問就是沒討撕論比好過)。可以估算一下,三十萬條數據,一條數據按3秒算,大概需要250(為什麼恰好會是這個數)個小時。
所以就考慮到引入多線程來進行並發操作,降低數據推送的時間,提高數據推送的實時性。
設計要點
防止重複
我們推送給第三方的數據肯定是不能重複推送的,必須要有一個機制保證各個線程推送數據的隔離。
這裡有兩個思路:
-
- 將所有數據取到集合(內存)中,然後進行切割,每個線程推送不同段的數據
-
- 利用 數據庫分頁的方式,每個線程取 [start,limit] 區間的數據推送,我們需要保證start的一致性
這裡採用了第二種方式,因為考慮到可能數據量後續會繼續增加,把所有數據都加載到內存中,可能會有比較大的內存佔用。
失敗機制
我們還得考慮到線程推送數據失敗的情況。
如果是自己的系統,我們可以把多線程調用的方法抽出來加一個事務,一個線程異常,整體回滾。
但是是和第三方的對接,我們都沒法做事務的,所以,我們採用了直接在數據庫記錄失敗狀態的方法,可以在後面用其它方式處理失敗的數據。
線程池選擇
在實際使用中,我們肯定是要用到線程池來管理線程,關於線程池,我們常用 ThreadPoolExecutor提供的線程池服務,SpringBoot中同樣也提供了線程池異步的方式,雖然SprignBoot異步可能更方便一點,但是使用ThreadPoolExecutor更加直觀地控制線程池,所以我們直接使用ThreadPoolExecutor構造方法創建線程池。
大概的技術設計示意圖:
核心代碼
上面叭叭了一堆,到了show you code的環節了。我將項目里的代碼抽取出來,簡化出了一個示例。
核心代碼如下:
/**
* @Author 三分惡
* @Date 2021/3/5
* @Description
*/
@Service
public class PushProcessServiceImpl implements PushProcessService {
@Autowired
private PushUtil pushUtil;
@Autowired
private PushProcessMapper pushProcessMapper;
private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);
//每個線程每次查詢的條數
private static final Integer LIMIT = 5000;
//起的線程數
private static final Integer THREAD_NUM = 5;
//創建線程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
@Override
public void pushData() throws ExecutionException, InterruptedException {
//計數器,需要保證線程安全
int count = 0;
//未推送數據總數
Integer total = pushProcessMapper.countPushRecordsByState(0);
logger.info("未推送數據條數:{}", total);
//計算需要多少輪
int num = total / (LIMIT * THREAD_NUM) + 1;
logger.info("要經過的輪數:{}", num);
//統計總共推送成功的數據條數
int totalSuccessCount = 0;
for (int i = 0; i < num; i++) {
//接收線程返回結果
List<Future<Integer>> futureList = new ArrayList<>(32);
//起THREAD_NUM個線程並行查詢更新庫,加鎖
for (int j = 0; j < THREAD_NUM; j++) {
synchronized (PushProcessServiceImpl.class) {
int start = count * LIMIT;
count++;
//提交線程,用數據起始位置標識線程
Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
//先不取值,防止阻塞,放進集合
futureList.add(future);
}
}
//統計本輪推送成功數據
for (Future f : futureList) {
totalSuccessCount = totalSuccessCount + (int) f.get();
}
}
//更新推送標誌
pushProcessMapper.updateAllState(1);
logger.info("推送數據完成,需推送數據:{},推送成功:{}", total, totalSuccessCount);
}
/**
* 推送數據線程類
*/
class PushDataTask implements Callable<Integer> {
int start;
int limit;
int threadNo; //線程編號
PushDataTask(int start, int limit, int threadNo) {
this.start = start;
this.limit = limit;
this.threadNo = threadNo;
}
@Override
public Integer call() throws Exception {
int count = 0;
//推送的數據
List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);
if (CollectionUtils.isEmpty(pushProcessList)) {
return count;
}
logger.info("線程{}開始推送數據", threadNo);
for (PushProcess process : pushProcessList) {
boolean isSuccess = pushUtil.sendRecord(process);
if (isSuccess) { //推送成功
//更新推送標識
pushProcessMapper.updateFlagById(process.getId(), 1);
count++;
} else { //推送失敗
pushProcessMapper.updateFlagById(process.getId(), 2);
}
}
logger.info("線程{}推送成功{}條", threadNo, count);
return count;
}
}
}
代碼很長,我們簡單說一下關鍵的地方:
- 線程創建:線程內部類選擇了實現Callable接口,這樣方便獲取線程任務執行的結果,在示例里用於統計線程推送成功的數量
class PushDataTask implements Callable<Integer> {
- 使用 ThreadPoolExecutor 創建線程池,
//創建線程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
主要構造參數如下:
– corePoolSize:線程核心參數選擇了5
– maximumPoolSize:最大線程數選擇了核心線程數2倍數
– keepAliveTime:非核心閑置線程存活時間直接置為0
– unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒
– workQueue:線程池等待隊列,使用 容量初始為100的 LinkedBlockingQueue阻塞隊列
這裡還有沒寫出來的線程池拒絕策略,採用了默認AbortPolicy:直接丟棄任務,拋出異常。
- 使用 synchronized 來保證線程安全,保證計數器的增加是有序的
synchronized (PushProcessServiceImpl.class) {
- 使用集合來接收線程的運行結果,防止阻塞
List<Future<Integer>> futureList = new ArrayList<>(32);
好了,主要的代碼和簡單的解析就到這裡了。
關於這個簡單的demo,這裡只是簡單地做推送數據處理。考慮一下,這個實例是不是可以用在你項目的某些地方。例如監管系統的數據校驗、審計系統的數據統計、電商系統的數據分析等等,只要是有大量數據處理的地方,都可以把這個例子結合到你的項目里,這樣你就有了多線程開發的經驗。
完整代碼倉庫地址在文章底部👇👇
對線面試官
- 面試官:小夥子,不錯,你這個整挺好。
- 老三:那是自然。
- 面試官:呦,小夥子,挺自信,那我得好好考考你。
- 老三:放馬過來,但考無妨。
面試官:先從最簡單的開始,說說什麼是線程吧
要說線程,必先說進程。
進程是程序的⼀次執⾏過程,是系統運⾏程序的基本單位,因此進程是動態的。系統運⾏⼀個程序即是⼀個進程從創建,運⾏到消亡的過程。
線程與進程相似,但線程是⼀個⽐進程更⼩的執⾏單位。⼀個進程在其執⾏的過程中可以產⽣多個線程。與進程不同的是同類的多個線程共享進程的堆和⽅法區資源,但每個線程有⾃⼰的程序計數器、虛擬機棧和本地⽅法棧,所以系統在產⽣⼀個線程,或是在各個線程之間作切換⼯作時,負擔要⽐進程⼩得多,也正因為如此,線程也被稱為輕量級進程。
面試官:說說Java里怎麼創建線程吧
Java里創建線程主要有三種方式:
-
繼承 Thread類:Thread 類本質上是實現了 Runnable 接口的一個實例,代表一個線程的實例。啟動線程的唯一方法就是通過 Thread 類的 start()實例方法。start()方法是一個 native 方法,它將啟動一個新線程,並執行 run()方法。
-
實現 Runnable接口:如果自己的類已經 extends 另一個類,就無法直接 extends Thread,此時,可以實現一個Runnable 接口。
-
實現 Callable接口:實現Callable接口,重寫call()方法,可以返回一個 Future類型的返回值。我在上面的例子里就是用到了這種方式。
面試官:說說線程的生命周期和狀態
在Java中,線程共有六種狀態:
狀態 | 說明 |
---|---|
NEW | 初始狀態:線程被創建,但還沒有調用start()方法 |
RUNNABLE | 運行狀態:Java線程將操作系統中的就緒和運行兩種狀態籠統的稱作「運行」 |
BLOCKED | 阻塞狀態:表示線程阻塞於鎖 |
WAITING | 等待狀態:表示線程進入等待狀態,進入該狀態表示當前線程需要等待其他線程做出一些特定動作(通知或中斷) |
TIME_WAITING | 超時等待狀態:該狀態不同於 WAITIND,它是可以在指定的時間自行返回的 |
TERMINATED | 終止狀態:表示當前線程已經執行完畢 |
線程在自身的生命周期中, 並不是固定地處於某個狀態,而是隨着代碼的執行在不同的狀態之間進行切換,Java線程狀態變化如圖示:
面試官:我看你提到了線程阻塞,那你再說說線程死鎖吧
線程死鎖描述的是這樣⼀種情況:多個線程同時被阻塞,它們中的⼀個或者全部都在等待某個資源被釋放。由於線程被⽆限期地阻塞,因此程序不可能正常終⽌。
如下圖所示,線程 A 持有資源 2,線程 B 持有資源 1,他們同時都想申請對⽅的資源,所以這兩個線程就會互相等待⽽進⼊死鎖狀態。
產生死鎖必須滿足四個條件:
-
互斥條件:該資源任意⼀個時刻只由⼀個線程占⽤。
-
請求與保持條件:⼀個進程因請求資源⽽阻塞時,對已獲得的資源保持不放。
-
不剝奪條件:線程已獲得的資源在末使⽤完之前不能被其他線程強⾏剝奪,只有⾃⼰使⽤完畢後才釋放資源。
-
循環等待條件:若⼲進程之間形成⼀種頭尾相接的循環等待資源關係。
面試官:怎麼避免死鎖呢?
我上⾯說了產⽣死鎖的四個必要條件,為了避免死鎖,我們只要破壞產⽣死鎖的四個條件中的其中⼀個就可以了。
-
破壞互斥條件 :這個條件我們沒有辦法破壞,因為我們⽤鎖本來就是想讓他們互斥的(臨界資源需要互斥訪問)。
-
破壞請求與保持條件 :⼀次性申請所有的資源。
-
破壞不剝奪條件 :占⽤部分資源的線程進⼀步申請其他資源時,如果申請不到,可以主動釋放它佔有的資源。
-
破壞循環等待條件 :靠按序申請資源來預防。按某⼀順序申請資源,釋放資源則反序釋放。破壞循環等待條件。
面試官:我看你的例子里用到了synchronized,說說 synchronized的用法吧
synchronized 關鍵字最主要的三種使⽤⽅式:
1.修飾實例⽅法: 作⽤於當前對象實例加鎖,進⼊同步代碼前要獲得 當前對象實例的鎖
synchronized void method() {
//業務代碼
}
2.修飾靜態⽅法: 也就是給當前類加鎖,會作⽤於類的所有對象實例 ,進⼊同步代碼前要獲得當前 class 的鎖。因為靜態成員不屬於任何⼀個實例對象,是類成員( static 表明這是該類的⼀個靜態資源,不管 new 了多少個對象,只有⼀份)。所以,如果⼀個線程 A 調⽤⼀個實例對象的⾮靜態 synchronized ⽅法,⽽線程 B 需要調⽤這個實例對象所屬類的靜態 synchronized ⽅法,是允許的,不會發⽣互斥現象,因為訪問靜態 synchronized ⽅法佔⽤的鎖是當前類的鎖,⽽訪問⾮靜態 synchronized ⽅法佔⽤的鎖是當前實例對象鎖。
synchronized void staic method() {
//業務代碼
}
3.修飾代碼塊 :指定加鎖對象,對給定對象/類加鎖。 synchronized(this|object) 表示進⼊同步代碼庫前要獲得給定對象的鎖。 synchronized(類.class) 表示進⼊同步代碼前要獲得 當前 class 的鎖
synchronized(this) {
//業務代碼
}
在我的例子里使用synchronized修飾代碼塊,給PushProcessServiceImpl類加鎖,進⼊同步代碼前要獲得 當前 class 的鎖,防止PushProcessServiceImpl類的對象在控制層調用推送數據的方法。
面試官:除了使用synchronized,還有什麼辦法來加鎖嗎?詳細說一下
可以使用juc包提供的鎖。Lock接口主要相關的類和接口如下。
Lock中的主要方法:
- lock:用來獲取鎖,如果鎖被其他線程獲取,進入等待狀態。
- lockInterruptibly:通過這個方法去獲取鎖時,如果線程正在等待獲取鎖,則這個線程能夠響應中斷,即中斷線程的等待狀態。
- tryLock:tryLock方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false。
- tryLock(long,TimeUnit):與tryLock類似,只不過是有等待時間,在等待時間內獲取到鎖返回true,超時返回false。
- unlock:釋放鎖。
其它接口和類:
- ReetrantLock(可重入鎖):實現了Lock接口,可重入鎖,內部定義了公平鎖與非公平鎖。可以完成synchronized 所能完成的所有工作。
- ReadWriteLock(讀寫鎖):
public interface ReadWriteLock {
Lock readLock(); //獲取讀鎖
Lock writeLock(); //獲取寫鎖
}
一個用來獲取讀鎖,一個用來獲取寫鎖。也就是說將文件的讀寫操作分開,分成2個鎖來分配給線程,從而使得多個線程可以同時進行讀操作。
- ReetrantReadWriteLock(可重入讀寫鎖):ReetrantReadWriteLock同樣支持公平性選擇,支持重進入,鎖降級。
面試官:說說synchronized和Lock的區別
類別 | synchronized | Lock |
---|---|---|
存在層次 | Java的關鍵字,在jvm層面上 | 是一個接口,api級別 |
鎖的釋放 | 1、以獲取鎖的線程執行完同步代碼,釋放鎖 2、線程執行發生異常,jvm會讓線程釋放鎖 | 在finally中必須釋放鎖,不然容易造成線程死鎖 |
鎖的獲取 | 假設A線程獲得鎖,B線程等待。如果A線程阻塞,B線程會一直等待 | 分情況而定,Lock有多個鎖獲取的方式,具體下面會說道,大致就是可以嘗試獲得鎖,線程可以不用一直等待 |
鎖狀態 | 無法判斷 | 可以判斷 |
鎖類型 | 可重入 不可中斷 非公平 | 可重入 可判斷 可公平(兩者皆可) |
性能 | 少量同步 | 大量同步 |
面試官:你提到了synchronized基於jvm層面,對這個有了解嗎?
synchronized是利用java提供的原⼦性內置鎖(monitor 對象),每個對象中都內置了⼀個 ObjectMonitor 對象。這種內置的並且使⽤者看不到的鎖也被稱為監視器鎖。
同步語句塊
synchronized 同步語句塊的實現使⽤的是 monitorenter 和 monitorexit 指令,其中monitorenter 指令指向同步代碼塊的開始位置monitorexit 指令則指明同步代碼塊的結束位置。
執⾏monitorenter指令時會嘗試獲取內置鎖,如果對象沒有被鎖定或者已經獲得了鎖,鎖的計數器+1。此時其他競爭鎖的線程則會進⼊等待隊列中。
執⾏monitorexit指令時則會把計數器-1,當計數器值為0時,則鎖釋放,處於等待隊列中的線程再繼續競爭鎖。
synchronized 修飾⽅法
synchronized 修飾的⽅法並沒有 monitorenter 指令和 monitorexit 指令,取得代之的確實是ACC_SYNCHRONIZED 標識,該標識指明了該⽅法是⼀個同步⽅法。JVM 通過該ACC_SYNCHRONIZED 訪問標誌來辨別⼀個⽅法是否聲明為同步⽅法,從⽽執⾏相應的同步調⽤。
當然,二者細節略有不同,但本質上都是獲取原子性內置鎖。
再深入一點,synchronized實際上有兩個隊列waitSet和entryList。
-
當多個線程進⼊同步代碼塊時,⾸先進⼊entryList
-
有⼀個線程獲取到monitor鎖後,就賦值給當前線程,並且計數器+1
-
如果線程調⽤wait⽅法,將釋放鎖,當前線程置為null,計數器-1,同時進⼊waitSet等待被喚醒,調⽤notify或者notifyAll之後⼜會進⼊entryList競爭鎖
-
如果線程執⾏完畢,同樣釋放鎖,計數器-1,當前線程置為null
synchronized的優化能說一說嗎?
從JDK1.6版本之後,synchronized本身也在不斷優化鎖的機制,有些情況下他並不會是⼀個很重量級的鎖。優化機制包括⾃適應鎖、⾃旋鎖、鎖消除、鎖粗化、偏向鎖、輕量級鎖。
鎖的狀態從低到⾼依次為⽆鎖->偏向鎖->輕量級鎖->重量級鎖,升級的過程就是從低到⾼。
自旋鎖:由於⼤部分時候,鎖被占⽤的時間很短,共享變量的鎖定時間也很短,所有沒有必要掛起線程,⽤戶態和內核態的來回上下⽂切換嚴重影響性能。⾃旋的概念就是讓線程執⾏⼀個忙循環,可以理解為就是啥也不⼲,防⽌從⽤戶態轉⼊內核態,⾃旋鎖可以通過設置-XX:+UseSpining來開啟,⾃旋的默認次數是10次,可以使⽤-XX:PreBlockSpin設置。
自適應鎖:自適應鎖就是自適應的自旋鎖,自旋鎖的時間不是固定時間,而是由前⼀次在同⼀個鎖上的⾃旋時間和鎖的持有者狀態來決定。
鎖消除:鎖消除指的是JVM檢測到⼀些同步的代碼塊,完全不存在數據競爭的場景,也就是不需要加鎖,就會進⾏鎖消除。
鎖粗化:鎖粗化指的是有很多操作都是對同⼀個對象進⾏加鎖,就會把鎖的同步範圍擴展到整個操作序列之外。
偏向鎖:當線程訪問同步塊獲取鎖時,會在對象頭和棧幀中的鎖記錄⾥存儲偏向鎖的線程ID,之後這個線程再次進⼊同步塊時都不需要CAS來加鎖和解鎖了,偏向鎖會永遠偏向第⼀個獲得鎖的線程,如果後續沒有其他線程獲得過這個鎖,持有鎖的線程就永遠不需要進⾏同步,反之,當有其他線程競爭偏向鎖時,持有偏向鎖的線程就會釋放偏向鎖。可以⽤過設置-XX:+UseBiasedLocking開啟偏向鎖。
輕量級鎖:JVM的對象的對象頭中包含有⼀些鎖的標誌位,代碼進⼊同步塊的時候,JVM將會使⽤CAS⽅式來嘗試獲取鎖,如果更新成功則會把對象頭中的狀態位標記為輕量級鎖,如果更新失敗,當前線程就嘗試⾃旋來獲得鎖。
鎖升級的過程非常複雜,簡單點說,偏向鎖就是通過對象頭的偏向線程ID來對⽐,甚⾄都不需要CAS了,⽽輕量級鎖主要就是通過CAS修改對象頭鎖記錄和⾃旋來實現,重量級鎖則是除了擁有鎖的線程其他全部阻塞。
面試官:說一下CAS
CAS(Compare And Swap/Set)比較並交換,CAS 算法的過程是這樣:它包含 3 個參數CAS(V,E,N)。V 表示要更新的變量(內存值),E 表示預期值(舊的),N 表示新值。當且僅當 V 值等於 E 值時,才會將 V 的值設為 N,如果 V 值和 E 值不同,則說明已經有其他線程做了更新,則當前線程什麼都不做。最後,CAS 返回當前 V 的真實值。
CAS是一種樂觀鎖,它總是認為自己可以成功完成操作。當多個線程同時使用 CAS 操作一個變量時,只有一個會勝出,並成功更新,其餘均會失敗。失敗的線程不會被掛起,僅是被告知失敗,並且允許再次嘗試,當然也允許失敗的線程放棄操作。基於這樣的原理,CAS 操作即使沒有鎖,也可以發現其他線程對當前線程的干擾,並進行恰當的處理。
java.util.concurrent.atomic 包下的類大多是使用 CAS 操作來實現的 (AtomicInteger,AtomicBoolean,AtomicLong)。
面試官:CAS會導致什麼問題?
- ABA 問題:
比如說一個線程 one 從內存位置 V 中取出 A,這時候另一個線程 two 也從內存中取出 A,並且 two 進行了一些操作變成了 B,然後 two 又將 V 位置的數據變成 A,這時候線程 one 進行 CAS 操作發現內存中仍然是 A,然後 one 操作成功。儘管線程 one 的 CAS 操作成功,但可能存在潛藏的問題。從 Java1.5 開始 JDK 的 atomic 包里提供了一個類 AtomicStampedReference 來解決 ABA 問題。
- 循環時間長開銷大:
對於資源競爭嚴重(線程衝突嚴重)的情況,CAS 自旋的概率會比較大,從而浪費更多的 CPU 資源,效率低於 synchronized。
- 只能保證一個共享變量的原子操作:
當對一個共享變量執行操作時,我們可以使用循環 CAS 的方式來保證原子操作,但是對多個共享變量操作時,循環 CAS 就無法保證操作的原子性,這個時候就可以用鎖。
面試官:能說一下說下ReentrantLock原理嗎
ReentrantLock 是基於 Lock 實現的可重入鎖,所有的 Lock 都是基於 AQS 實現的,AQS 和 Condition 各自維護不同的對象,在使用 Lock 和 Condition 時,其實就是兩個隊列的互相移動。它所提供的共享鎖、互斥鎖都是基於對 state 的操作。
面試官:能說一下AQS嗎
AbstractQueuedSynchronizer,抽象的隊列式的同步器,AQS 定義了一套多線程訪問共享資源的同步器框架,許多同步類實現都依賴於它,如常用的
ReentrantLock/Semaphore/CountDownLatch。
AQS 核⼼思想是,如果被請求的共享資源空閑,則將當前請求資源的線程設置為有效的⼯作線程,並且將共享資源設置為鎖定狀態。如果被請求的共享資源被占⽤,那麼就需要⼀套線程阻塞等待以及被喚醒時鎖分配的機制,這個機制 AQS 是⽤ CLH 隊列鎖實現的,即將暫時獲取不到鎖的線程加⼊到隊列中。
看個 AQS原理圖:
AQS 使⽤⼀個 int 成員變量來表示同步狀態,通過內置的 FIFO 隊列來完成獲取資源線程的排隊⼯作。AQS 使⽤ CAS 對該同步狀態進⾏原⼦操作實現對其值的修改。
private volatile int state;//共享變量,使⽤volatile修飾保證線程可⻅性
狀態信息通過 protected 類型的 getState,setState,compareAndSetState 進⾏操作
//返回同步狀態的當前值
protected final int getState() {
return state; }
// 設置同步狀態的值
protected final void setState(int newState) {
state = newState; }
//原⼦地(CAS操作)將同步狀態值設置為給定值update如果當前同步狀態的值等於expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
嘗試加鎖的時候通過CAS(CompareAndSwap)修改值,如果成功設置為1,並且把當前線程ID賦值,則代表加鎖成功,⼀旦獲取到鎖,其他的線程將會被阻塞進⼊阻塞隊列⾃旋,獲得鎖的線程釋放鎖的時候將會喚醒阻塞隊列中的線程,釋放鎖的時候則會把state重新置為0,同時當前線程ID置為空。
面試官:能說一下Semaphore/CountDownLatch/CyclicBarrier嗎
- Semaphore(信號量)-允許多個線程同時訪問: synchronized 和 ReentrantLock 都是一次只允許一個線程訪問某個資源,Semaphore(信號量)可以指定多個線程同時訪問某個資源。
- CountDownLatch(倒計時器): CountDownLatch是一個同步工具類,用來協調多個線程之間的同步。這個工具通常用來控制線程等待,它可以讓某一個線程等待直到倒計時結束,再開始執行。
- CyclicBarrier(循環柵欄): CyclicBarrier 和 CountDownLatch 非常類似,它也可以實現線程間的技術等待,但是它的功能比 CountDownLatch 更加複雜和強大。主要應用場景和 CountDownLatch 類似。CyclicBarrier 的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續幹活。CyclicBarrier默認的構造方法是 CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await()方法告訴 CyclicBarrier 我已經到達了屏障,然後當前線程被阻塞。
volatile原理知道嗎?
相⽐synchronized的加鎖⽅式來解決共享變量的內存可⻅性問題,volatile就是更輕量的選擇,他沒有上下⽂切換的額外開銷成本。使⽤volatile聲明的變量,可以確保值被更新的時候對其他線程⽴刻可⻅。
volatile使⽤內存屏障來保證不會發⽣指令重排,解決了內存可⻅性的問題。
我們知道,線程都是從主內存中讀取共享變量到⼯作內存來操作,完成之後再把結果寫會主內存,但是這樣就會帶來可⻅性問題。舉個例⼦,假設現在我們是兩級緩存的雙核CPU架構,包含L1、L2兩級緩存。
那麼,如果X變量⽤volatile修飾的話,當線程A再次讀取變量X的話,CPU就會根據緩存⼀致性協議強制線程A重新從主內存加載最新的值到⾃⼰的⼯作內存,⽽不是直接⽤緩存中的值。
再來說內存屏障的問題,volatile修飾之後會加⼊不同的內存屏障來保證可⻅性的問題能正確執⾏。這⾥寫的屏障基於書中提供的內容,但是實際上由於CPU架構不同,重排序的策略不同,提供的內存屏障也不⼀樣,⽐如x86平台上,只有StoreLoad⼀種內存屏障。
-
StoreStore屏障,保證上⾯的普通寫不和volatile寫發⽣重排序
-
StoreLoad屏障,保證volatile寫與後⾯可能的volatile讀寫不發⽣重排序
-
LoadLoad屏障,禁⽌volatile讀與後⾯的普通讀重排序
-
LoadStore屏障,禁⽌volatile讀和後⾯的普通寫重排序
面試官:說說你對Java內存模型(JMM)的理解,為什麼要用JMM
本身隨着CPU和內存的發展速度差異的問題,導致CPU的速度遠快於內存,所以現在的CPU加⼊了⾼速緩存,⾼速緩存⼀般可以分為L1、L2、L3三級緩存。基於上⾯的例⼦我們知道了這導致了緩存⼀致性的問題,所以加⼊了緩存⼀致性協議,同時導致了內存可⻅性的問題,⽽編譯器和CPU的重排序導致了原⼦性和有序性的問題,JMM內存模型正是對多線程操作下的⼀系列規範約束,通過JMM我們才屏蔽了不同硬件和操作系統內存的訪問差異,這樣保證了Java程序在不同的平台下達到⼀致的內存訪問效果,同時也是保證在⾼效並發的時候程序能夠正確執⾏。
面試官:看你用到了線程池,能說說為什麼嗎
- 提高線程的利用率,降低資源的消耗。
- 提高響應速度,線程的創建時間為T1,執行時間T2,銷毀時間T3,用線程池可以免去T1和T3的時間。
- 便於統一管理線程對象
- 可控制最大並發數
面試官:能說一下線程池的核心參數嗎?
來看一ThreadPoolExecutor的構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
核⼼線程數corePoolSize :此值是用來初始化線程池中核心線程數,當線程池中線程池數<
corePoolSize
時,系統默認是添加一個任務才創建一個線程池。可以通過調用prestartAllCoreThreads
方法一次性的啟動corePoolSize
個數的線程。當線程數 = corePoolSize時,新任務會追加到workQueue中。 -
允許的最大線程數maximumPoolSize:
maximumPoolSize
表示允許的最大線程數 = (非核心線程數+核心線程數),當BlockingQueue
也滿了,但線程池中總線程數 <maximumPoolSize
時候就會再次創建新的線程。 -
活躍時間keepAliveTime:非核心線程 =(maximumPoolSize – corePoolSize ) ,非核心線程閑置下來不幹活最多存活時間。
-
保持存活時間unit:線程池中非核心線程保持存活的時間
-
等待隊列workQueue:線程池 等待隊列,維護着等待執行的
Runnable
對象。當運行當線程數= corePoolSize時,新的任務會被添加到workQueue
中,如果workQueue
也滿了則嘗試用非核心線程執行任務 -
線程工廠 threadFactory:創建一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等等。
-
拒絕策略RejectedExecutionHandler:
corePoolSize
、workQueue
、maximumPoolSize
都不可用的時候執行的 飽和策略。
面試官:完整說一下線程池的工作流程
-
線程池剛創建時,裏面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列裏面有任務,線程池也不會馬上執行它們。
-
當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
-
a) 如果正在運行的線程數量小於 corePoolSize,那麼馬上創建線程運行這個任務;
-
b) 如果正在運行的線程數量大於或等於 corePoolSize,那麼將這個任務放入隊列;
-
c) 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那麼還是要創建非核心線程立刻運行這個任務;
-
d) 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那麼線程池會根據拒絕策略來對應處理。
-
當一個線程完成任務時,它會從隊列中取下一個任務來執行。
-
當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那麼這個線程就被停掉。所以線程池的所有任務完成後,它最終會收縮到 corePoolSize 的大小。
面試官:拒絕策略有哪些
主要有4種拒絕策略:
-
AbortPolicy:直接丟棄任務,拋出異常,這是默認策略
-
CallerRunsPolicy:只⽤調⽤者所在的線程來處理任務
-
DiscardOldestPolicy:丟棄等待隊列中最舊的任務,並執⾏當前任務
-
DiscardPolicy:直接丟棄任務,也不拋出異常
面試官:說一下你的核心線程數是怎麼選的
線程在Java中屬於稀缺資源,線程池不是越大越好也不是越小越好。任務分為計算密集型、IO密集型、混合型。
- 計算密集型一般推薦線程池不要過大,一般是CPU數 + 1,+1是因為可能存在頁缺失(就是可能存在有些數據在硬盤中需要多來一個線程將數據讀入內存)。如果線程池數太大,可能會頻繁的 進行線程上下文切換跟任務調度。獲得當前CPU核心數代碼如下:
Runtime.getRuntime().availableProcessors();
- IO密集型:線程數適當大一點,機器的Cpu核心數*2。
- 混合型:如果密集型站大頭則拆分的必要性不大,如果IO型佔據不少有必要,Mark 下。
面試官:說一下有哪些常見阻塞隊列
-
ArrayBlockingQueue :由數組結構組成的有界阻塞隊列。
-
LinkedBlockingQueue :由鏈表結構組成的有界阻塞隊列。
-
PriorityBlockingQueue :支持優先級排序的無界阻塞隊列。
-
DelayQueue:使用優先級隊列實現的無界阻塞隊列。
-
SynchronousQueue:不存儲元素的阻塞隊列。
-
LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列。
-
LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列
面試官:說一下有哪幾種常見的線程池吧
在上面我們直接用到了ThreadPoolExecutor的構造方法創建線程池,還有另一種方式,通過Executors 創建線程。
需要注意的是,阿里巴巴Java開發手冊強制禁止使用Executors創建線程
比較典型常見的四種線程池包括:newFixedThreadPool
、 newSingleThreadExecutor
、 newCachedThreadPool
、
newScheduledThreadPool
。
FixedThreadPool
-
定長的線程池,有核心線程,核心線程的即為最大的線程數量,沒有非核心線程。
-
使用的無界的等待隊列是
LinkedBlockingQueue
。使用時候有堵滿等待隊列的風險。
SingleThreadPool
只有一條線程來執行任務,適用於有順序的任務的應用場景,也是用的無界等待隊列
CachedThreadPool
可緩存的線程池,該線程池中沒有核心線程,非核心線程的數量為Integer.max_value,就是無限大,當有需要時創建線程來執行任務,沒有需要時回收線程,適用於耗時少,任務量大的情況。任務隊列用的是SynchronousQueue如果生產多快消費慢,則會導致創建很多線程需注意。
ScheduledThreadPoolExecutor
周期性執行任務的線程池,按照某種特定的計劃執行線程中的任務,有核心線程,但也有非核心線程,非核心線程的大小也為無限大。適用於執行周期性的任務。
看構造函數:調用的還是ThreadPoolExecutor
構造函數,區別不同點在於任務隊列是用的DelayedWorkQueue。
- 面試官:這些題都能回答出來,很好,小夥子,很有精神!
- 老三:謝謝。那面試官老師,你看這一輪面試……
- 面試官:雖然你答的很好,但你的項目數據量只有十萬級,不符合我們的要求。所以,面試不能讓你過。
老三上去就是一個左刺拳,再接一個右正蹬……
- 面試官:啊……年輕人不講武德,來偷襲……
代碼地址://gitee.com/fighter3/thread-demo.git
好了,通過本文,相信你對多線程的應用和原理都有了一定的了解。文章開頭提到的crud仔就是博主本人了,技術水平有限,難免錯漏,歡迎指出,謝謝!
參考:
【2】:講真 這次絕對讓你輕鬆學習線程池
【4】:JavaGuide編著《JavaGuide面試突擊版》
【5】:艾小仙編著 《我想進大廠面試總結》
【6】:佚名編著 《Java核心知識點整理》
【8】:並發編程的鎖機制:synchronized和lock
【10】:bugstack小傅哥編著《Java面經手冊》