JUC並發編程與高性能記憶體隊列disruptor實戰-上

JUC並發實戰

Synchonized與Lock

區別

  • Synchronized是Java的關鍵字,由JVM層面實現的,Lock是一個介面,有實現類,由JDK實現。
  • Synchronized無法獲取鎖的狀態,Lock可以判斷是否獲取到了鎖。
  • Synchronized自動釋放鎖,lock一般在finally中手動釋放,如果不釋放鎖,會死鎖。
  • Synchronized 執行緒1(獲得鎖,阻塞),執行緒2(等待,傻傻的等); lock鎖不一定會等待下去(lock.tryLock())
  • Synchronized是可重入的,不可中斷的,非公平鎖。Lock, 可重入鎖,可以判斷鎖,非公平鎖。
  • Synchronized 適合鎖少量的程式碼同步問題,Lock適合鎖大量的同步程式碼。

程式碼示例

高鐵票類synchronized實現TicketS.java,對於執行緒來說也屬於資源類

package cn.itxs.synchronize;

public class TicketS {
    private int quantify = 20;
    public synchronized void sale(){
        if (quantify > 0) {
            System.out.println("當前執行緒"+Thread.currentThread().getName() + "賣出了第" + quantify-- + "張高鐵票,剩餘票數量為" + quantify);
        }
    }
}

高鐵票類Lock實現TicketL.java

package cn.itxs.synchronize;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TicketL {
    private int quantify = 20;
    private Lock lock = new ReentrantLock();
    public void sale(){
        lock.lock();
        try {
            if (quantify > 0) {
                System.out.println("當前執行緒"+Thread.currentThread().getName() + "賣出了第" + quantify-- + "張高鐵票,剩餘票數量為" + quantify);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

測試類,下面執行緒的使用採用lambda表達式寫法,屬於JDK8的特性之一

package cn.itxs.synchronize;

public class ThreadMain {
    public static void main(String[] args) {
        TicketS ticketS = new TicketS();
        System.out.println("ticketS-----------");
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketS.sale();
            }
        },"第一個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketS.sale();
            }
        },"第二個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketS.sale();
            }
        },"第三個執行緒").start();

        System.out.println("ticketL-----------");

        TicketL ticketL = new TicketL();
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketL.sale();
            }
        },"第一個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketL.sale();
            }
        },"第二個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketL.sale();
            }
        },"第三個執行緒").start();
    }
}

虛假喚醒

概述

  • 虛假喚醒是指當一定的條件觸發時會喚醒很多在阻塞態的執行緒,但只有部分的執行緒喚醒是有用的,其餘執行緒的喚醒是多餘的;比如說賣貨,如果本來沒有貨物,突然進了一件貨物,這時所有的顧客都被通知了,但是只能一個人買,所以對其他人都是做了無用的通知。

程式碼示例

計算類,提供加一減一的0和1結果,Counter.java

package cn.itxs.counter;

public class Counter {
    private int count = 0;

    public synchronized void addCount() throws InterruptedException {
        if (count > 0){
            //執行緒開始等待
            this.wait();
        }
        count++;
        System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",count=" + count);
        //通知其他執行緒
        this.notifyAll();
    }

    public synchronized void subtractCount() throws InterruptedException {
        if (count == 0){
            //執行緒開始等待
            this.wait();
        }
        count--;
        System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",count=" + count);
        //通知其他執行緒
        this.notifyAll();
    }
}

測試類

package cn.itxs.counter;

public class CounterMain {
    public static void main(String[] args) {
        Counter counter = new Counter();
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.addCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第一個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.subtractCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第二個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.addCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第三個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.subtractCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第四個執行緒").start();
    }
}

image-20220114141728250

從上面分析可以知道導致虛假喚醒的原因主要就是一個執行緒直接在if程式碼塊中被喚醒了,這時它已經跳過了if判斷。我們只需要將if判斷改為while,這樣執行緒就會被重複判斷而不再會跳出判斷程式碼塊,從而不會產生虛假喚醒這種情況了。

image-20220114142359979

Callable

Callable任務可拿到一個Future對象,表示非同步計算的結果,它提供了檢查是否計算完成的方法,以等待計算的完成,並檢索計算的結果,通過Future對象可以了解任務執行情況,可以取消任務的執行,還可以獲取執行結果。

  • Runnable和Callable的區別
    • Callable規定的方法是call(),Runnable規定的介面是run();
    • Callable的任務執行後可返回值,而Runnable的任務是不能有返回值的;
    • call方法可以拋出異常,run方法不可以

實現Callable介面資源類MessageThread.java

package cn.itxs.collection;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class MessageThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("hello callable!");
        TimeUnit.SECONDS.sleep(3);
        return 100;
    }
}

測試類

package cn.itxs.collection;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MessageThread messageThread = new MessageThread();
        FutureTask futureTask = new FutureTask(messageThread);
        new Thread(futureTask,"FutureTaskTest").start();
        Integer res = (Integer)futureTask.get();
        System.out.println(res);
    }
}

非同步回調

package cn.itxs.asyncall;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class AsynCallMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ",async call void");
        });
        System.out.println("等待執行緒非同步執行");
        completableFuture.get();

        CompletableFuture<String> completableFutureR = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //int i = 1/0; //取消注釋後下面e輸出詳細資訊,返回值為bad
            System.out.println(Thread.currentThread().getName() + ",async call return");
            return "good";
        });
        System.out.println("等待執行緒非同步執行");
        System.out.println(completableFutureR.whenComplete((s, e) -> {
            System.out.println("s=" + s + ",e=" + e);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return "bad";
        }).get());
    }
}

image-20220115230203289

Lock+Condition

程式碼示例

先將上一小節改造為Lock+Condition版本下CounterL.java

package cn.itxs.counter;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CounterL {
    private int count = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void addCount() throws InterruptedException {
        lock.lock();
        try {
            while (count > 0){
                //執行緒開始等待
                condition.await();
            }
            count++;
            System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",count=" + count);
            //通知其他執行緒
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void subtractCount() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0){
                //執行緒開始等待
                condition.await();
            }
            count--;
            System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",count=" + count);
            //通知其他執行緒
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

和上面的執行結果是一樣,但Lock+Condition可以實現精準的喚醒

CounterA.java

package cn.itxs.counter;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CounterA {
    private int count = 1;
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void testMethod1() throws InterruptedException {
        lock.lock();
        try {
            while (count != 1){
                //執行緒開始等待
                condition1.await();
            }
            count = 2;
            System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",testMethod1 count=" + count);
            //通知其他執行緒
            condition2.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void testMethod2() throws InterruptedException {
        lock.lock();
        try {
            while (count != 2){
                //執行緒開始等待
                condition2.await();
            }
            count = 3;
            System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",testMethod2 count=" + count);
            //通知其他執行緒
            condition3.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void testMethod3() throws InterruptedException {
        lock.lock();
        try {
            while (count != 3){
                //執行緒開始等待
                condition3.await();
            }
            count = 1;
            System.out.println("當前執行緒為" + Thread.currentThread().getName() + ",testMethod3 count=" + count);
            //通知其他執行緒
            condition1.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

測試類

package cn.itxs.counter;

public class CounterAMain {

    public static void main(String[] args) {
        CounterA counterA = new CounterA();
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counterA.testMethod1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第一個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counterA.testMethod2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第二個執行緒").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counterA.testMethod3();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第三個執行緒").start();
    }
}

image-20220114160512200

鎖的常識

package cn.itxs.lock;

import java.util.concurrent.TimeUnit;

public class Sport {
    public synchronized void playBasketBall(){
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "打籃球");
    }

    public synchronized void swimming(){
        System.out.println(Thread.currentThread().getName() + "去游泳");
    }

    //普通方法
    public void dancing(){
        System.out.println(Thread.currentThread().getName() + "去跳舞");
    }

    public synchronized void singing(){
        System.out.println(Thread.currentThread().getName() + "去K歌");
    }

    //靜態同步方法
    public static synchronized void skating(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "去滑冰");
    }

    //靜態同步方法
    public static synchronized void climbing(){
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "去登山");
    }

    public synchronized void shooting(){
        System.out.println(Thread.currentThread().getName() + "去射擊");
    }
}

測試類

package cn.itxs.lock;

public class LockDemo {
    public static void main(String[] args) {
        Sport sport = new Sport();
        Sport sport1 = new Sport();
        Sport sport2 = new Sport();
        Sport sport3 = new Sport();
        Sport sport4 = new Sport();
        new Thread(() -> {
            sport.playBasketBall();
        },"第一個執行緒").start();

        new Thread(() -> {
            sport.swimming();
        },"第二個執行緒").start();

        new Thread(() -> {
            sport.dancing();
        },"第三個執行緒").start();

        new Thread(() -> {
            sport1.swimming();
        },"第四個執行緒").start();

        new Thread(() -> {
            sport2.skating();
        },"第五個執行緒").start();

        new Thread(() -> {
            sport3.climbing();
        },"第六個執行緒").start();

        new Thread(() -> {
            sport3.shooting();
        },"第七個執行緒").start();
    }
}

image-20220114171253621

從上面的結果我們可以知道synchronized鎖的是方法的調用者,對於同一對象同步方法誰先拿到鎖先執行,而不同對象如sport和sport1是兩個對象相當於兩把鎖,互不相干;對於static同步方法鎖的是class,兩個對象的類class只有一個,相同對象的類的靜態同步方法也是誰先拿到鎖先執行;對於同一對象的靜態同步方法和同步方法屬於class和對象也是兩把鎖,互不相干。

並發集合類

CopyOnWriteArrayList

image-20220114172351216

package cn.itxs.collection;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListDemo {
    public static void main(String[] args) {
        //ArrayList不是執行緒安全
        //List<String> list = new ArrayList<String>();
        //List<String> list = new Vector<>(); //第一種方法,這種是集合方法加了synchronized變為同步方法
        //List<String> list = Collections.synchronizedList(new ArrayList<String>()); //第二種方法,將ArrayList通過Collections工具類轉為同步集合
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString());
                System.out.println(list);
            },String.valueOf(i)+"執行緒").start();
        }
    }
}

CopyOnWriteArraySet

package cn.itxs.collection;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetDemo {
    public static void main(String[] args) {
        //HashSet不是執行緒安全
        //Set<String> set = new HashSet<>();
        //Set<String> set = Collections.synchronizedSet(new HashSet<String>()); //將HashSet通過Collections工具類轉為同步集合
        Set<String> set= new CopyOnWriteArraySet<>();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString());
                System.out.println(set);
            },String.valueOf(i)+"執行緒").start();
        }
    }
}

ConcurrentHashMap

package cn.itxs.collection;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class MapDemo {
    public static void main(String[] args) {
        //HashMap不是執行緒安全
        //Map<String,String> map = new HashMap<>();
        Map<String,String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                map.put(UUID.randomUUID().toString(),UUID.randomUUID().toString());
                System.out.println(map);
            },String.valueOf(i)+"執行緒").start();
        }
    }
}

並發編程輔助類

CountDowmLatch

CountDownLatch一般被稱作”計數器”,當數量達到了某個點之後計數結束,才能繼續往下走,可用於流程式控制制,大流程分成多個子流程,然後大流程在子流程全部結束之前不動(子流程最好是相互獨立的,除非能很好的控制兩個流程的關聯關係),子流程全部結束後大流程開始操作。

package cn.itxs.tool;

import java.util.concurrent.CountDownLatch;

public class CDLMain {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "進入核酸檢測排隊區域");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println("開始進行一組核酸監測");
    }
}

CyclicBarrier

CyclicBarrier通過它可以實現讓一組執行緒等待至某個狀態之後再全部同時執行。叫做迴環是因為當所有等待執行緒都被釋放以後,CyclicBarrier可以被重用。

package cn.itxs.tool;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CBMain {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {
            System.out.println("集齊五福兌取大獎");
        });

        for (int i = 1; i < 6; i++) {
            final int count = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + ",獲取到第" + count + "種福");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        TimeUnit.SECONDS.sleep(5);

        for (int i = 1; i < 6; i++) {
            final int count = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + ",獲取到第" + count + "種福");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

Semaphore通常叫它訊號量, 可以用來控制同時訪問特定資源的執行緒數量,通過協調各個執行緒,以保證合理的使用資源.通常用於那些資源有明確訪問數量限制的場景,常用於限流 。比如:資料庫連接池,同時進行連接的執行緒有數量限制,連接不能超過一定的數量,當連接達到了限制數量後,後面的執行緒只能排隊等前面的執行緒釋放了資料庫連接才能獲得資料庫連接。

package cn.itxs.tool;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreMain {
    public static void main(String[] args) {
        //最多同時處理4個請求
        Semaphore semaphore = new Semaphore(4);
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "請求處理開始");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "請求處理結束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

隊列

集合介面包含隊列介面,常見的隊列有阻塞隊列和同步隊列。

image-20220115164705033

阻塞隊列

阻塞隊列存在四組API,分別對應著四種隊列的阻塞情況。

image-20220115172518602

package cn.itxs.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockIngQueueMain {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("異常拋出測試----------");
        BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.add("hello");
        arrayBlockingQueue.add("world");
        arrayBlockingQueue.add("java");
        //arrayBlockingQueue.add("queue"); //這裡取消注釋則會拋Queue full異常
        System.out.println(arrayBlockingQueue.element()); //獲取隊頂元素但不出隊
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        //System.out.println(arrayBlockingQueue.remove()); //這裡取消注釋且隊列沒有元素會拋NoSuchElementException異常
        //System.out.println(arrayBlockingQueue.element());  //這裡取消注釋會拋且隊列沒有元素NoSuchElementException異常

        System.out.println("返回值測試----------");
        BlockingQueue<String> arrayBlockingQueueR = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueueR.offer("hello"));
        System.out.println(arrayBlockingQueueR.offer("world"));
        System.out.println(arrayBlockingQueueR.offer("java"));
        System.out.println(arrayBlockingQueueR.offer("queue"));
        System.out.println(arrayBlockingQueueR.peek());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.peek());

        System.out.println("超時等待timeoout時間測試----------");
        BlockingQueue<String> arrayBlockingQueueT = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueueT.offer("hello"));
        System.out.println(arrayBlockingQueueT.offer("world"));
        System.out.println(arrayBlockingQueueT.offer("java"));
        System.out.println(arrayBlockingQueueT.offer("queue",3, TimeUnit.SECONDS));
        System.out.println(arrayBlockingQueueT.poll());
        System.out.println(arrayBlockingQueueT.poll());
        System.out.println(arrayBlockingQueueT.poll());
        System.out.println(arrayBlockingQueueT.poll(3, TimeUnit.SECONDS));

        System.out.println("一直阻塞測試----------");
        BlockingQueue<String> arrayBlockingQueueB = new ArrayBlockingQueue<>(3);
        arrayBlockingQueueB.put("hello");
        arrayBlockingQueueB.put("world");
        arrayBlockingQueueB.put("java");
        //arrayBlockingQueueB.put("queue");  //這裡取消注釋會一直阻塞
        System.out.println(arrayBlockingQueueB.take());
        System.out.println(arrayBlockingQueueB.take());
        System.out.println(arrayBlockingQueueB.take());
        System.out.println(arrayBlockingQueueB.take());  //當元素為空時一直阻塞
    }
}

同步隊列

在同步隊列中只有出隊以後才允許入隊,否則一直處於阻塞狀態。

package cn.itxs.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueMain {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"put hello");
                synchronousQueue.put("hello");
                System.out.println(Thread.currentThread().getName()+"put world");
                synchronousQueue.put("world");
                System.out.println(Thread.currentThread().getName()+"put java");
                synchronousQueue.put("java");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"入隊執行緒").start();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"take hello");
                System.out.println(synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take world");
                System.out.println(synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take java");
                System.out.println(synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"出隊執行緒").start();
    }
}

image-20220115174848718

CAS

  • CAS:Compare And Swap,直譯為比較並交換;CAS是CPU並發原語,由CPU實現;通過比較當前記憶體中的值和主記憶體中的值,如果這個值是期望的,那麼則執行,如果不是就一直循環下去。

  • CAS也稱為自旋鎖在一個(死)循環【for(;😉】里不斷進行CAS操作,直到成功為止(自旋操作),實際上CAS也是一種樂觀。

  • 缺點

    • 循環會耗時。
    • 一次只能保證一個共享變數的原子性。
    • ABA問題
package cn.itxs.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class CASMain {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(100);
        System.out.println(atomicInteger.getAndIncrement()); //原子遞增
        System.out.println(atomicInteger.get());
        //如果我期望的值達到了,那麼就更新,否則,就不更新
        System.out.println(atomicInteger.compareAndSet(101, 200));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(101, 300));
        System.out.println(atomicInteger.get());
        System.out.println("ABA-----");
        System.out.println(atomicInteger.compareAndSet(200, 300));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(300, 200));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(200, 0));
        System.out.println(atomicInteger.get());
    }
}

image-20220115233031964

package cn.itxs.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

public class ARMain {
    public static void main(String[] args) {
        AtomicStampedReference atomicStampedReference = new AtomicStampedReference(101,1);
        new Thread(()->{
            System.out.println(Thread.currentThread().getName() + "版本號為:"+atomicStampedReference.getStamp());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            atomicStampedReference.compareAndSet(101, 102,
                    atomicStampedReference.getStamp(),
                    atomicStampedReference.getStamp()+1);

            System.out.println(Thread.currentThread().getName() + "版本號為:"+atomicStampedReference.getStamp());

            atomicStampedReference.compareAndSet(102, 101,
                    atomicStampedReference.getStamp(),
                    atomicStampedReference.getStamp()+1);

            System.out.println(Thread.currentThread().getName() + "版本號為:"+atomicStampedReference.getStamp());

        },"A").start();


        //和樂觀鎖的原理相同
        new Thread(()->{
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "版本號為:"+stamp);

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicStampedReference.
                    compareAndSet(101, 105,stamp,stamp+1));

            System.out.println(Thread.currentThread().getName() + "版本號為:"+atomicStampedReference.getStamp());
        },"B").start();
    }
}

image-20220115235242439

**本人部落格網站 **IT小神 www.itxiaoshen.com