並發編程之:Atomic

大家好,我是小黑,一個在互聯網苟且偷生的農民工。

在開始講今天的內容之前,先問一個問題,使用int類型做加減操作是不是線程安全的呢?比如 i++ ,++i,i=i+1這樣的操作在並發情況下是否會有問題?

我們通過運行代碼來看一下。

public class AtomicDemo {
    public static void main(String[] args) throws InterruptedException {
        Data data = new Data();
        Thread a = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                System.out.println(Thread.currentThread().getName()+"_"+data.increment());
            }
        }, "A");
        Thread b = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                System.out.println(Thread.currentThread().getName()+"_"+data.increment());
            }
        }, "B");
        a.start();
        b.start();
        // 等待A,B線程執行完畢
        a.join();
        b.join();
        System.out.println(data.getI());
    }
}

class Data {
    private volatile int i = 0;
    public int increment() {
        i++;
        return i;
    }
    public int getI() {
        return i;
    }
}

以上代碼比較簡單,通過A,B兩個線程同時對Data對象中的i執行++操作,各自執行100000次,最後輸出,如果說i++操作時線程安全的,那麼最後輸出的結果應該是200000,但是我們運行代碼會看到如下結果:

image

我們發現最後輸出的並不是200000,而是199982,如果多執行幾次的話,這個結果會發生變化,並且大多數情況下不會是200000。這主要是因為int類型的++操作不是原子的,i++同等於i=i+1,也就是加1這一步和對i重新賦值這一步不是同時完成的,不具備原子性,所以我們得出結論int類型的操作不是線程安全的。

在很多實際場景中都需要對一個數據進行並發操作,比如電商的秒殺活動中,對一個商品數量的扣減,那麼我們想保證安全性應該怎麼做呢?

首先我們可以想到的就是使用synchronized關鍵字對increment()這個方法加鎖,這樣就能保證每次只有一個線程能訪問。

但是之前的文章中我們有講到synchronized是一個重量級的悲觀鎖,我們的業務場景的並發可能是一段時間內的,多數情況下可能並不會有很多競爭,所以有沒有更好的處理方式呢,答案就是通過AtomicInteger

AtomicInteger

image

AtomicInteger是java.util.concurrent.atomic包中的一個類。我們看官方文檔對於這個包的描述,說它是支持單個變量上的無鎖線程安全編程的工具包,好像和我們期望的一樣,在不加鎖的情況下達到線程安全。

我們來修改一下上面例子的代碼。

class Data {
    private volatile AtomicInteger i = new AtomicInteger(0);

    public int increment() {
        return i.incrementAndGet();
    }

    public int getI() {
        return i.get();
    }
}

很簡單,將原來的int修改為AtomicInteger,在執行increment()方法進行增加操作時,調用incrementAndGet()方法就可以了。同樣我們運行代碼,會發現,不管運行多少次,代碼最後執行的結果都是一樣的,200000。所以我們說AtomicInteger是線程安全的。除了incrementAndGet()方法以外,還有很多其他的操作,比如decrementAndGet(),getAndIncrement(),getAndDecrement(),getAndAdd(int delta),addAndGet(int delta)等等,實際上就是對i++,++i,i=i+n,i+=n這些操作的原子實現。

除了AtomicInteger以外,java.util.concurrent.atomic包中還有一些其他類型,比如AtomicBoolean,AtomicLong等。

image

實現原理

那麼AtomicInteger是如何實現在不使用synchronized的情況下保證原子性的呢?我們來看一下源碼。

public class AtomicInteger extends Number implements java.io.Serializable {
	
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    // value在內存中的地址偏移值
    private static final long valueOffset;
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    // value為volatile的保證內存可見性
    private volatile int value;

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
   
}
public final class Unsafe {
    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            // 獲取volatile的Int,保證拿到的值是最新的
            var5 = this.getIntVolatile(var1, var2);
            // compareAndSwapInt 比較並交換 native方法
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
        return var5;
    }
}

通過源碼我們看到在incrementAndGet()方法中調用了Unsafe類的getAndAddInt方法,在這個方法內部對value進行compareAndSwapInt操作。通過這個方法名我們就可以看出是比較並交換,也就是我們之前提到過的CAS。也就是在執行賦值操作時,先看一下當前值是不是我加之前的值,如果不是,那我就重新加一次之後再進行比較,是一個循環的過程,這個過程也稱作自旋

CAS這種處理方式雖然很高效的解決了原子操作,但是它仍然存在三個問題,在實際開發中一定要注意,結合自己的實際業務場景使用。

ABA問題

什麼是ABA問題呢,通俗理解,就是你大爺還是你大爺,你大媽已經不是你大媽了~

image

什麼意思呢?就是當線程1取到A之後,有另一個線程2把A變成了B,又變成了A,當線程1再修改完值進行CAS比較時,發現值還是A,和自己取到的一樣,就直接更新了,但是在這個過程中,這個A中間是發生過變化的。就好比一個小偷,偷了別人家錢然後再還回來,還是原來的錢嗎?雖然你的錢沒變,但是這個小偷已經觸犯了法律,而你自己還不知道。

為了解決這個問題,atomic包中提供了一個類,我們看下是如何解決的。

public static void main(String[] args) throws InterruptedException {
    AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
    new Thread(() -> {
        try {
            int stamp = ref.getStamp();
            String reference = ref.getReference();
            System.out.println("線程1拿到的值:" + reference + " stamp:" + stamp);
            // sleep 2秒模擬線程切換到2
            TimeUnit.SECONDS.sleep(2);
            boolean success = ref.compareAndSet(reference, "C", stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName() + " " + success);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }, "線程1").start();

    new Thread(() -> {
        // 先改為B
        int stamp = ref.getStamp();
        String reference = ref.getReference();
        System.out.println("線程2拿到的值:" + reference + " stamp:" + stamp);
        ref.compareAndSet(reference, "B", stamp, stamp + 1);
        // 再改回A
        stamp = ref.getStamp();
        reference = ref.getReference();
        System.out.println("線程2拿到的值:" + reference + " stamp:" + stamp);
        ref.compareAndSet(reference, "A", ref.getStamp(), stamp + 1);
    }, "線程2").start();
}

我們可以看到AtomicStampedReference的compareAndSet()方法有4個參數:

  1. expectedReference:表示期望的引用值
  2. newReference:表示要修改後的新引用值
  3. expectedStamp:表示期望的戳(版本號)
  4. newStamp:表示修改後新的戳(版本號)

什麼意思呢?就是在修改時不光比較值是不是和獲取到的一樣,還要比較版本號。這樣的話,每次操作時都對版本號加1,那麼就算值從A改為B再改回A,但是版本號從0改成了1又改成了2,並沒有變回0,就可以避免ABA問題的發生。

循環時間變長

在並發非常大的情況下,使用CAS可能會存在一些線程一直循環修改不成功,導致循環時間變長,會給CPU帶來很大的執行開銷。並且由於AtomicReference中的引用是volatile的,為了保證內存可見性,需要保證緩存一致性,通過總線傳輸數據,當有大量的CAS循環時,會產生總線風暴

只能保證一個變量的原子操作

CAS的第三個問題就是AtomicReference中只能存放一個變量,如果需要保證多個變量操作的原子性,是做不到的。對於這種情況只能使用synchronized或者juc包中的Lock工具。

小結

簡單做個小結,使用int類型在並發場景下存在線程安全問題,可以用AtomicInteger來保證原子性操作,Atomic是通過CAS做到無鎖線程安全的。但是CAS有三個問題,第一ABA問題,可以通過AtomicStampedReference解決;第二競爭激烈情況下循環時間會變長,會產生總線風暴;第三隻能保證一個變量的原子操作。

具體業務場景中是使用synchronized,Lock等鎖工具還是使用Atomic的CAS無鎖操作,還是要結合場景考慮。


好的,今天的內容就到這裡,我們下期見。

並發編程之:Atomic
並發編程之:線程
關注我的微信公眾號【小黑說Java】,更多乾貨內容。
image