聊聊並發(一)——初始JUC

一、volatile

1、介紹

  JDK 5.0 提供了java.util.concurrent包,在此包中增加了並發編程中很常用的使用工具類,用於定義類似於線程的自定義子系統,包括線程池、異步IO和輕量級任務框架。提供可調的、靈活的線程池。還提供了設計用於多線程上下文的Collection實現等。

2、內存可見性

  內存可見性是指當某個線程正在使用對象狀態而另一線程在同時修改該狀態,需要確保當一個線程修改了對象狀態後,其他線程能夠看到發生的狀態變化。
  可見性錯誤是指當讀操作與寫操作在不同的線程中執行時,我們無法確保執行讀操作的線程能實時的看到其他線程寫入之後的值,有時甚至是根本不可能的事情。
  我們可以通過同步來保證對象被安全的發佈。除此之外我們也可以使用一種更加輕量級的volatile變量。
  內存可見性問題:當多個線程同時操作共享數據時,對共享數據的操作彼此是不可見的。
  代碼示例:內存可見性問題

 1 public class VolatileDemo implements Runnable {
 2 
 3     private boolean flag = false;
 4 
 5     @Override
 6     public void run() {
 7 
 8         try {
 9             Thread.sleep(200);
10         } catch (InterruptedException e) {
11         }
12 
13         flag = true;
14         System.out.println("flag = " + isFlag());
15     }
16 
17     public boolean isFlag() {
18         return flag;
19     }
20 
21     public void setFlag(boolean flag) {
22         this.flag = flag;
23     }
24 
25 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         VolatileDemo demo = new VolatileDemo();
 5         new Thread(demo).start();
 6 
 7         while (true) {
 8             if (demo.isFlag()) {
 9                 System.out.println("this is main");
10                 break;
11             }
12         }
13     }
14 }
15 
16 // 結果1
17 flag = true
18 this is main
19 // 程序結束
20 
21 // 結果2
22 flag = true
23 // 程序死循環

  問題:結果1不難理解,當線程執行完畢後,主線程才開始執行while。為什麼結果2是死循環呢?
  原因:JVM為每一個執行任務的線程,它都會分配一個獨立的工作內存用於提高效率。每次都會從主存中讀取變量的副本到各自的工作內存中,修改後,再寫回主存中。
  那麼,不難理解結果2:主線程從主存讀取flag = false,因為用的while循環,while屬於底層的東西,執行速度非常快,沒有再讀主存的機會,一直讀取的是自己的工作內存(flag = false)。而當線程1讀到flag並修改為true,回寫到主存時,主線程並不知道,所以死循環。

  解決:知道問題原因了,如何解決呢?
  代碼示例:方式一、加鎖

 1 // 方式一
 2 public class Main {
 3     public static void main(String[] args) {
 4         VolatileDemo demo = new VolatileDemo();
 5         new Thread(demo).start();
 6 
 7         while (true) {
 8             synchronized (demo) {
 9                 if (demo.isFlag()) {
10                     System.out.println("this is main");
11                     break;
12                 }
13             }
14         }
15     }
16 }

  分析:synchronize加鎖可以解決。加了鎖,就可以讓while循環每次都從主存中去讀取數據,這樣就能讀取到true了。但是加鎖效率極低。每次只能有一個線程訪問,當一個線程持有鎖時,其他線程就會阻塞,效率就非常低了。不想加鎖,又要解決內存可見性問題,那麼就可以使用volatile關鍵字。
  代碼示例:方式二、用volatile修飾

1 private volatile boolean flag = false;

3、volatile關鍵字

  Java提供了一種稍弱的同步機制——volatile關鍵字,當多個線程訪問共享數據時,可以保證內存可見性,即內存中的數據可見。用這個關鍵字修飾共享數據,就會及時的把線程工作內存中的數據刷新到主存中去,也可以理解為,就是直接操作主存中的數據。
  可以將volatile看做一個輕量級的鎖,相較於synchronized是一種輕量級的同步策略。與鎖(synchronize)的區別:
  volatile不具備互斥性。即一個線程訪問共享數據,另一個線程依然可以訪問。所有的訪問都在主存中完成,保證內存可見性。
  synchronized具備互斥性。即一個線程搶到鎖,另一個線程進不來,必須等待。
  volatile不能保證變量的原子性。

二、i++問題

1、原子性

  所謂原子性就是一組操作不可再細分。要麼全都做,要麼全都不做。前面提到volatile不能保證變量的原子性,具體表現如下:
  代碼示例:原子性問題

 1 public class AtomicDemo implements Runnable {
 2 
 3     // 此時是加了volatile語義的
 4     private volatile int i = 0;
 5 
 6     @Override
 7     public void run() {
 8 
 9         try {
10             Thread.sleep(1000);
11         } catch (InterruptedException e) {
12         }
13 
14         System.out.println(getI());
15     }
16 
17     public int getI() {
18         return i++;
19     }
20 
21 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         AtomicDemo atomicDemo = new AtomicDemo();
 5 
 6         // 開啟 10 個線程對共享數據進行自增後打印。
 7         for (int i = 0; i < 10; i++) {
 8             new Thread(atomicDemo).start();
 9         }
10     }
11 }
12 
13 // 可能的一種結果
14 0
15 5
16 4
17 3
18 2
19 1
20 0
21 6
22 6
23 7

  問題:期望結果應該每個線程對 i 自增一次,最終 i 的值為10。實際結果如上(有重複數據)。
  原因:i++操作不是一個原子性操作,實際分為讀改寫三步,如下:

  int temp = i; // 從主存中讀
  i = i + 1; // cpu 對 i 進行+1運算
  i = temp; // 寫回主存

  而volatile不能保證變量的原子性。volatile,只是相當於所有線程都是在主存中操作數據而已,並不具備互斥性。比如兩個線程同時讀取主存中的0,然後又同時自增,同時寫入主存,結果還是會出現重複數據。volatile的不具備互斥性也導致了它不具備原子性。
  解決:知道問題原因了,如何解決呢?
  代碼示例:方式一、加鎖

1 public synchronized int getI() {
2     return i++;
3 }

  代碼示例:方式二、原子變量

1 private AtomicInteger i = new AtomicInteger();
2 public int getI() {
3     return i.getAndIncrement();
4 }

2、原子變量

  JDK 1.5 以後java.util.concurrent.atomic包下提供了常用的原子變量。這些原子變量具備以下特點:volatile的內存可見性;CAS算法保證數據的原子性。
  類的小工具包,支持在單個變量上解除鎖的線程安全編程。事實上,此包中的類可將volatile值、字段和數組元素的概念擴展到那些也提供原子條件更新操作的類。
  類AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference的實例各自提供對相應類型單個變量的訪問和更新。每個類也為該類型提供適當的實用工具方法。
  AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray類進一步擴展了原子操作,對這些類型的數組提供了支持。這些類在為其數組元素提供volatile訪問語義方面也引人注目,這對於普通數組來說是不受支持的。
  核心方法:boolean compareAndSet(int expectedValue, int updateValue)
  java.util.concurrent.atomic包下提供了一些原子操作的常用類:

  AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference<V>
  AtomicIntegerArray、AtomicLongArray
  AtomicMarkableReference<V>
  AtomicReferenceArray<E>
  AtomicStampedReference<V>

3、CAS算法

  CAS(Compare and Swap)是一種硬件對並發的支持,針對多處理器操作而設計的處理器中的一種特殊指令,用於管理對共享數據的並發訪問,是硬件對於並發操作共享數據的支持。
  CAS是一種無鎖的非阻塞算法的實現。不存在上下文切換的問題。
  CAS包含了3個操作數:內存值V,比較值A,更新值B。當且僅當V == A時,V = B,否則不執行任何操作。
  CAS算法:當多個線程並發的對主存中的數據進行修改的時候。有且只有一個線程會成功,其他的都會失敗。(同時操作,只是會失敗而已,並不會被鎖之類的)。
  CAS比普通同步鎖效率高,原因:CAS算法當這一次不成功的時候,它下一次不會阻塞,也就是它不會放棄CPU的執行權,它可以立即再次嘗試,再去更新。
  代碼示例:模擬CAS算法

 1 // 模擬CAS算法
 2 public class CompareAndSwap {
 3     private int value;
 4 
 5     // 獲取內存值
 6     public synchronized int get() {
 7         return value;
 8     }
 9 
10     // 比較
11     public synchronized int compareAndSwap(int expectedValue, int newValue) {
12         int oldValue = value;
13 
14         if (oldValue == expectedValue) {
15             this.value = newValue;
16         }
17 
18         return oldValue;
19     }
20 
21     // 設置
22     public synchronized boolean compareAndSet(int expectedValue, int newValue) {
23         return expectedValue == compareAndSwap(expectedValue, newValue);
24     }
25 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         final CompareAndSwap cas = new CompareAndSwap();
 5 
 6         for (int i = 0; i < 10; i++) {
 7             new Thread(() -> {
 8 
 9                 int expectedValue = cas.get();
10                 boolean b = cas.compareAndSet(expectedValue, (int) (Math.random() * 101));
11 
12                 System.out.println(b);
13 
14             }).start();
15         }
16  
17     }
18 }

三、鎖分段機制

1、concurrentHashMap

  JDK 1.5之後,在java.util.concurrent包中提供了多種並發容器類來改進同步容器類的性能。其中最主要的就是ConcurrentHashMap,採用”鎖分段”機制。
  HashMap是線程不安全的;Hashtable 加了鎖,是線程安全的,因此它效率低。Hashtable 加鎖就是將整個hash表鎖起來,當有多個線程訪問時,同一時間只能有一個線程訪問,並行變成串行,因此效率低。
  ConcurrentHashMap是一個線程安全的hash表。對於多線程的操作,介於 HashMap 與 Hashtable 之間。內部採用”鎖分段”機制替代 Hashtable 的獨佔鎖,進而提高性能。

  每個段都是一個獨立的鎖。JDK 1.8 以後concurrentHashMap的鎖分段被取消了。採用的是CAS算法。
  此包還提供了設計用於多線程上下文中的 Collection 實現:

  ConcurrentHashMap
  ConcurrentSkipListMap
  ConcurrentSkipListSet
  CopyOnWriteArrayList
  CopyOnWriteArraySet

  當期望多線程訪問一個給定 collection 時,ConcurrentHashMap 通常優於同步的 HashMap,ConcurrentSkipListMap 通常優於同步的 TreeMap。當期望的讀數和遍歷遠遠大於列表的更新數時,CopyOnWriteArrayList 優於同步的 ArrayList。

2、CopyOnWriteArrayList

  代碼示例:CopyOnWriteArrayList

 1 // 不寫注釋也能看懂的代碼
 2 public class CopyOnWriteArrayListDemo implements Runnable {
 3 
 4     private static final List<String> list = Collections.synchronizedList(new ArrayList<>());
 5     //private static final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
 6 
 7     static {
 8         list.add("AA");
 9         list.add("BB");
10         list.add("CC");
11     }
12 
13     @Override
14     public void run() {
15         Iterator<String> it = list.iterator();
16         while (it.hasNext()) {
17             System.out.println(it.next());
18 
19             list.add("AA");
20         }
21     }
22 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         CopyOnWriteArrayListDemo ht = new CopyOnWriteArrayListDemo();
 5 
 6         new Thread(ht).start();
 7     }
 8 }
 9 
10 // 結果(有並發修改異常)
11 AA
12 Exception in thread "Thread-0" java.util.ConcurrentModificationException

  如果用CopyOnWriteArrayList,則不會有異常。
  CopyOnWriteArrayList:寫入並複製,添加操作多時,效率低,因為每次添加時都會進行複製,開銷非常的大。並發迭代操作多時可以選擇。

四、CountDownLatch(閉鎖)

1、介紹

  java.util.concurrent包中提供了多種並發容器類來改進同步容器的性能。CountDownLatch是一個同步輔助類,在完成某些運算時,只有其他所有線程的運算全部完成,當前運算才繼續執行,這就叫閉鎖。

2、案例

  代碼示例:計算10個線程打印偶數的時間

 1 // 不寫注釋也能看懂的代碼
 2 public class CountDownLatchDemo implements Runnable {
 3 
 4     private final CountDownLatch latch;
 5 
 6     public CountDownLatchDemo(CountDownLatch latch) {
 7         this.latch = latch;
 8     }
 9 
10     @Override
11     public void run() {
12         try {
13             for (int i = 0; i < 50000; i++) {
14                 if (i % 2 == 0) {
15                     System.out.println(i);
16                 }
17             }
18         } finally {
19             // 完成一個線程,計數 -1
20             latch.countDown();
21         }
22     }
23 
24 }
 1 // 測試類
 2 public class Main {
 3 
 4     public static void main(String[] args) {
 5         final CountDownLatch latch = new CountDownLatch(10);
 6         CountDownLatchDemo ld = new CountDownLatchDemo(latch);
 7 
 8         long start = System.currentTimeMillis();
 9 
10         for (int i = 0; i < 10; i++) {
11             new Thread(ld).start();
12         }
13 
14         try {
15             // 等待10個線程都執行完
16             latch.await();
17         } catch (InterruptedException e) {
18         }
19 
20         long end = System.currentTimeMillis();
21 
22         System.out.println("耗費時間為:" + (end - start));
23     }
24 
25 }

五、Callable

  Callable和Runable的區別是,Callable帶泛型,其call方法有返回值。使用的時候,需要用FutureTask來接收返回值。而且它也要等到線程執行完調用get方法才會執行,也可以用於閉鎖操作。
  代碼示例:

 1 // 不寫注釋也能看懂的代碼
 2 public class CallableDemo implements Callable<Integer> {
 3 
 4     @Override
 5     public Integer call() throws Exception {
 6         int sum = 0;
 7 
 8         for (int i = 0; i <= 100; i++) {
 9             sum += i;
10         }
11 
12         return sum;
13     }
14 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         CallableDemo demo = new CallableDemo();
 5 
 6         //執行 Callable 方式,需要 FutureTask 實現類的支持,用於接收運算結果。
 7         FutureTask<Integer> result = new FutureTask<>(demo);
 8         new Thread(result).start();
 9 
10         try {
11             // get()方法是阻塞的
12             Integer sum = result.get();
13             System.out.println(sum);
14 
15             System.out.println("--------表明 get()方法是阻塞的-------------");
16         } catch (InterruptedException | ExecutionException e) {
17             e.printStackTrace();
18         }
19     }
20 }

六、Lock同步鎖

  在JDK1.5之前,解決多線程安全問題用sychronized隱式鎖:同步代碼塊;同步方法。
  在JDK1.5之後,出現了更加靈活的方式Lock顯式鎖:同步鎖。
  Lock需要通過lock()方法上鎖,通過unlock()方法釋放鎖。為了保證鎖能釋放,所有unlock方法一般放在finally中去執行。
  代碼示例:賣票問題

 1 // 不寫注釋也能看懂的代碼
 2 public class LockDemo implements Runnable {
 3 
 4     private int tick = 100;
 5 
 6     private final Lock lock = new ReentrantLock();
 7 
 8     @Override
 9     public void run() {
10         while (true) {
11             //上鎖
12             lock.lock();
13 
14             try {
15                 if (tick > 0) {
16 
17                     try {
18                         Thread.sleep(200);
19                     } catch (InterruptedException e) {
20                     }
21 
22                     System.out.println(Thread.currentThread().getName() + " 完成售票,余票為:" + --tick);
23                 } else {
24                     break;
25                 }
26             } finally {
27                 //釋放鎖
28                 lock.unlock();
29             }
30         }
31     }
32 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         LockDemo ticket = new LockDemo();
 5 
 6         // 開了3個窗口賣票
 7         new Thread(ticket, "1號窗口").start();
 8         new Thread(ticket, "2號窗口").start();
 9         new Thread(ticket, "3號窗口").start();
10     }
11 }
12 
13 // 可能的結果.這裡只出最後10張票的結果值
14 2號窗口 完成售票,余票為:9
15 2號窗口 完成售票,余票為:8
16 3號窗口 完成售票,余票為:7
17 3號窗口 完成售票,余票為:6
18 3號窗口 完成售票,余票為:5
19 3號窗口 完成售票,余票為:4
20 3號窗口 完成售票,余票為:3
21 3號窗口 完成售票,余票為:2
22 1號窗口 完成售票,余票為:1
23 2號窗口 完成售票,余票為:0

七、ReadWriterLock讀寫鎖

  多個線程並發讀數據,是不會出現問題。但是,多個線程並發寫數據,到底是寫入哪個線程的數據呢?所以,寫寫/讀寫需要互斥,讀讀不需要互斥。這個時候可以用讀寫鎖來提高效率。
  ReadWriteLock 維護了一對相關的鎖,一個用於只讀操作,另一個用於寫入操作。只要沒有 writer,讀取鎖可以由多個 reader 線程同時保持。
  讀鎖,可以多個線程並發的持有。
  寫鎖,是獨佔的。
  源碼示例:讀寫鎖

1 public interface ReadWriteLock {
2     // 返回一個讀鎖
3     Lock readLock();
4 
5     // 返回一個寫鎖
6     Lock writeLock();
7 }

  代碼示例:

 1 public class ReadWriteLockDemo {
 2 
 3     private int number = 0;
 4 
 5     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 6 
 7     // 讀.可以多個線程並發讀
 8     public void read() {
 9         // 上讀鎖
10         lock.readLock().lock();
11 
12         try {
13             System.out.println(Thread.currentThread().getName() + " : " + number);
14         } finally {
15             // 釋放讀鎖
16             lock.readLock().unlock();
17         }
18     }
19 
20     // 寫.一次只能有一個線程操作
21     public void write(int number) {
22         // 上寫鎖
23         lock.writeLock().lock();
24 
25         try {
26             System.out.println(Thread.currentThread().getName());
27             this.number = number;
28         } finally {
29             // 釋放寫鎖
30             lock.writeLock().unlock();
31         }
32     }
33 }
 1 // 測試類
 2 public class Main {
 3     public static void main(String[] args) {
 4         ReadWriteLockDemo rw = new ReadWriteLockDemo();
 5 
 6         // 開啟 1 個線程寫
 7         new Thread(new Runnable() {
 8             @Override
 9             public void run() {
10                 rw.write((int) (Math.random() * 101));
11             }
12         }, "Write:").start();
13 
14         // 開啟 100 個線程讀
15         for (int i = 0; i < 100; i++) {
16             new Thread(new Runnable() {
17 
18                 @Override
19                 public void run() {
20                     rw.read();
21                 }
22             }).start();
23         }
24     }
25 }