Java並發知識總結,超詳細!
首先給大家分享一個github倉庫,上面放了200多本經典的電腦書籍,包括C語言、C++、Java、Python、前端、資料庫、作業系統、電腦網路、數據結構和演算法、機器學習、編程人生等,可以star一下,下次找書直接在上面搜索,倉庫持續更新中~
github地址://github.com/Tyson0314/java-books
如果github訪問不了,可以訪問gitee倉庫。
gitee地址://gitee.com/tysondai/java-books
執行緒池
使用執行緒池的好處:
- 降低資源消耗。通過重複利用已創建的執行緒降低執行緒創建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,任務可以不需要的等到執行緒創建就能立即執行。
- 提高執行緒的可管理性。統一管理執行緒,避免系統創建大量同類執行緒而導致消耗完記憶體。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
執行緒池原理
創建新的執行緒需要獲取全局鎖,通過這種設計可以盡量避免獲取全局鎖,當 ThreadPoolExecutor 完成預熱之後(當前運行的執行緒數大於等於 corePoolSize),提交的大部分任務都會被放到 BlockingQueue。
ThreadPoolExecutor 的通用構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
-
corePoolSize:當有新任務時,如果執行緒池中執行緒數沒有達到執行緒池的基本大小,則會創建新的執行緒執行任務,否則將任務放入阻塞隊列。當執行緒池中存活的執行緒數總是大於 corePoolSize 時,應該考慮調大 corePoolSize。
-
maximumPoolSize:當阻塞隊列填滿時,如果執行緒池中執行緒數沒有超過最大執行緒數,則會創建新的執行緒運行任務。否則根據拒絕策略處理新任務。非核心執行緒類似於臨時借來的資源,這些執行緒在空閑時間超過 keepAliveTime 之後,就應該退出,避免資源浪費。
-
BlockingQueue:存儲等待運行的任務。
-
keepAliveTime:非核心執行緒空閑後,保持存活的時間,此參數只對非核心執行緒有效。設置為0,表示多餘的空閑執行緒會被立即終止。
-
TimeUnit:時間單位
TimeUnit.DAYS TimeUnit.HOURS TimeUnit.MINUTES TimeUnit.SECONDS TimeUnit.MILLISECONDS TimeUnit.MICROSECONDS TimeUnit.NANOSECONDS
-
ThreadFactory:每當執行緒池創建一個新的執行緒時,都是通過執行緒工廠方法來完成的。在 ThreadFactory 中只定義了一個方法 newThread,每當執行緒池需要創建新執行緒就會調用它。
public class MyThreadFactory implements ThreadFactory { private final String poolName; public MyThreadFactory(String poolName) { this.poolName = poolName; } public Thread newThread(Runnable runnable) { return new MyAppThread(runnable, poolName);//將執行緒池名字傳遞給構造函數,用於區分不同執行緒池的執行緒 } }
-
RejectedExecutionHandler:當隊列和執行緒池都滿了時,根據拒絕策略處理新任務。
AbortPolicy:默認的策略,直接拋出RejectedExecutionException DiscardPolicy:不處理,直接丟棄 DiscardOldestPolicy:將等待隊列隊首的任務丟棄,並執行當前任務 CallerRunsPolicy:由調用執行緒處理該任務
執行緒池大小
如果執行緒池執行緒數量太小,當有大量請求需要處理,系統響應比較慢影響體驗,甚至會出現任務隊列大量堆積任務導致OOM。
如果執行緒池執行緒數量過大,大量執行緒可能會同時在爭取 CPU 資源,這樣會導致大量的上下文切換(cpu給執行緒分配時間片,當執行緒的cpu時間片用完後保存狀態,以便下次繼續運行),從而增加執行緒的執行時間,影響了整體執行效率。
CPU 密集型任務(N+1): 這種任務消耗的主要是 CPU 資源,可以將執行緒數設置為 N(CPU 核心數)+1,比 CPU 核心數多出來的一個執行緒是為了防止某些原因導致的任務暫停(執行緒阻塞,如io操作,等待鎖,執行緒sleep)而帶來的影響。一旦某個執行緒被阻塞,釋放了cpu資源,而在這種情況下多出來的一個執行緒就可以充分利用 CPU 的空閑時間。
I/O 密集型任務(2N): 系統會用大部分的時間來處理 I/O 操作,而執行緒等待 I/O 操作會被阻塞,釋放 cpu資源,這時就可以將 CPU 交出給其它執行緒使用。因此在 I/O 密集型任務的應用中,我們可以多配置一些執行緒,具體的計算方法:最佳執行緒數 = CPU核心數 * (1/CPU利用率) = CPU核心數 * (1 + (I/O耗時/CPU耗時)),一般可設置為2N。
關閉執行緒池
shutdown():
將執行緒池狀態置為SHUTDOWN
,並不會立即停止:
- 停止接收外部提交的任務
- 內部正在跑的任務和隊列里等待的任務,會執行完
- 等到第二步完成後,才真正停止
shutdownNow():
將執行緒池狀態置為STOP
。企圖立即停止,事實上不一定:
- 跟shutdown()一樣,先停止接收外部提交的任務
- 忽略隊列里等待的任務
- 嘗試將正在跑的任務中斷(不一定中斷成功,取決於任務響應中斷的邏輯)
- 返回未執行的任務列表
executor框架
1.5後引入的Executor框架的最大優點是把任務的提交和執行解耦。當提交一個Callable對象給ExecutorService,將得到一個Future對象,調用Future對象的get方法等待執行結果。Executor框架的內部使用了執行緒池機制,它在java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化並發編程的操作。
簡介
executor框架由3部分組成:任務、任務的執行、非同步計算的結果
- 任務。需要實現的介面:Runnable和Callable介面。
- 任務的執行。ExecutorService 是一個介面,用於定義執行緒池,調用它的 execute(Runnable)或者 submit(Runnable/Callable)執行任務。ExecutorService介面繼承於Executor,有兩個實現類
ThreadPoolExecutor
和ScheduledThreadPoolExecutor
。 - 非同步計算的結果。包括future介面和實現future介面的FutureTask,調用future.get()會阻塞當前執行緒直到任務完成,future.cancel()可以取消執行任務。
ThreadPoolExecutor實例
使用 ThreadPoolExecutor
構造函數自定義參數的方式來創建執行緒池。
public class ThreadPoolExecutorDemo {
private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 10; i++) {
Callable worker = () -> {
System.out.println(Thread.currentThread().getName());
return "ok";
};
Future<String> f = executor.submit(worker);
f.get();
}
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}
Runnable和Callable的區別
Runnable 任務執行後不能返回值或者拋出異常。Callable 任務執行後可以返回值或拋出異常。
Executors.callable(Runnable task);//runnable轉化為callable
ExecutorService.execute(Runnable);
ExecutorService.submit(Runnable/Callable);//submit callable任務有返回值
//返回值是泛型參數V
public interface Callable<V> {
V call() throws Exception;
}
Future和FutureTask
Future 可以獲取任務執行的結果、取消任務。調用 future.get()會阻塞當前執行緒直到任務返回結果。
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask 實現了 RunnableFuture 介面,而 RunnableFuture 實現了 Runnable 和 Future<V> 介面。
execute()和submit()
execute()
方法用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功與否。
submit()
方法用於提交需要返回值的任務。執行緒池會返回一個 Future
類型的對象,通過這個 Future
對象可以判斷任務是否執行成功,並且可以通過 Future
的 get()
方法來獲取返回值,get()
方法會阻塞當前執行緒直到任務完成,而使用 get(long timeout, TimeUnit unit)
方法則會阻塞當前執行緒一段時間後立即返回,無論任務是否執行完。
常用的執行緒池
常見的執行緒池有 FixedThreadPool、SingleThreadExecutor、CachedThreadPool 和 ScheduledThreadPool。這幾個都是 ExecutorService (執行緒池)實例。
FixedThreadPool
固定執行緒數的執行緒池。任何時間點,最多只有 nThreads 個執行緒處於活動狀態執行任務。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
使用無界隊列 LinkedBlockingQueue(隊列容量為 Integer.MAX_VALUE),運行中的執行緒池不會拒絕任務,即不會調用RejectedExecutionHandler.rejectedExecution()方法。
maxThreadPoolSize 是無效參數,故將它的值設置為與 coreThreadPoolSize 一致。
keepAliveTime 也是無效參數,設置為0L,因為此執行緒池裡所有執行緒都是核心執行緒,核心執行緒不會被回收(除非設置了executor.allowCoreThreadTimeOut(true))。
不推薦使用:FixedThreadPool 不會拒絕任務,在任務比較多的時候會導致 OOM。
SingleThreadExecutor
只有一個執行緒的執行緒池。
public static ExecutionService newSingleThreadExecutor() { return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());}
使用無界隊列 LinkedBlockingQueue。執行緒池只有一個運行的執行緒,新來的任務放入工作隊列,執行緒處理完任務就循環從隊列里獲取任務執行。保證順序的執行各個任務。
不推薦使用:同 FixedThreadPool,在任務比較多的時候會導致 OOM。
CachedThreadPool
根據需要創建新執行緒的執行緒池。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
如果主執行緒提交任務的速度高於執行緒處理任務的速度時,CachedThreadPool
會不斷創建新的執行緒。極端情況下,這樣會導致耗盡 cpu 和記憶體資源。
使用沒有容量的SynchronousQueue作為執行緒池工作隊列,當執行緒池有空閑執行緒時,SynchronousQueue.offer(Runnable task)
提交的任務會被空閑執行緒處理,否則會創建新的執行緒處理任務。
不推薦使用:CachedThreadPool
允許創建的執行緒數量為 Integer.MAX_VALUE ,可能會創建大量執行緒,從而導致 OOM。
ScheduledThreadPoolExecutor
在給定的延遲後運行任務,或者定期執行任務。在實際項目中基本不會被用到,因為有其他方案選擇比如quartz
。
使用的任務隊列 DelayQueue
封裝了一個 PriorityQueue
,PriorityQueue
會對隊列中的任務進行排序,時間早的任務先被執行(即ScheduledFutureTask
的 time
變數小的先執行),如果time相同則先提交的任務會被先執行(ScheduledFutureTask
的 squenceNumber
變數小的先執行)。
執行周期任務步驟:
- 執行緒從
DelayQueue
中獲取已到期的ScheduledFutureTask(DelayQueue.take())
。到期任務是指ScheduledFutureTask
的 time 大於等於當前系統的時間; - 執行這個
ScheduledFutureTask
; - 修改
ScheduledFutureTask
的 time 變數為下次將要被執行的時間; - 把這個修改 time 之後的
ScheduledFutureTask
放回DelayQueue
中(DelayQueue.add()
)。
編碼規範
阿里巴巴編碼規約不允許使用Executors去創建執行緒池,而是通過ThreadPoolExecutor的方式手動創建執行緒池,這樣子使用者會更加明確執行緒池的運行機制,避免資源耗盡的風險。
Executors 創建執行緒池對象的弊端:
FixedThreadPool和SingleThreadPool。允許請求隊列長度為 Integer.MAX_VALUE,可能堆積大量請求,從而導致OOM。
CachedThreadPool。創建的執行緒池允許的最大執行緒數是Integer.MAX_VALUE,當添加任務的速度大於執行緒池處理任務的速度,可能會創建大量的執行緒,消耗資源,甚至導致OOM。
正確示例(阿里巴巴編碼規範):
//正例1ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();//Common Thread PoolExecutorService pool = new ThreadPoolExecutor(5, 200,0L, TimeUnit.MILLISECONDS, //0L keepAliveTimenew LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());pool.execute(()-> System.out.println(Thread.currentThread().getName()));pool.shutdown();//gracefully shutdown//正例2ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, //corePoolSize threadFactory new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
JMM
Java記憶體模型:執行緒之間的共享變數存儲在主記憶體里,每個執行緒都有自己私有的本地記憶體,本地記憶體保存了共享變數的副本,執行緒對變數的操作都在本地記憶體中進行,不能直接讀寫主記憶體中的變數。
本地記憶體是JMM的一個抽象概念,並不真實存在,它包括快取、寫緩衝區、暫存器以及其他硬體和編譯器優化。
進程執行緒
進程是指一個記憶體中運行的應用程式,每個進程都有自己獨立的一塊記憶體空間,一個進程中可以啟動多個執行緒。
執行緒是比進程更小的執行單位,它是在一個進程中獨立的控制流,一個進程可以啟動多個執行緒,每條執行緒並行執行不同的任務。
執行緒狀態
初始(NEW):執行緒被構建,還沒有調用 start()。
運行(RUNNABLE):包括作業系統的就緒和運行兩種狀態。
阻塞(BLOCKED):一般是被動的,在搶佔資源中得不到資源,被動的掛起在記憶體,等待資源釋放將其喚醒。執行緒被阻塞會釋放CPU,不釋放記憶體。
等待(WAITING):進入該狀態的執行緒需要等待其他執行緒做出一些特定動作(通知或中斷)。
超時等待(TIMED_WAITING):該狀態不同於WAITING,它可以在指定的時間後自行返回。
終止(TERMINATED):表示該執行緒已經執行完畢。
圖片來源:Java並發編程的藝術
中斷
執行緒中斷即執行緒運行過程中被其他執行緒給打斷了,它與 stop 最大的區別是:stop 是由系統強制終止執行緒,而執行緒中斷則是給目標執行緒發送一個中斷訊號,如果目標執行緒沒有接收執行緒中斷的訊號並結束執行緒,執行緒則不會終止,具體是否退出或者執行其他邏輯取決於目標執行緒。
執行緒中斷三個重要的方法:
1、java.lang.Thread#interrupt
調用目標執行緒的interrupt()方法,給目標執行緒發一個中斷訊號,執行緒被打上中斷標記。
2、java.lang.Thread#isInterrupted()
判斷目標執行緒是否被中斷,不會清除中斷標記。
3、java.lang.Thread#interrupted
判斷目標執行緒是否被中斷,會清除中斷標記。
private static void test2() { Thread thread = new Thread(() -> { while (true) { Thread.yield(); // 響應中斷 if (Thread.currentThread().isInterrupted()) { System.out.println("Java技術棧執行緒被中斷,程式退出。"); return; } } }); thread.start(); thread.interrupt();}
常見方法
join
Thread.join(),在main中創建了thread執行緒,在main中調用了thread.join()/thread.join(long millis),main執行緒放棄cpu控制權,執行緒進入WAITING/TIMED_WAITING狀態,等到thread執行緒執行完才繼續執行main執行緒。
public final void join() throws InterruptedException { join(0);}
yield
Thread.yield(),一定是當前執行緒調用此方法,當前執行緒放棄獲取的CPU時間片,但不釋放鎖資源,由運行狀態變為就緒狀態,讓OS再次選擇執行緒。作用:讓相同優先順序的執行緒輪流執行,但並不保證一定會輪流執行。實際中無法保證yield()達到讓步目的,因為讓步的執行緒還有可能被執行緒調度程式再次選中。Thread.yield()不會導致阻塞。該方法與sleep()類似,只是不能由用戶指定暫停多長時間。
public static native void yield(); //static方法
sleep
Thread.sleep(long millis),一定是當前執行緒調用此方法,當前執行緒進入TIMED_WAITING狀態,讓出cpu資源,但不釋放對象鎖,指定時間到後又恢復運行。作用:給其它執行緒執行機會的最佳方式。
public static native void sleep(long millis) throws InterruptedException;//static方法
wait()和sleep()的區別
相同點:
- 使當前執行緒暫停運行,把機會交給其他執行緒
- 任何執行緒在等待期間被中斷都會拋出InterruptedException
不同點:
- wait() 是Object超類中的方法;而sleep()是執行緒Thread類中的方法
- 對鎖的持有不同,wait()會釋放鎖,而sleep()並不釋放鎖
- 喚醒方法不完全相同,wait() 依靠notify或者notifyAll 、中斷、達到指定時間來喚醒;而sleep()到達指定時間被喚醒
- 調用obj.wait()需要先獲取對象的鎖,而 Thread.sleep()不用
創建執行緒的方法
- 通過擴展Thread類來創建多執行緒
- 通過實現Runnable介面來創建多執行緒,可實現執行緒間的資源共享
- 實現Callable介面,通過FutureTask介面創建執行緒。
- 使用Executor框架來創建執行緒池。
繼承 Thread 創建執行緒程式碼如下。run()方法是由jvm創建完作業系統級執行緒後回調的方法,不可以手動調用,手動調用相當於調用普通方法。
/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:15 */public class MyThread extends Thread { public MyThread() { } @Override public void run() { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread() + ":" + i); } } public static void main(String[] args) { MyThread mThread1 = new MyThread(); MyThread mThread2 = new MyThread(); MyThread myThread3 = new MyThread(); mThread1.start(); mThread2.start(); myThread3.start(); }}
Runnable 創建執行緒程式碼:
/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:04 */public class RunnableTest { public static void main(String[] args){ Runnable1 r = new Runnable1(); Thread thread = new Thread(r); thread.start(); System.out.println("主執行緒:["+Thread.currentThread().getName()+"]"); }}class Runnable1 implements Runnable{ @Override public void run() { System.out.println("當前執行緒:"+Thread.currentThread().getName()); }}
實現Runnable介面比繼承Thread類所具有的優勢:
- 資源共享,適合多個相同的程式程式碼的執行緒去處理同一個資源
- 可以避免java中的單繼承的限制
- 執行緒池只能放入實現Runable或Callable類執行緒,不能直接放入繼承Thread的類
Callable 創建執行緒程式碼:
/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:21 */public class CallableTest { public static void main(String[] args) { Callable1 c = new Callable1(); //非同步計算的結果 FutureTask<Integer> result = new FutureTask<>(c); new Thread(result).start(); try { //等待任務完成,返回結果 int sum = result.get(); System.out.println(sum); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }}class Callable1 implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 100; i++) { sum += i; } return sum; }}
使用 Executor 創建執行緒程式碼:
/** * @author: 程式設計師大彬 * @time: 2021-09-11 10:44 */public class ExecutorsTest { public static void main(String[] args) { //獲取ExecutorService實例,生產禁用,需要手動創建執行緒池 ExecutorService executorService = Executors.newCachedThreadPool(); //提交任務 executorService.submit(new RunnableDemo()); }}class RunnableDemo implements Runnable { @Override public void run() { System.out.println("大彬"); }}
執行緒間通訊
volatile
volatile是輕量級的同步機制,volatile保證變數對所有執行緒的可見性,不保證原子性。
- 當對volatile變數進行寫操作的時候,JVM會向處理器發送一條LOCK前綴的指令,將該變數所在快取行的數據寫回系統記憶體。
- 由於快取一致性協議,每個處理器通過嗅探在匯流排上傳播的數據來檢查自己的快取是不是過期了,當處理器發現自己快取行對應的記憶體地址被修改,就會將當前處理器的快取行置為無效狀態,當處理器對這個數據進行修改操作的時候,會重新從系統記憶體中把數據讀到處理器快取中。
MESI(快取一致性協議):當CPU寫數據時,如果發現操作的變數是共享變數,即在其他CPU中也存在該變數的副本,會發出訊號通知其他CPU將該變數的快取行置為無效狀態,因此當其他CPU需要讀取這個變數時,就會從記憶體重新讀取。
volatile關鍵字的兩個作用:
- 保證了不同執行緒對共享變數進行操作時的可見性,即一個執行緒修改了某個變數的值,這新值對其他執行緒來說是立即可見的。
- 禁止進行指令重排序。
指令重排序是JVM為了優化指令,提高程式運行效率,在不影響單執行緒程式執行結果的前提下,儘可能地提高並行度。Java編譯器會在生成指令系列時在適當的位置會插入記憶體屏障
指令來禁止處理器重排序。插入一個記憶體屏障,相當於告訴CPU和編譯器先於這個命令的必須先執行,後於這個命令的必須後執行。對一個volatile欄位進行寫操作,Java記憶體模型將在寫操作後插入一個寫屏障指令,這個指令會把之前的寫入值都刷新到記憶體。
synchronized
保證執行緒對變數訪問的可見性和排他性。synchronized 詳細內容見下文鎖部分。
等待通知機制
wait/notify為 Object 對象的方法,調用wait/notify需要先獲得對象的鎖。對象調用wait之後執行緒釋放鎖,將執行緒放到對象的等待隊列,當通知執行緒調用此對象的notify()方法後,等待執行緒並不會立即從wait返回,需要等待通知執行緒釋放鎖(通知執行緒執行完同步程式碼塊),等待隊列里的執行緒獲取鎖,獲取鎖成功才能從wait()方法返回,即從wait方法返回前提是執行緒獲得鎖。
等待通知機制依託於同步機制,目的是確保等待執行緒從wait方法返回時能感知到通知執行緒對對象的變數值的修改。
鎖
synchronized
較常用的用於保證執行緒安全的方式。當一個執行緒獲取到鎖時,其他執行緒都會被阻塞住,當持有鎖的執行緒釋放鎖之後會喚醒這些執行緒,被喚醒的執行緒才有機會獲取到鎖。
- 修飾實例方法,作用於當前對象實例加鎖,進入同步程式碼前要獲得當前對象實例的鎖
- 修飾靜態方法,作用於當前類對象加鎖,進入同步程式碼前要獲得當前類對象的鎖(類的位元組碼文件)
- 修飾程式碼塊,指定加鎖對象,對給定對象加鎖,進入同步程式碼塊前要獲得給定對象的鎖
獲取了類鎖的執行緒和獲取了對象鎖的執行緒是不衝突的。
釋放鎖
當方法或者程式碼塊執行完畢後會自動釋放鎖,不需要做任何的操作。
當一個執行緒執行的程式碼出現異常時,其所持有的鎖會自動釋放。
實現原理
synchronized通過對象內部的監視器鎖(monitor)實現。每個對象都有一個monitor,當對象的monitor被持有時,則它處於鎖定的狀態。
程式碼塊的同步是使用monitorenter和monitorexit指令實現的,monitorenter指令是在編譯後插入到同步程式碼塊的開始位置,而monitorexit是插入到方法結束處或異常處。
public class SynchronizedDemo { public void method() { synchronized (this) { System.out.println("method start"); } }}
執行緒訪問同步塊時,先執行monitorenter指令時嘗試獲取monitor,過程如下:
- 如果monitor的進入數entry count為0,則該執行緒進入monitor,然後將進入數設置為1,該執行緒即為monitor的所有者。
- 如果執行緒已經佔有該monitor,只是重新進入,則進入monitor的進入數加1。
- 如果其他執行緒已經佔用了monitor,則該執行緒進入阻塞狀態,直到monitor的進入數為0,再重新嘗試獲取monitor。
執行緒退出同步塊時會執行monitorexit指令,monitor的進入數減1,如果減1後進入數為0,那執行緒退出monitor,不再是這個monitor的所有者。其他被這個monitor阻塞的執行緒可以嘗試去獲取這個 monitor。
Synchronized底層是通過一個monitor的對象來完成,其實wait/notify等方法也依賴於monitor對象,這就是為什麼只有在同步的塊或者方法中才能調用wait/notify等方法,否則會拋出java.lang.IllegalMonitorStateException的異常的原因。
方法的同步不是通過添加monitorenter和monitorexit指令來完成,而是在其常量池中添加了ACC_SYNCHRONIZED標識符。JVM就是根據該標識符來實現方法的同步的:當執行緒調用方法時,會先檢查方法的 ACC_SYNCHRONIZED 訪問標誌是否被設置,如果設置了,說明此方法是同步方法,執行執行緒將先獲取monitor,獲取成功之後才能執行方法體,方法執行完後再釋放monitor。在方法執行期間,其他執行緒無法再獲得同一個monitor對象。
public class SynchronizedMethod { public synchronized void method() { System.out.println("Hello World!"); }}
鎖的狀態
Synchronized是通過對象內部的監視器來實現的。但是監視器鎖本質又是依賴於底層的作業系統的Mutex Lock來實現的。而作業系統實現執行緒之間的切換這就需要從用戶態轉換到核心態,這個成本非常高,狀態之間的轉換需要相對比較長的時間。這種依賴於作業系統Mutex Lock所實現的鎖我們稱之為重量級鎖。
JDK1.6中為了減少獲得鎖和釋放鎖帶來的性能消耗,引入了偏向鎖、輕量級鎖、自旋鎖、適應性自旋鎖、鎖消除、鎖粗化等技術來減少鎖操作的開銷。
synchronized鎖主要存在四種狀態,依次是:偏向鎖狀態、輕量級鎖狀態、重量級鎖狀態,他們會隨著競爭的激烈而逐漸升級。鎖可以升級不可降級,這種策略是為了提高獲得鎖和釋放鎖的效率。
-
偏向鎖:當執行緒訪問同步塊並獲取鎖時,會在對象頭和鎖記錄中存儲鎖偏向的執行緒id,以後該執行緒進入和退出同步塊時,只需簡單測試一下對象頭的mark word中是否存儲著指向當前執行緒的偏向鎖,如果測試成功,則執行緒獲取鎖成功,否則,需再測試一下mark word中偏向鎖標識是否是1,是的話則使用CAS操作競爭鎖。如果競爭成功,則將Mark Word中執行緒ID設置為當前執行緒ID,如果CAS獲取偏向鎖失敗,則表示有競爭。當到達全局安全點時獲得偏向鎖的執行緒被掛起,偏向鎖升級為輕量級鎖,然後被阻塞在安全點的執行緒繼續往下執行同步程式碼。
偏向鎖偏向於第一個獲得它的執行緒,如果程式運行過程,該鎖沒有被其他執行緒獲取,那麼持有偏向鎖的執行緒就不需要進行同步。引入偏向鎖是為了在無多執行緒競爭的情況下盡量減少不必要的輕量級鎖執行的開銷,因為輕量級鎖的獲取及釋放使用了多次CAS原子指令,而偏向鎖只在置換ThreadID的時候使用一次CAS原子指令。當存在鎖競爭的時候,偏向鎖會升級為輕量級鎖。
適用場景:在鎖無競爭的情況下使用,在執行緒沒有執行完同步程式碼之前,沒有其它執行緒去競爭鎖,一旦有了競爭就升級為輕量級鎖,升級為輕量級鎖的時候需要撤銷偏向鎖,會做很多額外操作,導致性能下降。 -
輕量級鎖
加鎖過程:執行緒執行同步塊之前,JVM會先在當前執行緒的棧幀中創建用於存儲鎖記錄的空間,並將對象頭的mark word複製到鎖記錄(displaced mark word)中,然後執行緒嘗試使用cas將對象頭的mark word替換為指向鎖記錄的指針。如果成功,則當前執行緒獲得鎖,否則表示有其他執行緒競爭鎖,當前執行緒便嘗試使用自旋來獲得鎖。當自旋超過一定的次數,或者一個執行緒在持有鎖,一個在自旋,又有第三個來訪時,輕量級鎖升級為重量級鎖。
解鎖過程:使用原子的cas操作將displaced mark word替換回到對象頭,如果成功則解鎖成功,否則表明有鎖競爭,鎖會膨脹成重量級鎖。在沒有多執行緒競爭的前提下,使用輕量級鎖可以減少傳統的重量級鎖使用作業系統互斥量(申請互斥鎖)產生的性能消耗,因為使用輕量級鎖時,不需要申請互斥量。另外,輕量級鎖的加鎖和解鎖都用到了CAS操作。如果沒有競爭,輕量級鎖使用 CAS 操作避免了使用互斥操作的開銷。但如果存在鎖競爭,除了互斥量開銷外,還會額外發生CAS操作,因此在有鎖競爭的情況下,輕量級鎖比傳統的重量級鎖更慢!如果鎖競爭激烈,那麼輕量級鎖將很快膨脹為重量級鎖!
-
重量級鎖:當一個執行緒獲取到鎖時,其他執行緒都會被阻塞住,當持有鎖的執行緒釋放鎖之後會喚醒這些執行緒,被喚醒的執行緒才有機會獲取到鎖。
synchronized和Lock能保證同一時刻只有一個執行緒獲取鎖然後執行同步程式碼,並且在釋放鎖之前會將對變數的修改刷新到主存當中,保證了可見性。
-
自旋鎖:一般執行緒持有鎖的時間都不是太長,所以僅僅為了這一點時間去掛起執行緒/恢復執行緒比較浪費資源。自旋鎖就是讓該執行緒等待一段時間,執行一段無意義的循環,不會被立即掛起,看持有鎖的執行緒是否會很快釋放鎖。如果持有鎖的執行緒很快就釋放了鎖,那麼自旋的效率就非常好,反之,自旋的執行緒就會白白消耗掉處理的資源,這樣反而會帶來性能上的浪費。所以自旋的次數必須要有一個限度,如果自旋超過了限定次數仍然沒有獲取到鎖,則應該被掛起。
-
自適應自旋鎖:JDK 1.6引入了更加聰明的自旋鎖,即自適應自旋鎖。所謂自適應就意味著自旋的次數不再是固定的,它是由前一次在同一個鎖上的自旋時間及鎖的擁有者的狀態來決定。
-
鎖消除:虛擬機即使編譯器在運行時,如果檢測到那些共享數據不可能存在競爭,那麼就執行鎖消除。
-
鎖粗化:如果一系列的連續操作都對同一個對象反覆加鎖和解鎖,那麼會帶來很多不必要的性能消耗,使用鎖粗化減少鎖操作的開銷。
ReentrantLock
重入鎖,支援一個執行緒對資源的重複加鎖。該鎖的還支援設置獲取鎖時的公平和非公平性。
使用lock時需要在try finally塊進行解鎖:
public static final Object get(String key) { r.lock(); try { return map.get(key); } finally { r.unlock(); }}
原理
ReentrantLock是通過組合自定義同步器來實現鎖的獲取與釋放。當執行緒嘗試獲取同步狀態時,首先判斷當前執行緒是否為獲取鎖的執行緒來決定獲取操作是否成功,如果是獲取鎖的執行緒再次請求,則將同步狀態值進行增加並返回true,表示獲取同步狀態成功。獲取同步狀態失敗,則該執行緒會被構造成node節點放到AQS同步隊列中。
如果鎖被獲取了n次,那麼前n-1次 tryRelease(int releases)方法必須返回false,第n次調用tryRelease()之後,同步狀態完全釋放(值為0),才會返回true。
ReentrantLock和synchronized區別
- 使用synchronized關鍵字實現同步,執行緒執行完同步程式碼塊會自動釋放鎖,而ReentrantLock需要手動釋放鎖。
- synchronized是非公平鎖,ReentrantLock可以設置為公平鎖。
- ReentrantLock上等待獲取鎖的執行緒是可中斷的,執行緒可以放棄等待鎖。而synchonized會無限期等待下去。
- ReentrantLock 可以設置超時獲取鎖。在指定的截止時間之前獲取鎖,如果截止時間到了還沒有獲取到鎖,則返回。
- ReentrantLock 的 tryLock() 方法可以嘗試非阻塞的獲取鎖,調用該方法後立刻返回,如果能夠獲取則返回true,否則返回false。
鎖的分類
公平鎖與非公平鎖
按照執行緒訪問順序獲取對象鎖。synchronized 是非公平鎖, Lock 默認是非公平鎖,可以設置為公平鎖,公平鎖會影響性能。
public ReentrantLock() { sync = new NonfairSync();}public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
共享式與獨佔式鎖
共享式與獨佔式的最主要區別在於:同一時刻獨佔式只能有一個執行緒獲取同步狀態,而共享式在同一時刻可以有多個執行緒獲取同步狀態。例如讀操作可以有多個執行緒同時進行,而寫操作同一時刻只能有一個執行緒進行寫操作,其他操作都會被阻塞。
悲觀鎖與樂觀鎖
悲觀鎖,每次訪問資源都會加鎖,執行完同步程式碼釋放鎖,synchronized 和 ReentrantLock 屬於悲觀鎖。
樂觀鎖,不會鎖定資源,所有的執行緒都能訪問並修改同一個資源,如果沒有衝突就修改成功並退出,否則就會繼續循環嘗試。樂觀鎖最常見的實現就是CAS。
樂觀鎖一般來說有以下2種方式:
- 使用數據版本記錄機制實現,這是樂觀鎖最常用的一種實現方式。給數據增加一個版本標識,一般是通過為資料庫表增加一個數字類型的version欄位來實現。當讀取數據時,將version欄位的值一同讀出,數據每更新一次,對此version值加一。當我們提交更新的時候,判斷資料庫表對應記錄的當前版本資訊與第一次取出來的version值進行比對,如果資料庫表當前版本號與第一次取出來的version值相等,則予以更新,否則認為是過期數據。
- 使用時間戳。資料庫表增加一個欄位,欄位類型使用時間戳(timestamp),和上面的version類似,也是在更新提交的時候檢查當前資料庫中數據的時間戳和自己更新前取到的時間戳進行對比,如果一致則OK,否則就是版本衝突。
適用場景:
- 悲觀鎖適合寫操作多的場景。
- 樂觀鎖適合讀操作多的場景,不加鎖可以提升讀操作的性能。
CAS
CAS全稱 Compare And Swap,比較與交換,是樂觀鎖的主要實現方式。CAS 在不使用鎖的情況下實現多執行緒之間的變數同步。ReentrantLock 內部的 AQS 和原子類內部都使用了 CAS。
CAS演算法涉及到三個操作數:
- 需要讀寫的記憶體值 V。
- 進行比較的值 A。
- 要寫入的新值 B。
只有當 V 的值等於 A 時,才會使用原子方式用新值B來更新V的值,否則會繼續重試直到成功更新值。
以 AtomicInteger 為例,AtomicInteger 的 getAndIncrement()方法底層就是CAS實現,關鍵程式碼是 compareAndSwapInt(obj, offset, expect, update)
,其含義就是,如果obj
內的value
和expect
相等,就證明沒有其他執行緒改變過這個變數,那麼就更新它為update
,如果不相等,那就會繼續重試直到成功更新值。
CAS 三大問題:
-
ABA問題。CAS需要在操作值的時候檢查記憶體值是否發生變化,沒有發生變化才會更新記憶體值。但是如果記憶體值原來是A,後來變成了B,然後又變成了A,那麼CAS進行檢查時會發現值沒有發生變化,但是實際上是有變化的。ABA問題的解決思路就是在變數前面添加版本號,每次變數更新的時候都把版本號加一,這樣變化過程就從
A-B-A
變成了1A-2B-3A
。JDK從1.5開始提供了AtomicStampedReference類來解決ABA問題,原子更新帶有版本號的引用類型。
-
循環時間長開銷大。CAS操作如果長時間不成功,會導致其一直自旋,給CPU帶來非常大的開銷。
-
只能保證一個共享變數的原子操作。對一個共享變數執行操作時,CAS能夠保證原子操作,但是對多個共享變數操作時,CAS是無法保證操作的原子性的。
Java從1.5開始JDK提供了AtomicReference類來保證引用對象之間的原子性,可以把多個變數放在一個對象里來進行CAS操作。
並發工具
在JDK的並發包里提供了幾個非常有用的並發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種並發流程式控制制的手段。
CountDownLatch
CountDownLatch用於某個執行緒等待其他執行緒執行完任務再執行,與thread.join()功能類似。常見的應用場景是開啟多個執行緒同時執行某個任務,等到所有任務執行完再執行特定操作,如匯總統計結果。
public class CountDownLatchDemo { static final int N = 4; static CountDownLatch latch = new CountDownLatch(N); public static void main(String[] args) throws InterruptedException { for(int i = 0; i < N; i++) { new Thread(new Thread1()).start(); } latch.await(1000, TimeUnit.MILLISECONDS); //調用await()方法的執行緒會被掛起,它會等待直到count值為0才繼續執行;等待timeout時間後count值還沒變為0的話就會繼續執行 System.out.println("task finished"); } static class Thread1 implements Runnable { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "starts working"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }}
運行結果:
Thread-0starts workingThread-1starts workingThread-2starts workingThread-3starts workingtask finished
CyclicBarrier
CyclicBarrier(同步屏障),用於一組執行緒互相等待到某個狀態,然後這組執行緒再同時執行。
public CyclicBarrier(int parties, Runnable barrierAction) {}public CyclicBarrier(int parties) {}
參數parties指讓多少個執行緒或者任務等待至某個狀態;參數barrierAction為當這些執行緒都達到某個狀態時會執行的內容。
public class CyclicBarrierTest { // 請求的數量 private static final int threadCount = 10; // 需要同步的執行緒數量 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws InterruptedException { // 創建執行緒池 ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); threadPool.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }); } threadPool.shutdown(); } public static void test(int threadnum) throws InterruptedException, BrokenBarrierException { System.out.println("threadnum:" + threadnum + "is ready"); try { /**等待60秒,保證子執行緒完全執行結束*/ cyclicBarrier.await(60, TimeUnit.SECONDS); } catch (Exception e) { System.out.println("-----CyclicBarrierException------"); } System.out.println("threadnum:" + threadnum + "is finish"); }}
運行結果如下,可以看出CyclicBarrier是可以重用的:
threadnum:0is readythreadnum:1is readythreadnum:2is readythreadnum:3is readythreadnum:4is readythreadnum:4is finishthreadnum:3is finishthreadnum:2is finishthreadnum:1is finishthreadnum:0is finishthreadnum:5is readythreadnum:6is ready...
當四個執行緒都到達barrier狀態後,會從四個執行緒中選擇一個執行緒去執行Runnable。
CyclicBarrier和CountDownLatch區別
CyclicBarrier 和 CountDownLatch 都能夠實現執行緒之間的等待。
CountDownLatch用於某個執行緒等待其他執行緒執行完任務再執行。CyclicBarrier用於一組執行緒互相等待到某個狀態,然後這組執行緒再同時執行。
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,可用於處理更為複雜的業務場景。
Semaphore
Semaphore類似於鎖,它用於控制同時訪問特定資源的執行緒數量,控制並發執行緒數。
public class SemaphoreDemo { public static void main(String[] args) { final int N = 7; Semaphore s = new Semaphore(3); for(int i = 0; i < N; i++) { new Worker(s, i).start(); } } static class Worker extends Thread { private Semaphore s; private int num; public Worker(Semaphore s, int num) { this.s = s; this.num = num; } @Override public void run() { try { s.acquire(); System.out.println("worker" + num + " using the machine"); Thread.sleep(1000); System.out.println("worker" + num + " finished the task"); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } } }}
運行結果如下,可以看出並非按照執行緒訪問順序獲取資源的鎖,即
worker0 using the machineworker1 using the machineworker2 using the machineworker2 finished the taskworker0 finished the taskworker3 using the machineworker4 using the machineworker1 finished the taskworker6 using the machineworker4 finished the taskworker3 finished the taskworker6 finished the taskworker5 using the machineworker5 finished the task
原子類
基本類型原子類
使用原子的方式更新基本類型
- AtomicInteger:整型原子類
- AtomicLong:長整型原子類
- AtomicBoolean :布爾型原子類
AtomicInteger 類常用的方法:
public final int get() //獲取當前的值public final int getAndSet(int newValue)//獲取當前的值,並設置新的值public final int getAndIncrement()//獲取當前的值,並自增public final int getAndDecrement() //獲取當前的值,並自減public final int getAndAdd(int delta) //獲取當前的值,並加上預期的值boolean compareAndSet(int expect, int update) //如果輸入的數值等於預期值,則以原子方式將該值設置為輸入值(update)public final void lazySet(int newValue)//最終設置為newValue,使用 lazySet 設置之後可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。
AtomicInteger 類主要利用 CAS (compare and swap) 保證原子操作,從而避免加鎖的高開銷。
數組類型原子類
使用原子的方式更新數組裡的某個元素
- AtomicIntegerArray:整形數組原子類
- AtomicLongArray:長整形數組原子類
- AtomicReferenceArray :引用類型數組原子類
AtomicIntegerArray 類常用方法:
public final int get(int i) //獲取 index=i 位置元素的值public final int getAndSet(int i, int newValue)//返回 index=i 位置的當前的值,並將其設置為新值:newValuepublic final int getAndIncrement(int i)//獲取 index=i 位置元素的值,並讓該位置的元素自增public final int getAndDecrement(int i) //獲取 index=i 位置元素的值,並讓該位置的元素自減public final int getAndAdd(int i, int delta) //獲取 index=i 位置元素的值,並加上預期的值boolean compareAndSet(int i, int expect, int update) //如果輸入的數值等於預期值,則以原子方式將 index=i 位置的元素值設置為輸入值(update)public final void lazySet(int i, int newValue)//最終 將index=i 位置的元素設置為newValue,使用 lazySet 設置之後可能導致其他執行緒在之後的一小段時間內還是可以讀到舊的值。
引用類型原子類
- AtomicReference:引用類型原子類
- AtomicStampedReference:帶有版本號的引用類型原子類。該類將整數值與引用關聯起來,可用於解決原子的更新數據和數據的版本號,可以解決使用 CAS 進行原子更新時可能出現的 ABA 問題。
- AtomicMarkableReference :原子更新帶有標記的引用類型。該類將 boolean 標記與引用關聯起來
AQS
AQS定義了一套多執行緒訪問共享資源的同步器框架,許多並發工具的實現都依賴於它,如常用的ReentrantLock/Semaphore/CountDownLatch。
原理
AQS使用一個volatile的int類型的成員變數state來表示同步狀態,通過CAS修改同步狀態的值。
private volatile int state;//共享變數,使用volatile修飾保證執行緒可見性
同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前執行緒獲取同步狀態失敗時,同步器會將當前執行緒以及等待狀態(獨佔或共享 )構造成為一個節點(Node)並將其加入同步隊列並進行自旋,當同步狀態釋放時,會把首節中的後繼節點對應的執行緒喚醒,使其再次嘗試獲取同步狀態。
Condition
任意一個Java對象,都擁有一組監視器方法(定義在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,使用這些方法的前提是已經獲取對象的鎖,和 synchronized 配合使用。Condition介面也提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式。Condition是依賴Lock對象。
Lock lock = new ReentrantLock();Condition condition = lock.newCondition();public void conditionWait() throws InterruptedException { lock.lock(); try { condition.await(); } finally { lock.unlock(); }}public void conditionSignal() throws InterruptedException { lock.lock(); try { condition.signal(); } finally { lock.unlock(); }}
一般將Condition對象作為成員變數。當調用await()方法後,當前執行緒會釋放鎖進入等待隊列。其他執行緒調用Condition對象的signal()方法,喚醒等待隊列首節點的執行緒。
實現原理
每個Condition對象都包含著一個等待隊列,如果一個執行緒成功獲取了鎖之後調用了Condition.await()方法,那麼該執行緒將會釋放同步狀態、喚醒同步隊列中的後繼節點,然後構造成節點加入等待隊列。只有當執行緒再次獲取Condition相關聯的鎖之後,才能從await()方法返回。
圖片來源:Java並發編程的藝術
在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列。Lock通過AQS實現,AQS可以有多個Condition,所以Lock擁有一個同步隊列和多個等待隊列。
圖片來源:Java並發編程的藝術
執行緒獲取了鎖之後,調用Condition的signal()方法,會將等待隊列的隊首節點移到同步隊列中,然後該節點的執行緒會嘗試去獲取同步狀態。成功獲取同步狀態之後,執行緒將await()方法返回。
圖片來源:Java並發編程的藝術
其他
Daemon Thread
在Java中有兩類執行緒:
- User Thread(用戶執行緒)
- Daemon Thread(守護執行緒)
只要當前JVM實例中尚存在任何一個非守護執行緒沒有結束,守護執行緒就全部工作;只有當最後一個非守護執行緒結束時,守護執行緒隨著JVM一同結束工作。
Daemon的作用是為其他執行緒的運行提供便利服務,守護執行緒最典型的應用就是垃圾收集。
將執行緒轉換為守護執行緒可以通過調用Thread對象的setDaemon(true)方法來實現。
參考資料
本文已經收錄到github倉庫,此倉庫用於分享Java相關知識總結,包括Java基礎、MySQL、Spring Boot、MyBatis、Redis、RabbitMQ、電腦網路、數據結構與演算法等等,歡迎大家提pr和star!
github地址://github.com/Tyson0314/Java-learning
如果github訪問不了,可以訪問gitee倉庫。