多執行緒編程(2)—執行緒安全

  • 2019 年 10 月 23 日
  • 筆記

1. 問題的引出 

 執行緒安全歸根結底可以說是記憶體安全,在jvm記憶體模型中,有一塊特殊的公共記憶體空間,稱為堆記憶體,進程內的所有執行緒都可以訪問並修改其中的數據,就會造成潛在的問題。因為堆記憶體空間在沒有保護機制的情況下,你放進去的數據,可能被別的執行緒篡改。如下程式碼:

public class ThreadSafe implements Runnable {      private static int count = 0;      @Override      public void run() {          for (int i = 0; i < 1000; i++) {              count++;          }      }      public static void main(String[] args) throws InterruptedException {          ExecutorService es = Executors.newFixedThreadPool(10);          for (int i = 0; i < 20; i++) {              es.execute(new ThreadSafe());          }          es.shutdown();  //不允許添加執行緒到執行緒池,非同步關閉連接池          es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成          System.out.println(count);      }  }  

 本來期望的值是20000,可是最終輸出的結果卻一點在變化,其值總是小於等於20000,顯然這是由於執行緒不安全造成的,多個執行緒並發的去訪問全局變數、靜態變數、文件、設備、套接字等都可能出現這種問題。


2. 執行緒同步的措施

 為了協調和配合執行緒之間對共享資源的訪問,通常有四種方式:

   1. 臨界區:訪問某一段臨界資源的程式碼片段,與共享資源類似,但有一點不同的是,某一時刻只允許一個執行緒去訪問(對應java中的關鍵字 synchronized包含的程式碼)。

  2. 互斥量:互斥量是一個對象,只有都擁有互斥量的對象才可以訪問共享資源。而且互斥量中只有一個,通常互斥量的實現是通過鎖來實現的,而且加鎖操作和釋放操作只能由同一個執行緒來完成。此處與臨界區的區別是一段程式碼,通常存在與一個文件中,而互斥量是一個對象,加鎖操作和解鎖操作可以在不同的文件去編寫,只要是同一個執行緒就好。

  3. 訊號量: 訊號量可以允許指定數量的執行緒同一時刻去訪問共享資源,當執行緒數達到了閾值後,將阻止其他執行緒的訪問,最常見的比如生產者和消費者問題。訊號量和互斥量的區別則是訊號量的發出和釋放可以由不同執行緒來完成

  4. 事件:通過發送通知的形式來實現執行緒同步,可以實現不同進程中的執行緒同步操作


 3.飢餓與死鎖

  飢餓:某些執行緒或進程由於長期得不到資源,而總是處於就緒或者阻塞狀態。例如:

  ①. 該進程或執行緒所擁有的CPU時間片被其他執行緒搶佔而得不到執行(通常是優先順序比它高的執行緒或進程),一直處於就緒狀態。

  ②. 由於選用不恰當的調度演算法,導致該進程或執行緒長期無法得到CPU時間片,處於就緒狀態。

  ③. 由於喚醒的時間把握不對,喚醒執行緒時,所需的資源處於被鎖定狀態,導致執行緒回到阻塞狀態。

  死鎖:兩個或多個執行緒在執行過程中,由於相互佔有對方所需的資源而又互不相讓從而造成這些執行緒都被阻塞,若無外力的作用下,他們都將無法執行下去。例如

  ①. 進程推進順序不合適。互相佔有彼此需要的資源,同時請求對方佔有的資源,形成循環依賴的關係。

  ②. 資源不足。

  ③. 進程運行推進順序與速度不同,也可能產生死鎖。

  一些避免死鎖的措施:

  1. 不要在鎖區域內在加把鎖,即不要在釋放鎖之前競爭其他鎖。

  2. 減小鎖粒度,即減小執行緒加鎖的範圍,真正需要的時候再去加鎖。

  3. 順序訪問共享資源。

  4. 設置超時機制,超過指定時間則程式返回錯誤。

  5. 競爭鎖期間,允許程式被中斷。


 4.程式碼層面解決執行緒安全

 解決執行緒安全主要考慮三方面:

  1. 可見性:當多個執行緒並發的讀寫某個共享資源的時候,每個執行緒總是可以取到該共享資源的最新數據。

  2. 原子性:某執行緒對一個或者多個共享資源所做的一連串寫操作不會被中斷,在此期間不會有其他執行緒同時對這些共享資源進行寫操作。

  3. 有序性:單個執行緒內的操作必須是有序的。

通常原子性都可以得到保證,問題的病端就出在可見性和原子性。

4.1 可見性的問題

  如下實常式序,按通常的理解來說,當主執行緒等待一秒後,把flag的值修改為true後,另外一個執行緒應該可以感知到,然後跳過while循環,直接列印出後面的數據,可是最結果卻一直卡在了while循環里。

public class Thread4 implements Runnable{      private static boolean flag = false;      @Override      public void run() {          System.out.println("waiting for data....");          while (!flag);          System.out.println("cpying with data");      }        public static void main(String[] args) throws InterruptedException {          Thread4 thread4 = new Thread4();          Thread t = new Thread(thread4);          t.start();          Thread.sleep(1000);          flag = true;      }  }  /* output   * waiting for data....   */  

  主要的原因是java程式在jvm上運行的時候,該程式所佔用的記憶體分為兩類主記憶體和工作記憶體(邏輯上的記憶體,實際上是cpu的暫存器和高速快取,因為,cpu在計算的時候,並不總是從記憶體讀取數據,它的數據讀取順序優先順序是:暫存器-高速快取-記憶體。CPU和記憶體之間通過匯流排進行)。也就是在主執行緒中啟動另一個執行緒t會開闢出一個新的工作記憶體,與主執行緒的工作記憶體相互獨立,且執行緒之間無法直接通訊,只能去主記憶體讀取全局變數,而執行緒t中做while判斷的時候並不會去讀取主記憶體的flag值,致使執行緒t無法被感知到flag在其他執行緒被改變,可以做一個測試,現在把run函數改成如下形式:  

public class Thread4 implements Runnable{      private volatile static boolean flag = false;      @Override      private volatile static boolean flag = false;  	public void run() {  		System.out.println("waiting for data....");          while (!flag);          System.out.println("cpying with data");      }      //....     }  /* output   * waiting for data....   * false   * ...   * false   * cpying with data   */

 為了感知其他執行緒中一些全局變數值的變化,而且避免頻繁去測試主記憶體中的數據變化,保證執行緒之間的可見性,可以使用volatile關鍵字去修飾全局變數,如下:

public void run() {          System.out.println("waiting for data....");          /*  Notice              如果在while循環里加上System.out.println(flag);語句,則不會使用本工作記憶體的flag數據,              而是重新去主記憶體載入數據           */          while (!flag){              System.out.println(flag);     //測試,可以做到執行緒的可見性          }          System.out.println("cpying with data");   }  /* output   * waiting for data....   * false   * ...   * false   * cpying with data   */  

volatile關鍵字藉助MESI一致性協議,會在工作記憶體(CPU的暫存器等)與主記憶體連接的匯流排上建立一道匯流排嗅探機制,一旦發現其他執行緒修改了主記憶體中的某個全局變數(即圖中橙灰色線條讀取的數據以及寫回的數據),就會讓其他工作執行緒中從主記憶體拷貝出來的副本變數失效(即圖中紫色的線條讀取的數據),從而會使左邊的執行緒重新去讀取數據(即圖中紅色的線條讀取的數據)。如下圖:

  雖然解決了原子性問題,可是volatile關鍵字不支援原子性操作,如下程式:

public class Thread5 implements Runnable {      private static volatile int count = 0;      @Override      public void run() {          for (int i = 0; i < 10000; i++) {              count++;          }      }      public static void main(String[] args) throws InterruptedException {          ExecutorService es = Executors.newFixedThreadPool(10);          for (int i = 0; i < 20; i++) {              es.execute(new Thread5());          }          es.shutdown();  //不允許添加執行緒,非同步關閉連接池          es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成          System.out.println(count);      }  }  /* output   * 175630   */  

4.2 原子性問題

 針對原子性問題,我們可以使用熟悉的synchronized關鍵字,synchronized關鍵字最主要有以下3種應用方式:

  • 修飾實例方法,作用於當前實例加鎖,進入同步程式碼前要獲得當前實例的鎖

  • 修飾靜態方法,作用於當前類對象加鎖,進入同步程式碼前要獲得當前類對象的鎖

  • 修飾程式碼塊,指定加鎖對象,對給定對象加鎖,進入同步程式碼庫前要獲得給定對象的鎖。

部分示例程式碼如下:

public class Thread5 implements Runnable {      private static int count = 0;      public synchronized static void add() {          count++;      }      @Override      public void run() {          for (int i = 0; i < 1000000; i++) {  //          add()              synchronized (Thread5.class){                  count++;              }          }      }      public static void main(String[] args) throws InterruptedException {          ExecutorService es = Executors.newFixedThreadPool(10);          for (int i = 0; i < 20; i++) {              es.execute(new Thread5());          }          es.shutdown();  //不允許添加執行緒,非同步關閉連接池          es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成          System.out.println(count);      }  }  /* output   * 20000000   */  

然而synchronized是一種悲觀鎖,具有強烈的獨佔和排他特性,它頻繁的加鎖和釋放鎖操作會使程式的效率低下。與悲觀鎖相對是一種樂觀鎖操作CAS(CompareAndSwap),樂觀鎖就是每次去取數據的時候都樂觀的認為數據不會被修改,因此這個過程不會上鎖,但是在更新的時候會判斷一下在此期間的數據有沒有更新,如果沒有更新則去修改,否則失敗。可是上面這種 操作會出現ABA(A-B-A,中途被改變,但最後又改回原值)的問題,

針對上面的問題,java中可以使用Atomic,它的包名為java.util.concurrent.atomic。這個包裡面提供了一組原子變數的操作類(通過值加版本號的方式去解決ABA問題),這些類可以保證在多執行緒環境下,當某個執行緒在執行atomic的方法時,不會被其他執行緒打斷,一直等到該方法執行完成(具體的API文檔可以查看參考文獻第5點)。

public class ThreadSafe implements Runnable {      private static AtomicInteger count = new AtomicInteger(0);      @Override      public void run() {          for (int i = 0; i < 10000; i++) {              count.getAndAdd(1);          }      }      public static void main(String[] args) throws InterruptedException {          ExecutorService es = Executors.newFixedThreadPool(10);          for (int i = 0; i < 20; i++) {              es.execute(new ThreadSafe());          }          es.shutdown();  //不允許添加執行緒,非同步關閉連接池          es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成          System.out.println(count);      }  }  /* output   * 200000   */  


5. 其他方法解決執行緒同步

a. 自旋鎖

 執行緒循環反覆檢查變數是否可用,在這一過程中執行緒一直保持執行(RUNNABLE),因此是一種忙等待,不像關鍵字synchronized一樣,一旦發現不能訪問,則處於執行緒處於阻塞狀態(BLOCKED)。

public class Thread6 implements Runnable{      private static  final Lock lock = new ReentrantLock();      private volatile static int count = 0;      @Override      public void run() {          for (int i = 0; i < 1000000; i++){              lock.lock();              count++;              lock.unlock();          }      }      static void test(ExecutorService es) throws InterruptedException {          for (int i = 0; i < 20; i++) {              es.execute(new Thread6());          }          es.shutdown();  //不允許添加執行緒,非同步關閉連接池          es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成          System.out.println(count);      }      public static void main(String[] args) throws InterruptedException {          ExecutorService es = Executors.newFixedThreadPool(20);          test(es);      }  }  

 如果在使用lock的時候包含了try…catch…語句,要注意的是lock 必須在 finally 塊中釋放。否則,如果受保護的程式碼將拋出異常,鎖就有可能永遠得不到釋放!

 與Lock類同一個包java.util.concurrent.locks下還有一種讀寫分離的鎖ReentrantReadWriteLock類,讀寫鎖維護了一對鎖,一個讀鎖和一個寫鎖。一般情況下,讀寫鎖的性能都會比排它鎖好,因為大多數場景讀是多於寫的。在讀多於寫的情況下,讀寫鎖能夠提供比排它鎖更好的並發性和吞吐量。

  

public class RWTest {      private static final Map<String, Object> map = new HashMap<String, Object>();      private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();      private static final Lock readLock = lock.readLock();      private static final Lock writeLock = lock.writeLock();        public static final Object get(String key) {          readLock.lock();          try {              return map.get(key);          } finally {              readLock.unlock();          }      }        public static final Object put(String key, Object value) {          writeLock.lock();          try {              return map.put(key, value);          } finally {              writeLock.unlock();          }      }        public static final void clear() {          writeLock.lock();          try {              map.clear();          } finally {              writeLock.unlock();          }      }  }  

 利用自旋鎖Lock也提供了Condition來實現執行緒間的狀態通知的,可以根據實際情況去喚醒某個執行緒(與後面的wait不同,是隨機的)或者所有執行緒。可以通過lock.newCondition()來獲取得Condition實例,可以根據實際需求創建多個實例。

public class Thread9 {      public static ReentrantLock lock=new ReentrantLock();      public static Condition condition =lock.newCondition();      public static void main(String[] args) {          new Thread(){              @Override              public void run() {                  lock.lock();//請求鎖                  try{                      System.out.println(Thread.currentThread().getName()+"==》進入等待");                      condition.await();//設置當前執行緒進入等待                  }catch (InterruptedException e) {                      e.printStackTrace();                  }finally{                      lock.unlock();//釋放鎖                  }                  System.out.println(Thread.currentThread().getName()+"==》繼續執行");              }          }.start();          new Thread(){              @Override              public void run() {                  lock.lock();//請求鎖                  try{                      System.out.println(Thread.currentThread().getName()+"=》進入");                      Thread.sleep(2000);//休息2秒                      condition.signal();//隨機喚醒等待隊列中的一個執行緒                      System.out.println(Thread.currentThread().getName()+"休息結束");                  }catch (InterruptedException e) {                      e.printStackTrace();                  }finally{                      lock.unlock();//釋放鎖                  }              }          }.start();      }  }  /*output   *Thread-0==》進入等待   *Thread-1=》進入   *Thread-1休息結束   *Thread-0==》繼續執行   */  

b. wait.notify.notifyAll

 在關鍵字synchronized的執行緒同步機制,調用執行緒的sleep,yield方法時,執行緒並不會讓出對象鎖,但是調用wait卻不同,執行緒自動釋放其佔有的對象鎖,同時不會去申請對象鎖,當執行緒被喚醒的時候,它才再次去申請競爭對象的鎖(該關鍵字通常只與synchronized結合使用)。notify()喚醒在等待該對象同步鎖的執行緒(只喚醒一個,如果有多個在等待),注意的是在調用此方法的時候,並不能確切的喚醒某一個等待狀態的執行緒,而是由JVM確定喚醒哪個執行緒,而且不是按優先順序。而notifyAll()則是喚醒所有等待的執行緒。

public class Thread8 implements Runnable {        private int num;      private Object lock;        public Thread8(int num, Object lock) {          this.num = num;          this.lock = lock;      }        public void run() {          try {              while (true) {                  synchronized (lock) {                      lock.notifyAll();                      lock.wait();                      System.out.println(num);                  }              }          } catch (InterruptedException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          final Object lock = new Object();          Thread thread1 = new Thread(new Thread8(1, lock));          Thread thread2 = new Thread(new Thread8(2, lock));          thread1.start();          thread2.start();      }  }  /* output   * 交替輸出1,2,1,2,1,2......   */  


6.並發編程—CountDownLatch、CyclicBarrier、Semaphore和fork/join框架

1. CountDownLatch

  CountDownLatch實現的是一個倒序計數器,可以通過調用它的countDown實現計數器減一和await方法來阻塞當前執行緒:

public class Thread10 {      public static void main(String[] args) throws InterruptedException {          int count = 20;          final CountDownLatch cdl = new CountDownLatch(count);          ExecutorService es = Executors.newCachedThreadPool();            for (int i = 0; i < count; i++) {              es.execute(new Runnable() {                  @Override                  public void run() {                      try {                          System.out.println(cdl.getCount());                      }finally {                          cdl.countDown();                      }                  }              });          }          cdl.await();          es.shutdown();          System.out.println("主執行緒現在才結束: count = "+cdl.getCount());      }  }  

2.CyclicBarrier

  即迴環柵欄,是一種可重用的執行緒阻塞器,它將率先到達柵欄的這些執行緒阻塞(調用await()方法),直到指定數量的執行緒都到達該處,這些執行緒將會被全部釋放。

public class Thread11 implements Runnable{      private int num;      private static CyclicBarrier cb = new CyclicBarrier(6); //指定柵欄的等待執行緒數      public Thread11(int num){          this.num = num;      }      @Override      public void run() {          try {              Thread.sleep(1000*num);     //等待指定數量時間後到達柵欄處              System.out.println(Thread.currentThread().getName() +" is coming..");              cb.await(10L, TimeUnit.SECONDS);              System.out.println("continue....");          } catch (Exception e) {              e.printStackTrace();          }      }      public static void main(String[] args) {          ExecutorService es = Executors.newCachedThreadPool();          for (int i = 0; i < 8; i++) {              es.execute(new Thread11(i));          }          es.shutdown();      }  }  /*   *pool-1-thread-1 is coming..   *pool-1-thread-2 is coming..   *pool-1-thread-3 is coming..   *continue....   *continue....   *continue....   *pool-1-thread-4 is coming..   *超時異常錯誤(指定時間內執行緒數量仍然到達)   */  

3.Semaphore訊號量

  訊號量用於保護對一個或多個共享資源的訪問,其內部維護一個計數器,用來只是當前可以訪問共享資源的數量。可以通過tryAcquire去嘗試獲取許可,還可以通過availablePermits()方法得到可用的許可數目,而acquire/release則是獲取/釋放許可。

public class Thread12 implements Runnable {      private static SecureRandom random= new SecureRandom();      private static Semaphore semaphore = new Semaphore(3, true);        @Override      public void run() {          try {              semaphore.acquire();              System.out.println(Thread.currentThread().getName() + " got permission...");              Thread.sleep(random.nextInt(10000));              semaphore.release();              System.out.println(Thread.currentThread().getName() + " released permission...");          } catch (InterruptedException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          ExecutorService es = Executors.newCachedThreadPool();          for (int i = 0; i < 6; i++) {              es.execute(new Thread12());          }          es.shutdown();      }  }  

4.fork/join框架

 Fork/Join框架提供了的一個用於並行執行任務的框架,充分利用了CPU資源,把大任務分割成若干個小任務,最終匯總每個小任務結果後得到大任務結果的框架。(只供Java7使用)

Fork/Join使用兩個類:

  • ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制,Fork/Join框架提供了以下兩個子類:

    • RecursiveAction:用於沒有返回結果的任務。

    • RecursiveTask :用於有返回結果的任務。

  • ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作執行緒所維護的雙端隊列中。

ForkJoinPool與其他類型的ExecutorService的不同之處主要在於使用工作竊取,每個執行緒都有自己的雙端任務隊列,執行緒在一般情況下會從隊列頭去獲取任務,當某個執行緒任務隊列的為空的時候,它會嘗試從其他執行緒的任務隊列的尾部去“竊取”任務來執行。

public class Thread13 extends RecursiveTask<Integer> {      private int start;      private int end;        public Thread13(int start, int end) {          this.start = start;          this.end = end;      }        @Override      protected Integer compute() {          int m = 1000;   //每個執行緒計算的範圍大小          int s = start, n = end;  //每個執行緒計算的起始地址          int r = 0;  //算和的變數          List<ForkJoinTask<Integer>> it = new ArrayList<ForkJoinTask<Integer>>();          while (s <= n) {              if (n - s < m) {                  for (int i = s; i <= n; i++) {                      r += i;                  }              } else {                  n = Math.min(s + m - 1, n);        //得到一個新的start                  it.add(new Thread13(s, n).fork());  //得到每一個範圍[如(0,999)]加入一個執行緒              }              s = n + 1;              n = end;          }            for (ForkJoinTask<Integer> t : it) {              r += t.join();          }          return r;      }        public static void main(String[] args) throws ExecutionException, InterruptedException {          ForkJoinPool fjp = new ForkJoinPool();          int s = 1, n = 10001;          Future<Integer> rs = fjp.submit(new Thread13(s, n));          System.out.println(rs.get());      }  }  /* output   * 50015001   */  

  


 

參考文獻

  1. 龐永華. Java多執行緒與Socket:實戰微服務框架[M].電子工業出版社.2019.3

  2. Executors類中創建執行緒池的幾種方法的分析

  3. 知乎——如果你這樣回答“什麼是執行緒安全”,面試官都會對你刮目相看

  4. 知乎——Java執行緒記憶體模型,執行緒、工作記憶體、主記憶體

  5. Java進階——Java中的Atomic原子特性

  6. 深入理解Java並發之synchronized實現原理

  7. Java的wait(), notify()和notifyAll()使用小結

  8. java多執行緒-07-Lock和Condition

  9. Java並發編程:CountDownLatch、CyclicBarrier和Semaphor