多執行緒編程(2)—執行緒安全
- 2019 年 10 月 23 日
- 筆記
1. 問題的引出
執行緒安全歸根結底可以說是記憶體安全,在jvm記憶體模型中,有一塊特殊的公共記憶體空間,稱為堆記憶體,進程內的所有執行緒都可以訪問並修改其中的數據,就會造成潛在的問題。因為堆記憶體空間在沒有保護機制的情況下,你放進去的數據,可能被別的執行緒篡改。如下程式碼:
public class ThreadSafe implements Runnable { private static int count = 0; @Override public void run() { for (int i = 0; i < 1000; i++) { count++; } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new ThreadSafe()); } es.shutdown(); //不允許添加執行緒到執行緒池,非同步關閉連接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成 System.out.println(count); } }
本來期望的值是20000,可是最終輸出的結果卻一點在變化,其值總是小於等於20000,顯然這是由於執行緒不安全造成的,多個執行緒並發的去訪問全局變數、靜態變數、文件、設備、套接字等都可能出現這種問題。
2.
為了協調和配合執行緒之間對共享資源的訪問,通常有四種方式:
1. 臨界區:訪問某一段臨界資源的程式碼片段,與共享資源類似,但有一點不同的是,某一時刻只允許一個執行緒去訪問(對應java中的關鍵字 synchronized包含的程式碼)。
2. 互斥量:互斥量是一個對象,只有都擁有互斥量的對象才可以訪問共享資源。而且互斥量中只有一個,通常互斥量的實現是通過鎖來實現的,而且加鎖操作和釋放操作只能由同一個執行緒來完成。此處與臨界區的區別是一段程式碼,通常存在與一個文件中,而互斥量是一個對象,加鎖操作和解鎖操作可以在不同的文件去編寫,只要是同一個執行緒就好。
3. 訊號量: 訊號量可以允許指定數量的執行緒同一時刻去訪問共享資源,當執行緒數達到了閾值後,將阻止其他執行緒的訪問,最常見的比如生產者和消費者問題。訊號量和互斥量的區別則是訊號量的發出和釋放可以由不同執行緒來完成。
4. 事件:通過發送通知的形式來實現執行緒同步,可以實現不同進程中的執行緒同步操作。
3.
飢餓:某些執行緒或進程由於長期得不到資源,而總是處於就緒或者阻塞狀態。例如:
②. 由於選用不恰當的調度演算法,導致該進程或執行緒長期無法得到CPU時間片,處於就緒狀態。
③. 由於喚醒的時間把握不對,喚醒執行緒時,所需的資源處於被鎖定狀態,導致執行緒回到阻塞狀態。
死鎖:兩個或多個執行緒在執行過程中,由於相互佔有對方所需的資源而又互不相讓從而造成這些執行緒都被阻塞,若無外力的作用下,他們都將無法執行下去。例如
①. 進程推進順序不合適。互相佔有彼此需要的資源,同時請求對方佔有的資源,形成循環依賴的關係。
②. 資源不足。
③. 進程運行推進順序與速度不同,也可能產生死鎖。
一些避免死鎖的措施:
-
不要在鎖區域內在加把鎖,即不要在釋放鎖之前競爭其他鎖。
-
減小鎖粒度,即減小執行緒加鎖的範圍,真正需要的時候再去加鎖。
-
順序訪問共享資源。
-
設置超時機制,超過指定時間則程式返回錯誤。
-
競爭鎖期間,允許程式被中斷。
可見性:當多個執行緒並發的讀寫某個共享資源的時候,每個執行緒總是可以取到該共享資源的最新數據。
原子性:某執行緒對一個或者多個共享資源所做的一連串寫操作不會被中斷,在此期間不會有其他執行緒同時對這些共享資源進行寫操作。
有序性:單個執行緒內的操作必須是有序的。
通常原子性都可以得到保證,問題的病端就出在可見性和原子性。
public class Thread4 implements Runnable{ private static boolean flag = false; @Override public void run() { System.out.println("waiting for data...."); while (!flag); System.out.println("cpying with data"); } public static void main(String[] args) throws InterruptedException { Thread4 thread4 = new Thread4(); Thread t = new Thread(thread4); t.start(); Thread.sleep(1000); flag = true; } } /* output * waiting for data.... */
public class Thread4 implements Runnable{ private volatile static boolean flag = false; @Override private volatile static boolean flag = false; public void run() { System.out.println("waiting for data...."); while (!flag); System.out.println("cpying with data"); } //.... } /* output * waiting for data.... * false * ... * false * cpying with data */
為了感知其他執行緒中一些全局變數值的變化,而且避免頻繁去測試主記憶體中的數據變化,保證執行緒之間的可見性,可以使用volatile關鍵字去修飾全局變數,如下:
public void run() { System.out.println("waiting for data...."); /* Notice 如果在while循環里加上System.out.println(flag);語句,則不會使用本工作記憶體的flag數據, 而是重新去主記憶體載入數據 */ while (!flag){ System.out.println(flag); //測試,可以做到執行緒的可見性 } System.out.println("cpying with data"); } /* output * waiting for data.... * false * ... * false * cpying with data */
volatile關鍵字藉助MESI一致性協議,會在工作記憶體(CPU的暫存器等)與主記憶體連接的匯流排上建立一道匯流排嗅探機制,一旦發現其他執行緒修改了主記憶體中的某個全局變數(即圖中橙灰色線條讀取的數據以及寫回的數據),就會讓其他工作執行緒中從主記憶體拷貝出來的副本變數失效(即圖中紫色的線條讀取的數據),從而會使左邊的執行緒重新去讀取數據(即圖中紅色的線條讀取的數據)。如下圖:
雖然解決了原子性問題,可是volatile關鍵字不支援原子性操作,如下程式:
public class Thread5 implements Runnable { private static volatile int count = 0; @Override public void run() { for (int i = 0; i < 10000; i++) { count++; } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new Thread5()); } es.shutdown(); //不允許添加執行緒,非同步關閉連接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成 System.out.println(count); } } /* output * 175630 */
針對原子性問題,我們可以使用熟悉的synchronized關鍵字,synchronized關鍵字最主要有以下3種應用方式:
-
修飾實例方法,作用於當前實例加鎖,進入同步程式碼前要獲得當前實例的鎖
-
-
修飾程式碼塊,指定加鎖對象,對給定對象加鎖,進入同步程式碼庫前要獲得給定對象的鎖。
部分示例程式碼如下:
public class Thread5 implements Runnable { private static int count = 0; public synchronized static void add() { count++; } @Override public void run() { for (int i = 0; i < 1000000; i++) { // add() synchronized (Thread5.class){ count++; } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new Thread5()); } es.shutdown(); //不允許添加執行緒,非同步關閉連接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成 System.out.println(count); } } /* output * 20000000 */
針對上面的問題,java中可以使用Atomic,它的包名為java.util.concurrent.atomic
。這個包裡面提供了一組原子變數的操作類(通過值加版本號的方式去解決ABA問題),這些類可以保證在多執行緒環境下,當某個執行緒在執行atomic的方法時,不會被其他執行緒打斷,一直等到該方法執行完成(具體的API文檔可以查看參考文獻第5點)。
public class ThreadSafe implements Runnable { private static AtomicInteger count = new AtomicInteger(0); @Override public void run() { for (int i = 0; i < 10000; i++) { count.getAndAdd(1); } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new ThreadSafe()); } es.shutdown(); //不允許添加執行緒,非同步關閉連接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成 System.out.println(count); } } /* output * 200000 */
5.
a. 自旋鎖
執行緒循環反覆檢查鎖變數是否可用,在這一過程中執行緒一直保持執行(RUNNABLE),因此是一種忙等待,不像關鍵字synchronized一樣,一旦發現不能訪問,則處於執行緒處於阻塞狀態(BLOCKED)。
public class Thread6 implements Runnable{ private static final Lock lock = new ReentrantLock(); private volatile static int count = 0; @Override public void run() { for (int i = 0; i < 1000000; i++){ lock.lock(); count++; lock.unlock(); } } static void test(ExecutorService es) throws InterruptedException { for (int i = 0; i < 20; i++) { es.execute(new Thread6()); } es.shutdown(); //不允許添加執行緒,非同步關閉連接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成 System.out.println(count); } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(20); test(es); } }
如果在使用lock的時候包含了try…catch…語句,要注意的是lock 必須在 finally 塊中釋放。否則,如果受保護的程式碼將拋出異常,鎖就有可能永遠得不到釋放!
與Lock類同一個包java.util.concurrent.locks
下還有一種讀寫分離的鎖ReentrantReadWriteLock類,讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖。一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量。
public class RWTest { private static final Map<String, Object> map = new HashMap<String, Object>(); private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private static final Lock readLock = lock.readLock(); private static final Lock writeLock = lock.writeLock(); public static final Object get(String key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } public static final Object put(String key, Object value) { writeLock.lock(); try { return map.put(key, value); } finally { writeLock.unlock(); } } public static final void clear() { writeLock.lock(); try { map.clear(); } finally { writeLock.unlock(); } } }
public class Thread9 { public static ReentrantLock lock=new ReentrantLock(); public static Condition condition =lock.newCondition(); public static void main(String[] args) { new Thread(){ @Override public void run() { lock.lock();//請求鎖 try{ System.out.println(Thread.currentThread().getName()+"==》進入等待"); condition.await();//設置當前執行緒進入等待 }catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//釋放鎖 } System.out.println(Thread.currentThread().getName()+"==》繼續執行"); } }.start(); new Thread(){ @Override public void run() { lock.lock();//請求鎖 try{ System.out.println(Thread.currentThread().getName()+"=》進入"); Thread.sleep(2000);//休息2秒 condition.signal();//隨機喚醒等待隊列中的一個執行緒 System.out.println(Thread.currentThread().getName()+"休息結束"); }catch (InterruptedException e) { e.printStackTrace(); }finally{ lock.unlock();//釋放鎖 } } }.start(); } } /*output *Thread-0==》進入等待 *Thread-1=》進入 *Thread-1休息結束 *Thread-0==》繼續執行 */
在關鍵字synchronized的執行緒同步機制,調用執行緒的sleep,yield方法時,執行緒並不會讓出對象鎖,但是調用wait卻不同,執行緒自動釋放其佔有的對象鎖,同時不會去申請對象鎖,當執行緒被喚醒的時候,它才再次去申請競爭對象的鎖(該關鍵字通常只與synchronized結合使用)。notify()喚醒在等待該對象同步鎖的執行緒(只喚醒一個,如果有多個在等待),注意的是在調用此方法的時候,並不能確切的喚醒某一個等待狀態的執行緒,而是由JVM確定喚醒哪個執行緒,而且不是按優先順序。而notifyAll()則是喚醒所有等待的執行緒。
public class Thread8 implements Runnable { private int num; private Object lock; public Thread8(int num, Object lock) { this.num = num; this.lock = lock; } public void run() { try { while (true) { synchronized (lock) { lock.notifyAll(); lock.wait(); System.out.println(num); } } } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { final Object lock = new Object(); Thread thread1 = new Thread(new Thread8(1, lock)); Thread thread2 = new Thread(new Thread8(2, lock)); thread1.start(); thread2.start(); } } /* output * 交替輸出1,2,1,2,1,2...... */
6.
public class Thread10 { public static void main(String[] args) throws InterruptedException { int count = 20; final CountDownLatch cdl = new CountDownLatch(count); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < count; i++) { es.execute(new Runnable() { @Override public void run() { try { System.out.println(cdl.getCount()); }finally { cdl.countDown(); } } }); } cdl.await(); es.shutdown(); System.out.println("主執行緒現在才結束: count = "+cdl.getCount()); } }
2.CyclicBarrier
即迴環柵欄,是一種可重用的執行緒阻塞器,它將率先到達柵欄的這些執行緒阻塞(調用await()方法),直到指定數量的執行緒都到達該處,這些執行緒將會被全部釋放。
public class Thread11 implements Runnable{ private int num; private static CyclicBarrier cb = new CyclicBarrier(6); //指定柵欄的等待執行緒數 public Thread11(int num){ this.num = num; } @Override public void run() { try { Thread.sleep(1000*num); //等待指定數量時間後到達柵欄處 System.out.println(Thread.currentThread().getName() +" is coming.."); cb.await(10L, TimeUnit.SECONDS); System.out.println("continue...."); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < 8; i++) { es.execute(new Thread11(i)); } es.shutdown(); } } /* *pool-1-thread-1 is coming.. *pool-1-thread-2 is coming.. *pool-1-thread-3 is coming.. *continue.... *continue.... *continue.... *pool-1-thread-4 is coming.. *超時異常錯誤(指定時間內執行緒數量仍然到達) */
3.Semaphore訊號量
訊號量用於保護對一個或多個共享資源的訪問,其內部維護一個計數器,用來只是當前可以訪問共享資源的數量。可以通過tryAcquire去嘗試獲取許可,還可以通過availablePermits()方法得到可用的許可數目,而acquire/release則是獲取/釋放許可。
public class Thread12 implements Runnable { private static SecureRandom random= new SecureRandom(); private static Semaphore semaphore = new Semaphore(3, true); @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " got permission..."); Thread.sleep(random.nextInt(10000)); semaphore.release(); System.out.println(Thread.currentThread().getName() + " released permission..."); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < 6; i++) { es.execute(new Thread12()); } es.shutdown(); } }
4.fork/join框架
Fork/Join使用兩個類:
ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,Fork/Join框架提供了以下兩個子類:
RecursiveAction:用於沒有返回結果的任務。
RecursiveTask :用於有返回結果的任務。
ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作執行緒所維護的雙端隊列中。
ForkJoinPool與其他類型的ExecutorService的不同之處主要在於使用工作竊取,每個執行緒都有自己的雙端任務隊列,執行緒在一般情況下會從隊列頭去獲取任務,當某個執行緒任務隊列的為空的時候,它會嘗試從其他執行緒的任務隊列的尾部去“竊取”任務來執行。
public class Thread13 extends RecursiveTask<Integer> { private int start; private int end; public Thread13(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int m = 1000; //每個執行緒計算的範圍大小 int s = start, n = end; //每個執行緒計算的起始地址 int r = 0; //算和的變數 List<ForkJoinTask<Integer>> it = new ArrayList<ForkJoinTask<Integer>>(); while (s <= n) { if (n - s < m) { for (int i = s; i <= n; i++) { r += i; } } else { n = Math.min(s + m - 1, n); //得到一個新的start it.add(new Thread13(s, n).fork()); //得到每一個範圍[如(0,999)]加入一個執行緒 } s = n + 1; n = end; } for (ForkJoinTask<Integer> t : it) { r += t.join(); } return r; } public static void main(String[] args) throws ExecutionException, InterruptedException { ForkJoinPool fjp = new ForkJoinPool(); int s = 1, n = 10001; Future<Integer> rs = fjp.submit(new Thread13(s, n)); System.out.println(rs.get()); } } /* output * 50015001 */
-
龐永華. Java多執行緒與Socket:實戰微服務框架[M].電子工業出版社.2019.3