並發原子技術之CAS機制

  • 2020 年 3 月 10 日
  • 筆記

1. CAS機制

CAS定義

  • CAS全稱為Compare-and-swap,是屬於並發多執行緒中實現同步原子操作的指令,是依賴於硬體層次的原語發起的原子操作
  • 從程式程式碼理解上,CAS包含check then act的兩個動作,這兩個動作在在硬體的處理器上是具備原子性,也就是在作業系統底層上已經實現對CAS演算法的原子性保證

CAS 使用條件

  • 需要輸入兩個數值,一個是期望修改前的值(舊值),一個是需要被設置的新值(新值)
  • 進行CAS操作需要進行對預期值的check操作

CAS之簡易版本

  • 通過CAS設置新值
// 簡單的check方式完成對象新值的設置  // cas.java  boolean cas(Object ref, int refOldVal, int refNewVal){      if(ref.value != refOldVal) return false;      ref.value = refNewVal;      return true;  }
  • CAS實現加法設置器
// cas_adder.java  // import cas.java  int cas_adder(Object ref, int incr){      while(!cas(ref, ref.value, ref.value+incr)){            // nothing      }      // 表示修改成功      return ref.value;  }
2. Java之基於UnSafe底層實現的CAS機制

java原子操作類Atomic*實現

  • 比如AtomicBoolean的實現源程式碼
// AtomicBoolean.java 摘取核心程式碼  public class AtomicBoolean implements java.io.Serializable {      private static final long serialVersionUID = 4654671469794556979L;      // setup to use Unsafe.compareAndSwapInt for updates      private static final Unsafe unsafe = Unsafe.getUnsafe();      private static final long valueOffset;      // 使用volatile修飾,保證修改的可見性,令當前的執行緒快取失效,讀取主記憶體修改後的數據      private volatile int value;        static {          try {          // 獲取當前屬性value的記憶體地址偏移量              valueOffset = unsafe.objectFieldOffset                  (AtomicBoolean.class.getDeclaredField("value"));          } catch (Exception ex) { throw new Error(ex); }      }        public final boolean compareAndSet(boolean expect, boolean update) {          int e = expect ? 1 : 0;          int u = update ? 1 : 0;          // 使用unsafe的CAS方法完成修改操作          return unsafe.compareAndSwapInt(this, valueOffset, e, u);      }  }
  • 基於UnSafe實現自定義的CAS操作
// AtomicDefineInt.java  public class AtomicDefineInt {        private volatile int value;      private static final Unsafe unsafe;      // 定義記憶體偏移量      private static final long iValueOffset;        static {          try {          // 必須通過反射獲取Unsafe,本身是屬於不安全的一個操作,直接通過getUnsafe會拋出異常,              Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");              theUnsafe.setAccessible(true);              unsafe = (Unsafe) theUnsafe.get(null);          // 從JNI中獲取value屬性在堆記憶體中的地址偏移量              iValueOffset = unsafe.objectFieldOffset(AtomicDefineInt.class.getDeclaredField("value"));          }catch (NoSuchFieldException e){              throw new RuntimeException(e);          }      }        // 藉助UnSafe調用CAS方法完成操作      public boolean compareAndSet(int expect, int update){          return unsafe.compareAndSwapInt(this, iValueOffset, expect, update);      }  }

Unsafe源碼以及實現

// java程式碼  public final class Unsafe {  // 欄位屬性theUnsafe  private static final Unsafe theUnsafe;    @CallerSensitive      public static Unsafe getUnsafe() {          Class var0 = Reflection.getCallerClass();          // 只允許jvm調用,java程式無法直接調用          if (!VM.isSystemDomainLoader(var0.getClassLoader())) {              throw new SecurityException("Unsafe");          } else {              return theUnsafe;          }      }    // compareAndSwapInt的修改方法為native  // 表示在執行緒的虛擬機棧中載入調用方法,由c++底層源碼實現  public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);  }  // c++程式碼  // 核心方法  // 獲取記憶體偏移量  UNSAFE_ENTRY(jlong, Unsafe_ObjectFieldOffset0(JNIEnv *env, jobject unsafe, jobject field)) {    return find_field_offset(field, 0, THREAD);  } UNSAFE_END    // 完成CAS操作  UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {    // JNI解析載入java object    oop p = JNIHandles::resolve(obj);    if (p == NULL) {    	// 如果p不是java中定義的面向對象引用,直接從記憶體地址完成修改操作      volatile jint* addr = (volatile jint*)index_oop_from_field_offset_long(p, offset);      return RawAccess<>::atomic_cmpxchg(addr, e, x) == e;    } else {    	// 如果p是java對象,直接在堆記憶體中對p的屬性數據進行修改操作      assert_field_offset_sane(p, offset);      return HeapAccess<>::atomic_cmpxchg_at(p, (ptrdiff_t)offset, e, x) == e;    }  } UNSAFE_END
  • 源碼分析
    • 從上述可知,java底層CAS實現機制是通過JNI環境來調用c++的實現
    • 底層實現之一是根據是否為java對象類型來直接在堆記憶體中完成CAS操作,而另一種是針對非java對象類型則直接從記憶體地址中完成對應的CAS操作
  • UnSafe的實現中可以看出,需要在java中使用UnSafe需要以下條件
    • 必須要有一個對象的屬性在記憶體的偏移量valueOffset
    • 其次需要傳遞對應的java對象p
    • 同時還需要有修改前期望的數值以及要設置修改的值
    • 另外在java程式碼中使用volatile保證數據是刷新到記憶體的,因為JNI是調用c++實現是直接操作堆記憶體的,那麼我們需要在並發多執行緒下保證讀是可見的,寫是最新的
3. CAS存在的問題

CAS問題

  • 在上述的CAS實現程式碼中,對於做一些自增或是自減等數學運算操作時,會產生自旋判斷,容易造成CPU性能下降
  • 在CAS操作僅針對單個變數,如果涉及多個變數的原子操作,CAS是無法保證原子性
  • 最後一個就是ABA問題

ABA問題

  • 程式碼示例
// AtomicDefineObject  public final class AtomicDefineObject implements Serializable {        private volatile String value;      private static Unsafe unsafe;      private static final long valueOffset;        static {          try {              // 反射獲取              Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");              theUnsafe.setAccessible(true);              unsafe = (Unsafe) theUnsafe.get(null);                valueOffset = unsafe.objectFieldOffset(AtomicDefineObject.class.getDeclaredField("value"));          }catch (NoSuchFieldException | IllegalAccessException e){              throw new RuntimeException(e);          }      }        private boolean compareAndSet(String expect, String update){          return unsafe.compareAndSwapObject(this, valueOffset, expect, update);      }        public void setValue(String value){          this.compareAndSet(this.value, value);      }        public String getValue() {          return value;      }  }
  • 對應執行的main方法
main(){          final AtomicDefineObject atomicDefineObject = new AtomicDefineObject();          atomicDefineObject.setValue("A");          System.out.println(Thread.currentThread() + " --- set value to A----");            Thread t1 = new Thread("thread1"){              @Override              public void run() {                  try{                      // 完成寫操作需要1s                      TimeUnit.SECONDS.sleep(1L);                      atomicDefineObject.setValue("B");                      System.out.println(Thread.currentThread() + " --- set value to B----");                  }catch (Exception e){}                }          };              Thread t2 = new Thread("thread2"){              @Override              public void run() {                  try{                      // 完成寫操作需要2s                      TimeUnit.SECONDS.sleep(2L);                      atomicDefineObject.setValue("A");                      System.out.println(Thread.currentThread() + " --- set value to A----");                  }catch (Exception e){}              }          };            Thread t3 = new Thread("thread3"){              @Override              public void run() {                 try{                     // 由於網路原因,讀取操作延遲                     TimeUnit.SECONDS.sleep(3L);                     String val = atomicDefineObject.getValue();                     System.out.println(Thread.currentThread() + " --- get value ----" + val);                 }catch (Exception e){}              }          };          // start && join  }
  • 執行結果
Thread[main,5,main] --- set value to A----  Thread[thread1,5,main] --- set value to B----  Thread[thread2,5,main] --- set value to A----  Thread[thread3,5,main] --- get value ----A  finish task dome ...    ## 最終讀執行緒讀取到的數據是A,但是不知道之前對象已經發生過修改操作,對當前讀操作是一個透明

ABA解決方案

  • 為ABA問題增加版本號,版本號的值設置為long類型的自增加方式,這樣程式就知道共享資源數據的變更情況
  • java提供的一個支援類AtomicStampedReference,通過增加時間戳方式來記錄修改的時候對應的時間戳,這樣的方式便可以知道當前的數據最近修改的時間段
  • ABA技術解決的意義
    • 通過知道數據對象變化的情況,我們可以利用版本或者時間戳的方式記錄修改的變更日誌,方便邏輯業務排查
    • 同時知道變更的時間或者是版本號,可以利用最新的一個數據值來作為一個起點修復過程,比如我們應用在某一個時間點down掉,如果此時更新應用的數據存在ABA問題,那麼可以結合實際場景來進行恢復

感謝花時間閱讀,如果有用歡迎轉發或者點個好看,謝謝!