java高並發系列 – 第21天:java中的CAS操作,java並發的基石
- 2019 年 10 月 3 日
- 筆記
這是java高並發系列第21篇文章。
本文主要內容
- 從網站計數器實現中一步步引出CAS操作
- 介紹java中的CAS及CAS可能存在的問題
- 悲觀鎖和樂觀鎖的一些介紹及數據庫樂觀鎖的一個常見示例
- 使用java中的原子操作實現網站計數器功能
我們需要解決的問題
需求:我們開發了一個網站,需要對訪問量進行統計,用戶每次發一次請求,訪問量+1,如何實現呢?
下面我們來模仿有100個人同時訪問,並且每個人對咱們的網站發起10次請求,最後總訪問次數應該是1000次。實現訪問如下。
方式1
代碼如下:
package com.itsoku.chat20; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 跟着阿里p7學並發,微信公眾號:javacode2018 */ public class Demo1 { //訪問次數 static int count = 0; //模擬訪問一次 public static void request() throws InterruptedException { //模擬耗時5毫秒 TimeUnit.MILLISECONDS.sleep(5); count++; } public static void main(String[] args) throws InterruptedException { long starTime = System.currentTimeMillis(); int threadSize = 100; CountDownLatch countDownLatch = new CountDownLatch(threadSize); for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(() -> { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); thread.start(); } countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + ",耗時:" + (endTime - starTime) + ",count=" + count); } }
輸出:
main,耗時:138,count=975
代碼中的count用來記錄總訪問次數,request()方法表示訪問一次,內部休眠5毫秒模擬內部耗時,request方法內部對count++操作。程序最終耗時1秒多,執行還是挺快的,但是count和我們期望的結果不一致,我們期望的是1000,實際輸出的是973(每次運行結果可能都不一樣)。
分析一下問題出在哪呢?
代碼中採用的是多線程的方式來操作count,count++會有線程安全問題,count++操作實際上是由以下三步操作完成的:
- 獲取count的值,記做A:A=count
- 將A的值+1,得到B:B = A+1
- 讓B賦值給count:count = B
如果有A、B兩個線程同時執行count++,他們同時執行到上面步驟的第1步,得到的count是一樣的,3步操作完成之後,count只會+1,導致count只加了一次,從而導致結果不準確。
那麼我們應該怎麼做的呢?
對count++操作的時候,我們讓多個線程排隊處理,多個線程同時到達request()方法的時候,只能允許一個線程可以進去操作,其他的線程在外面候着,等裏面的處理完畢出來之後,外面等着的再進去一個,這樣操作count++就是排隊進行的,結果一定是正確的。
我們前面學了synchronized、ReentrantLock可以對資源加鎖,保證並發的正確性,多線程情況下可以保證被鎖的資源被串行訪問,那麼我們用synchronized來實現一下。
使用synchronized實現
代碼如下:
package com.itsoku.chat20; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * 跟着阿里p7學並發,微信公眾號:javacode2018 */ public class Demo2 { //訪問次數 static int count = 0; //模擬訪問一次 public static synchronized void request() throws InterruptedException { //模擬耗時5毫秒 TimeUnit.MILLISECONDS.sleep(5); count++; } public static void main(String[] args) throws InterruptedException { long starTime = System.currentTimeMillis(); int threadSize = 100; CountDownLatch countDownLatch = new CountDownLatch(threadSize); for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(() -> { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); thread.start(); } countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + ",耗時:" + (endTime - starTime) + ",count=" + count); } }
輸出:
main,耗時:5563,count=1000
程序中request方法使用synchronized關鍵字,保證了並發情況下,request方法同一時刻只允許一個線程訪問,request加鎖了相當於串行執行了,count的結果和我們預期的結果一致,只是耗時比較長,5秒多。
方式3
我們在看一下count++操作,count++操作實際上是被拆分為3步驟執行:
1. 獲取count的值,記做A:A=count 2. 將A的值+1,得到B:B = A+1 3. 讓B賦值給count:count = B
方式2中我們通過加鎖的方式讓上面3步驟同時只能被一個線程操作,從而保證結果的正確性。
我們是否可以只在第3步加鎖,減少加鎖的範圍,對第3步做以下處理:
獲取鎖 第三步獲取一下count最新的值,記做LV 判斷LV是否等於A,如果相等,則將B的值賦給count,並返回true,否者返回false 釋放鎖
如果我們發現第3步返回的是false,我們就再次去獲取count,將count賦值給A,對A+1賦值給B,然後再將A、B的值帶入到上面的過程中執行,直到上面的結果返回true為止。
我們用代碼來實現,如下:
package com.itsoku.chat20; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * 跟着阿里p7學並發,微信公眾號:javacode2018 */ public class Demo3 { //訪問次數 volatile static int count = 0; //模擬訪問一次 public static void request() throws InterruptedException { //模擬耗時5毫秒 TimeUnit.MILLISECONDS.sleep(5); int expectCount; do { expectCount = getCount(); } while (!compareAndSwap(expectCount, expectCount + 1)); } /** * 獲取count當前的值 * * @return */ public static int getCount() { return count; } /** * @param expectCount 期望count的值 * @param newCount 需要給count賦的新值 * @return */ public static synchronized boolean compareAndSwap(int expectCount, int newCount) { //判斷count當前值是否和期望的expectCount一樣,如果一樣將newCount賦值給count if (getCount() == expectCount) { count = newCount; return true; } return false; } public static void main(String[] args) throws InterruptedException { long starTime = System.currentTimeMillis(); int threadSize = 100; CountDownLatch countDownLatch = new CountDownLatch(threadSize); for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(() -> { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); thread.start(); } countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + ",耗時:" + (endTime - starTime) + ",count=" + count); } }
輸出:
main,耗時:116,count=1000
代碼中用了volatile關鍵字修飾了count,可以保證count在多線程情況下的可見性。關於volatile關鍵字的使用,也是非常非常重要的,前面有講過,不太了解的朋友可以去看一下:volatile與Java內存模型
咱們再看一下代碼,compareAndSwap方法,我們給起個簡稱吧叫CAS,這個方法有什麼作用呢?這個方法使用synchronized修飾了,能保證此方法是線程安全的,多線程情況下此方法是串行執行的。方法由兩個參數,expectCount:表示期望的值,newCount:表示要給count設置的新值。方法內部通過getCount()獲取count當前的值,然後與期望的值expectCount比較,如果期望的值和count當前的值一致,則將新值newCount賦值給count。
再看一下request()方法,方法中有個do-while循環,循環內部獲取count當前值賦值給了expectCount,循環結束的條件是compareAndSwap返回true,也就是說如果compareAndSwap如果不成功,循環再次獲取count的最新值,然後+1,再次調用compareAndSwap方法,直到compareAndSwap返回成功為止。
代碼中相當於將count++拆分開了,只對最後一步加鎖了,減少了鎖的範圍,此代碼的性能是不是比方式2快不少,還能保證結果的正確性。大家是不是感覺這個compareAndSwap方法挺好的,這東西確實很好,java中已經給我們提供了CAS的操作,功能非常強大,我們繼續向下看。
CAS
CAS,compare and swap的縮寫,中文翻譯成比較並交換。
CAS 操作包含三個操作數 —— 內存位置(V)、預期原值(A)和新值(B)。 如果內存位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值 。否則,處理器不做任何操作。無論哪種情況,它都會在 CAS 指令之前返回該 位置的值。(在 CAS 的一些特殊情況下將僅返回 CAS 是否成功,而不提取當前 值。)CAS 有效地說明了「我認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。」
通常將 CAS 用於同步的方式是從地址 V 讀取值 A,執行多步計算來獲得新 值 B,然後使用 CAS 將 V 的值從 A 改為 B。如果 V 處的值尚未同時更改,則 CAS 操作成功。
系統底層進行CAS操作的時候,會判斷當前系統是否為多核系統,如果是就給總線加鎖,只有一個線程會對總線加鎖成功,加鎖成功之後會執行cas操作,也就是說CAS的原子性實際上是CPU實現的, 其實在這一點上還是有排他鎖的.,只是比起用synchronized, 這裡的排他時間要短的多, 所以在多線程情況下性能會比較好。
java中提供了對CAS操作的支持,具體在sun.misc.Unsafe類中,聲明如下:
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
上面三個方法都是類似的,主要對4個參數做一下說明。
var1:表示要操作的對象
var2:表示要操作對象中屬性地址的偏移量
var4:表示需要修改數據的期望的值
var5:表示需要修改為的新值
JUC包中大部分功能都是依靠CAS操作完成的,所以這塊也是非常重要的,有關Unsafe類,下篇文章會具體講解。
synchronized、ReentrantLock這種獨佔鎖屬於悲觀鎖,它是在假設需要操作的代碼一定會發生衝突的,執行代碼的時候先對代碼加鎖,讓其他線程在外面等候排隊獲取鎖。悲觀鎖如果鎖的時間比較長,會導致其他線程一直處於等待狀態,像我們部署的web應用,一般部署在tomcat中,內部通過線程池來處理用戶的請求,如果很多請求都處於等待獲取鎖的狀態,可能會耗盡tomcat線程池,從而導致系統無法處理後面的請求,導致服務器處於不可用狀態。
除此之外,還有樂觀鎖,樂觀鎖的含義就是假設系統沒有發生並發衝突,先按無鎖方式執行業務,到最後了檢查執行業務期間是否有並發導致數據被修改了,如果有並發導致數據被修改了 ,就快速返回失敗,這樣的操作使系統並發性能更高一些。cas中就使用了這樣的操作。
關於樂觀鎖這塊,想必大家在數據庫中也有用到過,給大家舉個例子,可能以後會用到。
如果你們的網站中有調用支付寶充值接口的,支付寶那邊充值成功了會回調商戶系統,商戶系統接收到請求之後怎麼處理呢?假設用戶通過支付寶在商戶系統中充值100,支付寶那邊會從用戶賬戶中扣除100,商戶系統接收到支付寶請求之後應該在商戶系統中給用戶賬戶增加100,並且把訂單狀態置為成功。
處理過程如下:
開啟事務 獲取訂單信息 if(訂單狀態==待處理){ 給用戶賬戶增加100 將訂單狀態更新為成功 } 返回訂單處理成功 提交事務
由於網絡等各種問題,可能支付寶回調商戶系統的時候,回調超時了,支付寶又發起了一筆回調請求,剛好這2筆請求同時到達上面代碼,最終結果是給用戶賬戶增加了200,這樣事情就搞大了,公司蒙受損失,嚴重點可能讓公司就此倒閉了。
那我們可以用樂觀鎖來實現,給訂單表加個版本號version,要求每次更新訂單數據,將版本號+1,那麼上面的過程可以改為:
獲取訂單信息,將version的值賦值給V_A if(訂單狀態==待處理){ 開啟事務 給用戶賬戶增加100 update影響行數 = update 訂單表 set version = version + 1 where id = 訂單號 and version = V_A; if(update影響行數==1){ 提交事務 }else{ 回滾事務 } } 返回訂單處理成功
上面的update語句相當於我們說的CAS操作,執行這個update語句的時候,多線程情況下,數據庫會對當前訂單記錄加鎖,保證只有一條執行成功,執行成功的,影響行數為1,執行失敗的影響行數為0,根據影響行數來決定提交還是回滾事務。上面操作還有一點是將事務範圍縮小了,也提升了系統並發處理的性能。這個知識點希望你們能get到。
CAS 的問題
cas這麼好用,那麼有沒有什麼問題呢?還真有
ABA問題
CAS需要在操作值的時候檢查下值有沒有發生變化,如果沒有發生變化則更新,但是如果一個值原來是A,變成了B,又變成了A,那麼使用CAS進行檢查時會發現它的值沒有發生變化,但是實際上卻變化了。這就是CAS的ABA問題。 常見的解決思路是使用版本號。在變量前面追加上版本號,每次變量更新的時候把版本號加一,那麼A-B-A 就會變成1A-2B-3A。 目前在JDK的atomic包里提供了一個類AtomicStampedReference來解決ABA問題。這個類的compareAndSet方法作用是首先檢查當前引用是否等於預期引用,並且當前標誌是否等於預期標誌,如果全部相等,則以原子方式將該引用和該標誌的值設置為給定的更新值。
循環時間長開銷大
上面我們說過如果CAS不成功,則會原地循環(自旋操作),如果長時間自旋會給CPU帶來非常大的執行開銷。並發量比較大的情況下,CAS成功概率可能比較低,可能會重試很多次才會成功。
使用JUC中的類實現計數器
juc框架中提供了一些原子操作,底層是通過Unsafe類中的cas操作實現的。通過原子操作可以保證數據在並發情況下的正確性。
此處我們使用java.util.concurrent.atomic.AtomicInteger類來實現計數器功能,AtomicInteger內部是採用cas操作來保證對int類型數據增減操作在多線程情況下的正確性。
計數器代碼如下:
package com.itsoku.chat20; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 跟着阿里p7學並發,微信公眾號:javacode2018 */ public class Demo4 { //訪問次數 static AtomicInteger count = new AtomicInteger(); //模擬訪問一次 public static void request() throws InterruptedException { //模擬耗時5毫秒 TimeUnit.MILLISECONDS.sleep(5); //對count原子+1 count.incrementAndGet(); } public static void main(String[] args) throws InterruptedException { long starTime = System.currentTimeMillis(); int threadSize = 100; CountDownLatch countDownLatch = new CountDownLatch(threadSize); for (int i = 0; i < threadSize; i++) { Thread thread = new Thread(() -> { try { for (int j = 0; j < 10; j++) { request(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); thread.start(); } countDownLatch.await(); long endTime = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + ",耗時:" + (endTime - starTime) + ",count=" + count); } }
輸出:
main,耗時:119,count=1000
耗時很短,並且結果和期望的一致。
關於原子類操作,都位於java.util.concurrent.atomic包中,下篇文章我們主要來介紹一下這些常用的類及各自的使用場景。
java高並發系列
- java高並發系列 – 第1天:必須知道的幾個概念
- java高並發系列 – 第2天:並發級別
- java高並發系列 – 第3天:有關並行的兩個重要定律
- java高並發系列 – 第4天:JMM相關的一些概念
- java高並發系列 – 第5天:深入理解進程和線程
- java高並發系列 – 第6天:線程的基本操作
- java高並發系列 – 第7天:volatile與Java內存模型
- java高並發系列 – 第8天:線程組
- java高並發系列 – 第9天:用戶線程和守護線程
- java高並發系列 – 第10天:線程安全和synchronized關鍵字
- java高並發系列 – 第11天:線程中斷的幾種方式
- java高並發系列 – 第12天JUC:ReentrantLock重入鎖
- java高並發系列 – 第13天:JUC中的Condition對象
- java高並發系列 – 第14天:JUC中的LockSupport工具類,必備技能
- java高並發系列 – 第15天:JUC中的Semaphore(信號量)
- java高並發系列 – 第16天:JUC中等待多線程完成的工具類CountDownLatch,必備技能
- java高並發系列 – 第17天:JUC中的循環柵欄CyclicBarrier的6種使用場景
- java高並發系列 – 第18天:JAVA線程池,這一篇就夠了
- java高並發系列 – 第19天:JUC中的Executor框架詳解1
- java高並發系列 – 第20天:JUC中的Executor框架詳解2
阿里p7一起學並發,公眾號:路人甲java,每天獲取最新文章!

