java並發編程基礎

公號二維碼

本文是筆者閱讀《java並發編程藝術》一書的筆記中的一部分,筆者將所有筆記已經整理成了一本gitbook電子書(還在完善中),閱讀體驗可能會好一些,像本文這樣的長文是很難讀下去的,可能會收藏,但是從來不看。若有需要可關注微信公眾號大雄和你一起學編程並在後台回復我愛java領取(ps:沒辦法,希望儘快達到500粉絲,開個流量主看能不能賺點錢,所以出此下策,非常抱歉。實在不想關注又想看看這個筆記的朋友,可以看文末給出的鏈接。)

內容簡介

本文比較長,主要介紹 線程的基本概念和意義、多線程程序開發需要注意的問題、創建線程的方式、線程同步、線程通信、線程的生命周期、原子類等內容。

這些內容基本都是來自《java並發編程藝術》一書,在此感謝,我是在微信讀書免費看的,所以算是白嫖了。部分源碼的解讀是筆者自己從jdk源碼扒下來的。


線程的定義與意義

線程的定義

  • 是輕量級的進程,線程的創建和切換成本比進程低
  • 同一進程中的多條線程將共享該進程中的全部系統資源,如虛擬地址空間,文件描述符和信號處理等等
  • 是操作系統能夠進行運算調度的最小單位
  • java程序至少有一個線程main,main線程由JVM創建

為什麼要有多線程

  • 可以充分利用多處理器核心
  • 更快的響應時間,可以將數據一致性要求不強的工作交給別的線程做
  • 更好的編程模型,例如可以使用生產者消費者模型進行解耦

並發編程需要注意的問題

上下文切換

cpu通過時間分片來執行任務,多個線程在cpu上爭搶時間片執行,線程切換需要保存一些狀態,再次切換回去需要恢復狀態,此為上下文切換成本。

因此並不是線程越多越快,頻繁的切換會損失性能

減少上下文切換的方法:

  • 無鎖並發編程:例如把一堆數據分為幾塊,交給不同線程執行,避免用鎖
  • 使用CAS:用自旋不用鎖可以減少線程競爭切換,但是可能會更加耗cpu
  • 使用最少的線程
  • 使用協程:在一個線程里執行多個任務

死鎖

死鎖就是線程之間因爭奪資源, 處理不當出現的相互等待現象

避免死鎖的方法:

  • 避免一個線程同時獲取多個鎖
  • 避免一個線程在鎖內同時佔用多個資源,盡量保證每個鎖只佔用一個資源
  • 嘗試使用定時鎖,lock.tryLock(timeout)
  • 對於數據庫鎖,加鎖和解鎖必須在一個數據庫連接里,否則會出現解鎖失敗的情況

資源限制

程序的執行需要資源,比如數據庫連接、帶寬,可能會由於資源的限制,多個線程並不是並發,而是串行,不僅無優勢,反而帶來不必要的上下文切換損耗

常見資源限制

  • 硬件資源限制
    • 帶寬
    • 磁盤讀寫速度
    • cpu處理速度
  • 軟件資源限制
    • 數據庫連接數
    • socket連接數

應對資源限制

  • 集群化,增加資源
  • 根據不同的資源限制調整程序的並發度,找到瓶頸,把瓶頸資源搞多一些,或者根據這個瓶頸調整線程數

創建線程的三種方式

廢話不說,直接上代碼

繼承Thread類

// 繼承Thread
class MyThread extends Thread {
    // 重寫run方法執行任務
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            // 可以通過this拿到當前線程
            System.out.println(this.getName()+"執行了"+i);
        }
    }
}

public class Demo_02_02_1_ThreadCreateWays {
    public static void main(String[] args) {
        // 先new出來,然後啟動
        MyThread myThread = new MyThread();
        myThread.start();
        for (int i = 0; i < 10; i++) {
            // 通過Thread的靜態方法拿到當前線程
            System.out.println(Thread.currentThread().getName()+"執行了"+i);
        }
    }
}

實現Runnable

// 實現Runnable接口
class MyThreadByRunnable implements Runnable {

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            // 不能用this了
            System.out.println(Thread.currentThread().getName() + "執行了" + i);
        }
    }
}

public class Demo_02_02_1_ThreadCreateWays {
    public static void main(String[] args) {
        // 實現Runnable接口的方式啟動線程
        Thread thread = new Thread(new MyThreadByRunnable());
        thread.start();
        for (int i = 0; i < 10; i++) {
            // 通過Thread的靜態方法拿到當前線程
            System.out.println(Thread.currentThread().getName() + "執行了" + i);
        }
    }
}

因為Runnable是函數式接口,用lamba也可以

new Thread(() -> {
    System.out.println("Runnable是函數式接口, java8也可以使用lamba");
}).start();

使用Callable和Future

// 使用Callable
class MyThreadByCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        int sum = 0;
        for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName()+"執行了"+i);
            sum+=i;
        }
        return sum;
    }
}
public class Demo_02_02_1_ThreadCreateWays {
    public static void main(String[] args) {
        // 用FutureTask包一層
        FutureTask<Integer> futureTask = new FutureTask<>(new MyThreadByCallable());
        new Thread(futureTask).start();
        try {
            // 調用futureTask的get能拿到返回的值
            System.out.println(futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

這是最複雜的一種方式,他可以有返回值,歸納一下步驟:

  1. 搞一個類實現Callable接口,重寫call方法,在call執行任務
  2. FutureTask包裝實現Callable接口類的實例
  3. FutureTask的實例作為Thread構造參數
  4. 調用FutureTask實例的get拿到返回值,調這一句會阻塞父線程

Callable也是函數式接口,所以也能用lamba

為啥Thread構造裡邊能放Runnable,也能放FutureTask? 其實FutureTask繼承RunnableFuture,而RunnableFuture繼承Runnable和Future,所以FutureTask也是Runnable

三種方式比較

方式 使用簡易程度 是否可以共享任務代碼 是否可以有返回值 是否可以聲明拋出異常 是否可以再繼承別的類
繼承Thread 簡單 不能 不能 不能 不能
Runnable 中等 可以 不能 不能 可以
Callable 複雜 可以 可以 可以 可以

繼承Thread是最容易的,但是也是最不靈活的

使用Callable時最複雜的,但是也是最靈活的

這裡說的共享任務代碼舉個例子:

還是上面那個MyThreadByRunnable

MyThreadByRunnable myThreadByRunnable = new MyThreadByRunnable();
Thread thread = new Thread(myThreadByRunnable);
thread.start();
// 再來一個,復用了任務代碼,繼承Thread就不行
Thread thread2 = new Thread(myThreadByRunnable);
thread2.start();

線程的一些屬性

名字

給以給線程取一個響亮的名字,便於排查問題,默認為Thread-${一個數字}這個樣子

  • 設置名字
threadA.setName("歡迎關注微信公號'大雄和你一起學編程'");
  • 獲取名字
threadA.getName();

是否是守護線程(daemon)

為其他線程服務的線程可以是守護線程,守護線程的特點是如果所有的前台線程死亡,則守護線程自動死亡。

非守護線程創建的線程默認為非守護線程,守護線程創建的則默認為守護

  • set
threadA.setDaemon(true);
  • get
threadA.isDaemon();

線程優先級(priority)

優先級高的線程可以得到更多cpu資源, 級別是1-10,默認優先級和創建他的父線程相同,main是5

set

threadA.setPriority(Thread.NORM_PRIORITY);

get

threadA.getPriority()

所屬線程組

可以把線程放到組裡,一起管理

設置線程組

Thread的構造裡邊可以指定

ThreadGroup threadGroup = new ThreadGroup("歡迎關注微信公號'大雄和你一起學編程'");
Thread thread = new Thread(threadGroup, () -> {
    System.out.println("歡迎關注微信公號'大雄和你一起學編程'");
});

拿到線程組

thread.getThreadGroup()

基於線程組的操作

ThreadGroup threadGroup1 = thread.getThreadGroup();
System.out.println(threadGroup1.activeCount()); // 有多少活的線程
threadGroup1.interrupt();                       // 中斷組裡所有線程
threadGroup1.setMaxPriority(10);                // 設置線程最高優先級是多少

線程同步

多個線程訪問同一個資源可能會導致結果的不確定性,因此有時需要控制只有一個線程訪問共享資源,此為線程同步。

一個是可以使用synchronized同步,一個是可以使用Lock。synchronized是也是隱式的鎖。

同步方法

class Account {
    private Integer total;

    public Account(int total) {
        this.total = total;
    }

    public synchronized void draw(int money) {
        if (total >= money) {
            this.total = this.total - money;
            System.out.println(Thread.currentThread().getName() + "剩下" + this.total);
        } else {
            System.out.println(Thread.currentThread().getName() + "不夠了");
        }
    }

    public synchronized int getTotal() {
        return total;
    }
}

public class Demo_02_04_1_ThreadSync {
    public static void main(String[] args) {
        Account account = new Account(100);
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                while (account.getTotal() >= 10) {
                    account.draw(10);
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        Thread A = new Thread(runnable);
        A.setName("A");
        Thread B = new Thread(runnable);
        B.setName("B");
        A.start();
        B.start();
    }
}

假設AB兩個人從同一個賬戶里取錢,直接在draw這個方法加synchronized關鍵字,防止兩個人同時進入draw

sychronized加在普通方法上,鎖為當前實例對象

加在靜態方法上,鎖為當前類的Class

同步代碼塊

public  void draw(int money) {
    synchronized (total) {
        if (total >= money) {
            this.total = this.total - money;
            System.out.println(Thread.currentThread().getName() + "剩下" + this.total);
        } else {
            System.out.println(Thread.currentThread().getName() + "不夠了");
        }
    }
}

synchronized同步塊,鎖為()裡邊的對象

Lock lock = new ReentrantLock();
public void draw(int money) {
    lock.lock();
    try {
        if (total >= money) {
            this.total = this.total - money;
            System.out.println(Thread.currentThread().getName() + "剩下" + this.total);
        } else {
            System.out.println(Thread.currentThread().getName() + "不夠了");
        }
    } finally {
        lock.unlock();
    }
}

使用比較簡單,進方法加鎖,執行完釋放,後面會專門發一篇文章介紹鎖,包括AQS之類的東西,敬請關注。


線程間的通信

線程之間協調工作的方式

基於等待通知模型的通信

等待/通知的相關方法是任意Java對象都具備的,因為這些方法被定義在java.lang.Object上。

相關API

  • notify: 通知一個對象上等待的線程,使其從wait方法返回,而返回的前提是該線程獲取到了對象的鎖
  • notifyAll: 通知對象上所有等待的線程,使其從wait方法返回
  • wait: 使線程進入WAITING(後麵線程的生命周期裡邊有)狀態,只有等待另一個線程通知或者被中斷才返回,需要注意的是,調用wait方法後需要釋放對象的鎖
  • wait(long): 和wait類似,加入了超時時間,超時了還沒被通知就直接返回
  • wait(long, int): 納秒級,不常用

一些需要注意的點:

  • 使用wait()、notify()和notifyAll()時需要先對調用對象加鎖
  • 調用wait()方法後,線程狀態由RUNNING變為WAITING,並將當前線程放置到對象的等待隊列,釋放鎖
  • notify()或notifyAll()方法調用後,等待線程不會立即從wait()返回,需要調用notify()或notifAll()的線程釋放鎖之後,等待線程才有機會從wait()返回。
  • notify()方法將等待隊列中的一個等待線程從等待隊列中移到同步隊列中,而notifyAll()方法則是將等待隊列中所有的線程全部移到同步隊列,被移動的線程狀態由WAITING變為BLOCKED。
  • 從wait()方法返回的前提是獲得了調用對象的鎖。

關於等待隊列和同步隊列

  • 同步隊列(鎖池):假設線程A已經擁有了某個對象(注意:不是類)的鎖,而其它的線程想要調用這個對象的某個synchronized方法(或者synchronized塊),由於這些線程在進入對象的synchronized方法之前必須先獲得該對象的鎖的擁有權,但是該對象的鎖目前正被線程A擁有,所以這些線程就進入了該對象的同步隊列(鎖池)中,這些線程狀態為Blocked
  • 等待隊列(等待池):假設一個線程A調用了某個對象的wait()方法,線程A就會釋放該對象的鎖(因為wait()方法必須出現在synchronized中,這樣自然在執行wait()方法之前線程A就已經擁有了該對象的鎖),同時 線程A就進入到了該對象的等待隊列(等待池)中,此時線程A狀態為Waiting。如果另外的一個線程調用了相同對象的notifyAll()方法,那麼 處於該對象的等待池中的線程就會全部進入該對象的同步隊列(鎖池)中,準備爭奪鎖的擁有權。如果另外的一個線程調用了相同對象的notify()方法,那麼 僅僅有一個處於該對象的等待池中的線程(隨機)會進入該對象的同步隊列(鎖池)。

以上來自啃碎並發(二):Java線程的生命周期

等待通知模型的示例

class WaitNotifyModel {
    Object lock = new Object();
    boolean flag = false;

    public void start() {
        Thread A = new Thread(() -> {
            synchronized (lock) {
                while (!flag) {
                    try {
                        System.out.println(Thread.currentThread().getName()+":等待通知");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName()+ ":收到通知,處理業務邏輯");
            }
        });
        A.setName("我是等待者");
        Thread B = new Thread(() -> {
            synchronized (lock) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                flag = true;
                System.out.println(Thread.currentThread().getName()+":發出通知");
                lock.notify();
            }
        });
        B.setName("通知者");
        A.start();
        B.start();
    }
}

模型歸納

等待者

 synchronized (對象) {
    while (不滿足條件) {
        對象.wait()
    }
    處理業務邏輯
}

通知者

synchronized (對象) {
    改變條件
    對象.notify();
}

基於Condition的通信

上述的這種等待通知需要使用synchronized, 如果使用Lock的話就要用Condition

Condition接口也提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式

Condition與Object監視器的區別

項目 Object的監視器方法 Condition
前置條件 獲得對象的鎖 Lock.lock()獲取鎖
Lock.newCondition()獲取Condition
調用方式 obj.wait() condition.await()
等待隊列個數 一個 可以多個
當前線程釋放鎖並進入等待狀態 支持 支持
等待狀態中不響應中斷 不支持 支持
釋放鎖進入超時等待狀態 支持 支持
進入等待狀態到將來的某個時間 不支持 支持
喚醒等待中的一個或多個線程 支持 notify notifyAll 支持signal signalAll

這裡有一些線程的狀態,可以看完後邊的線程的生命周期再回過頭看看

示例

一般都會將Condition對象作為成員變量。當調用await()方法後,當前線程會釋放鎖並在此等待,而其他線程調用Condition對象的signal()方法,通知當前線程後,當前線程才從await()方法返回,並且在返回前已經獲取了鎖。

實現一個有界隊列,當隊列為空時阻塞消費線程,當隊列滿時阻塞生產線程

class BoundList<T> {
    private LinkedList<T> list;
    private int size;
    private Lock lock = new ReentrantLock();
    // 拿兩個condition,一個是非空,一個是不滿
    private Condition notEmpty = lock.newCondition();
    private Condition notFullCondition = lock.newCondition();

    public BoundList(int size) {
        this.size = size;
        list = new LinkedList<>();
    }

    public void push(T x) throws InterruptedException {
        lock.lock();
        try {
            while (list.size() >= size) {
                // 滿了就等待
                notFullCondition.await();
            }
            list.push(x);
            // 喚醒等待的消費者
            notEmpty.signalAll();
            
        } finally {
            lock.unlock();
        }
    }

    public T get() throws InterruptedException {
        lock.lock();
        try {
            while (list.isEmpty()) {
                // 空了就等
                notEmpty.await();
            }
            T x = list.poll();
            // 喚醒生產者
            notFullCondition.signalAll();
            return x;
        } finally {
            lock.unlock();
        }
    }

}

public class Demo_02_05_1_Condition {
    public static void main(String[] args) {
        BoundList<Integer> list = new BoundList<>(10);
        // 生產數據的線程
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    Thread.sleep(1000);
                    list.push(i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        
        // 消費數據的線程
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    System.out.println(list.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

基於BlockingQueue實現線程通信

後面會專門發文介紹BlockingQueue, 敬請關注


控制線程

參考了《瘋狂java講義》的提法,將如下內容歸為控制線程的方式。

join

主線程join一個線程,那麼主線程會阻塞直到join進來的線程執行完,主線程繼續執行, join如果帶超時時間的話,那麼如果超時的話主線程也會不再等join進去的線程而繼續執行.

join實際就是判斷join進來的線程存活狀態,如果活着就調用wait(0),如果帶超時時間了的話,wait裡邊的時間會算出來

while (isAlive()) {
    wait(0);
}

API

  • public final void join() throws InterruptedException
  • public final synchronized void join(long millis, int nanos)
  • public final synchronized void join(long millis)

例子

public class Demo_02_06_1_join extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println(this.getName() + "  " + i);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Demo_02_06_1_join joinThread = new Demo_02_06_1_join();
        for (int i = 0; i < 100; i++) {

            if (i == 10) {
                joinThread.start();
                joinThread.join();
            }
            // 打到9就停了,然後執行joinThread這裡邊的代碼,完事繼續從10打
            System.out.println(Thread.currentThread().getName()+"  "+i);
        }
    }
}

sleep

睡覺方法,使得線程暫停一段時間,進入阻塞狀態。

API

  • public static native void sleep(long millis) throws InterruptedException
  • public static void sleep(long millis, int nanos) throws InterruptedException

示例

public class Demo_02_06_2_sleep extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
            // 輸出到4停止, 5秒後繼續
            System.out.println(this.getName() + "  " + i);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Demo_02_06_2_sleep sleepThread = new Demo_02_06_2_sleep();
        sleepThread.start();
    }
}

yield

也是讓線程暫停一下,但是是進入就緒狀態,讓系統重新開始一次新的調度過程,下一次可能運氣好被yield的線程又被選中。

Thread.yield()

中斷

Java中斷機制是一種協作機制,也就是說通過中斷並不能直接終止另一個線程,而需要被中斷的線程自己處理中斷。

前面有一些方法聲明了InterruptedException, 這意味者他們可以被中斷,中斷後把異常拋給調用方,讓調用方自己處理.

被中斷的線程可以自已處理中斷,也可以不處理或者拋出去。

public class Demo_02_06_3_interrupt extends Thread {

    static class MyCallable implements Callable {
        @Override
        public Integer call() throws InterruptedException {
            for (int i = 0; i < 5000; i++) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("3333");
                    throw new InterruptedException("中斷我幹嘛,關注 微信號 大雄和你一起學編程 呀");
                }
            }
            return 0;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<>(new MyCallable());
        Thread thread = new Thread(futureTask);
        thread.start();
        for (int i = 0; i < 100; i++) {
            if (i == 3) {
                thread.interrupt();
            }
        }
        try {
            futureTask.get();
        } catch (ExecutionException e) {
            // 這裡會捕獲到異常
            e.printStackTrace();
        }

    }
}

線程的生命周期

啃碎並發(二):Java線程的生命周期 這篇文章寫的非常好,建議看一下。

要是早點發現這篇文章的話,大雄也不用費勁在《java並發編程藝術》和《瘋狂java講義》以及各種博客找資料了。

這裡我只想把這篇文章里一個圖改一下貼到這裡,細節部分大家可以參考上述這篇文章。

還是先說兩嘴,這個生命周期的圖我找到了不少版本,不僅圖的形式不一樣,裡邊的內容也有些出入

  • 《瘋狂java講義》裡邊只有5中狀態,缺少WAITING和TIMED_WAITING
  • 《java並發編程藝術》裡邊有7中狀態
  • 上邊的那篇文章,文字描述有7中狀態,但是圖裡邊只有6種

大雄也懵了,遂在源碼找到了如下一個枚舉, 裏面有一些注釋,翻譯了一下。

 public enum State {
        // 表示沒有開始的線程
        NEW,

        // 表示可運行(大家的翻譯應該是就緒)的線程
        // 表示在JVM正在運行,但是他可能需要等操作系統分配資源
        // 比如CPU
        RUNNABLE,

         // 表示線程在等待監視器鎖
         // 表示正在等待監視器鎖以便重新進進入同步塊或者同步方法 
         // OR 在調用了Object.wait重新進入同步塊或者同步方法
        BLOCKED,

         // 調用如下方法之一會進入WAITING
         // 1. Object.wait() 沒有加超時參數
         // 2. 調用join() 沒有加超時參數
         // 3. 調用LockSupport.park()
         // WAITING狀態的線程在等待別的線程做一個特殊的事情(action)例如
         // 1. 調用了wait的在等待其他線程調用notify或者notifyAll
         // 2. 調用了join的在等待指定線程結束
        WAITING,

         // 就是有一個特定等待時間的線程
         // 加上一個特定的正的超時時間調用如下方法會進入此狀態
         // 1. Thread.sleep
         // 2. Thread.join(long)
         // 3. LockSupport.parkNanos
         // 4. LockSupport.parkUntil
        TIMED_WAITING,

        // 執行完了結束的狀態
        TERMINATED;
    }

對於一個擁有8級英語水品的6級沒過的人來說,這段翻譯太難了,但是翻譯出來感覺很清晰了。

應該是 7種狀態!!!

大雄不去具體研究狀態的流轉了,直接參考一些資料及上述翻譯,搞一個前無古人、後有來者的線程生命周期圖

線程的生命周期

這個圖八成、沒準、大概是沒有太大問題的。此圖中,原諒色是線程狀態,紫色是引起狀態變化的原因。


ThereadLocal

就是綁定到線程上邊的一個存東西的地方。

使用示例

class Profiler {
    // ThreadLocal的創建
    private static ThreadLocal<Long> threadLocal = new ThreadLocal<Long>(){
        @Override
        protected Long initialValue() {
            return System.currentTimeMillis();
        }

    };

    // 記錄開始時間
    public static void begin() {
        threadLocal.set(System.currentTimeMillis());
    }

    // 記錄耗時
    public static Long end() {
        return System.currentTimeMillis() - threadLocal.get();
    }
}
public class Demo_02_08_1_ThreadLocal {
    public static void main(String[] args) {
        new Thread(() -> {
            Profiler.begin();
            long sum = 1;
            for (int i = 1; i < 20; i++) {
                sum*=i;
            }
            System.out.println(sum);
            System.out.println(Thread.currentThread().getName()+"耗時="+Profiler.end());
        }).start();

        new Thread(() -> {
            Profiler.begin();
            int sum = 1;
            for (int i = 1; i < 1000; i++) {
                sum+=i;
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(sum);
            System.out.println(Thread.currentThread().getName()+"耗時="+Profiler.end());
        }).start();
    }
}

InheritableThreadLocal

這種ThreadLocal可以從父線程傳到子線程,也就是子線程能訪問父線程中的InheritableThreadLocal

public class Demo_02_08_2_ThreadLocalInherit {
    static class TestThreadLocalInherit extends Thread{
        @Override
        public void run() {
            System.out.println(threadLocal.get()); // null 
            System.out.println(inheritableThreadLocal.get()); // 歡迎關注微信公眾號 大雄和你一起學編程
        }
    }

    public static ThreadLocal<Object> threadLocal = new ThreadLocal<Object>();
    public static InheritableThreadLocal<Object> inheritableThreadLocal = new InheritableThreadLocal<>();
    public static void main(String[] args) {
        inheritableThreadLocal.set("歡迎關注微信公眾號 大雄和你一起學編程");
        threadLocal.set("ddd");
        new TestThreadLocalInherit().start();
    }
}

實現原理

很容易想到,因為這個東西是跟着線程走的,所以應該是線程的一個屬性,事實上也是這樣,ThreadLocal和InheritableThreadLocal都是存儲在Thread裏面的。

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
 * InheritableThreadLocal values pertaining to this thread. This map is
 * maintained by the InheritableThreadLocal class.
 */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

上邊這個就是Thread的兩個成員變量,其實兩個是一樣的類型。

ThreadLocalMap是ThreadLocal的內部類,他裡邊是一個用一個Entry數組來存數據的。set時將ThreadLocal作為key,要存的值傳進去,他會對key做一個hash,構建Entry,放到Entry數組裡邊。

// 偽碼
static class ThreadLocalMap {
    // 內部的Entry結構
    static class Entry {...}
    // 存數據的
    private Entry[] table;
    // set
    private void set(ThreadLocal<?> key, Object value) {
        int i = key.threadLocalHashCode & (len-1);
        tab[i] = new Entry(key, value);
    }
    // get
    private Entry getEntry(ThreadLocal<?> key) {
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        if (e != null && e.get() == key)
            return e;
        else
            return getEntryAfterMiss(key, i, e);
    }
}

再來看看ThreadLocal的get方法

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t); // 這個就是拿到的存在Thread的threadLocals這個變量
    if (map != null) {
        // 這裡就是毫無難度的事情了
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    // 這個也很簡單,他會調你重寫的initialValue方法,拿到一個值,set進去並且返回給你
    // 這個也很有趣,一般init在初始化完成,但是他是在你取的時候去調,應該算是一個小小優化吧
    return setInitialValue();
}

再來看看ThreadLocal的set, 超級簡單,不多說

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

ThreadLocal看完了,再來瞅瞅InheritableThreadLocals,看看他是怎麼可以從父線程那裡拿東西的

// 繼承了ThreadLocal, 重寫了三個方法
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    // 這個方法在ThreadLocal是直接拋出一個異常UnsupportedOperationException
    protected T childValue(T parentValue) {
        return parentValue;
    }
    // 超簡單,我們的Map不要threadLocals了,改為inheritableThreadLocals
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }
    // 同上
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

發現他和ThreadLocal長得差不多,就是重寫了三個方法,由此看來關鍵在inheritableThreadLocals是如何傳遞的

直接在Thread裏面搜inheritableThreadLocals

你會發現他是在init方法中賦值的,而init實在Thread的構造方法中調用的

// 這個parent就是 創建這個線程的那個線程,也就是父線程
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

看來現在得看看ThreadLocal.createInheritedMap這個方法了

// parentMap就是父線程的inheritableThreadLocals
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
    return new ThreadLocalMap(parentMap);
}
// 發現很簡單,就是把父線程的東西到自己線程的inheritableThreadLocals裡邊
private ThreadLocalMap(ThreadLocalMap parentMap) {
    Entry[] parentTable = parentMap.table;
    int len = parentTable.length;
    setThreshold(len);
    table = new Entry[len];

    for (int j = 0; j < len; j++) {
        Entry e = parentTable[j];
        if (e != null) {
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                Object value = key.childValue(e.value);
                Entry c = new Entry(key, value);
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);
                table[h] = c;
                size++;
            }
        }
    }
}

總結一下

ThreadLocal和InheritableThreadLocal是基於在Thread裡邊的兩個變量實現的,這兩個變量類似於一個HashMap的結構ThreadLocalMap,裡邊的Entry key為ThreadLocal, value為你存的值. InheritableThreadLocal的實現主要是在線程創建的時候,如果父線程有inheritableThreadLocal, 會被拷貝到子線程。


原子類

一個簡單的i++操作, 多線程環境下如果i是共享的,這個操作就不是原子的。

為此,java.util.concurrent.atomic這個包下邊提供了一些原子類,這些原子操作類提供了一種用法簡單、性能高效、線程安全地更新一個變量的方式。

atomic包下的類

一個使用的例子

public class Demo_04_01_1_Atomic {
    static class Counter {
        private AtomicInteger atomicInteger = new AtomicInteger(0);
        public int increment() {
            return atomicInteger.getAndIncrement();
        }
        public int get() {
            return atomicInteger.get();
        }
    }
    static class Counter2 {
        private int value = 0;
        public int increment() {
            return value++;
        }
        public int get() {
            return value;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 這個用了原子類
        Counter counter = new Counter();
        // 這個沒有用原子類
        Counter2 counter2 = new Counter2();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    counter.increment();
                    counter2.increment();
                }
            }).start();
        }
        Thread.sleep(2000);
        System.out.println(counter.get());  // 一定是5000
        System.out.println(counter2.get()); // 可能少於5000
    }
}

超級簡單~

原子類的實現沒細看,貌似是CAS吧


章小結

並發編程基礎-總結

本圖源文件可以在github java-concurrent-programming-art-mini對應章下面找到

參考文獻

相關資源

大雄和你一起學編程

Tags: