ThreadLocal應用及源碼分析

ThreadLocal

基本使用

ThreadLocal 的作用是:提供線程內的局部變量,不同的線程之間不會相互干擾,這種變量在線程的生命周期內起作用,減少同一個線程內多個函數或組件之間一些公共變量傳遞的複雜度,降低耦合性。

方法聲明 描述
ThreadLocal() 創建ThreadLocal對象
public void set( T value) 設置當前線程綁定的局部變量
public T get() 獲取當前線程綁定的局部變量
public void remove() 移除當前線程綁定的局部變量

簡單使用:

public class MyDemo {
    private String content;

    private String getContent() {
        return content;
    }

    private void setContent(String content) {
        this.content = content;
    }

    public static void main(String[] args) {
        MyDemo demo = new MyDemo();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.setContent(Thread.currentThread().getName() + "的數據");
                    System.out.println("-----------------------");
             		System.out.println(Thread.currentThread().getName() + "--->" + demo.getContent());
                }
            });
            thread.setName("線程" + i);
            thread.start();
        }
    }
}

public class MyDemo {

    private static ThreadLocal<String> tl = new ThreadLocal<>();

    private String content;

    private String getContent() {
        return tl.get();
    }

    private void setContent(String content) {
         tl.set(content);
    }

    public static void main(String[] args) {
        MyDemo demo = new MyDemo();
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.setContent(Thread.currentThread().getName() + "的數據");
                    System.out.println("-----------------------");
                    System.out.println(Thread.currentThread().getName() + "--->" + demo.getContent());
                }
            });
            thread.setName("線程" + i);
            thread.start();
        }
    }
}

這樣可以很好的解決多線程之間數據隔離的問題,用synchronized加鎖也可以實現,但synchronized側重的是多個線程之間訪問資源的同步性,而ThreadLocal側重的是每個線程之間的數據隔離。

synchronized ThreadLocal
原理 同步機制採用’以時間換空間’的方式, 只提供了一份變量,讓不同的線程排隊訪問 ThreadLocal採用’以空間換時間’的方式, 為每一個線程都提供了一份變量的副本,從而實現同時訪問而相不干擾
側重點 多個線程之間訪問資源的同步性 多線程中讓每個線程之間的數據相互隔離

應用場景

涉及到數據傳遞線程隔離的場景,可以考慮用ThreadLocal來解決:轉賬案例,涉及兩個DML操作: 一個轉出,一個轉入。這些操作是需要具備原子性的。所以這裡就需要操作事務,來保證轉出和轉入操作具備原子性。開啟事務的注意兩點:

  • 為了保證所有的操作在一個事務中, 使用的連接必須是同一個: service層開啟事務的connection需要跟dao層訪問數據庫的connection保持一致。
  • 線程並發情況下, 每個線程只能操作各自的 connection。

用ThreadLocal的解決方案:在獲取Connection連接的JdbcUtils工具類加入ThreadLocal,代碼如下:

public class JdbcUtils {
    //ThreadLocal對象 : 將connection綁定在當前線程中
    private static final ThreadLocal<Connection> tl = new ThreadLocal();

    // c3p0 數據庫連接池對象屬性
    private static final ComboPooledDataSource ds = new ComboPooledDataSource();

    // 獲取連接
    public static Connection getConnection() throws SQLException {
        //取出當前線程綁定的connection對象
        Connection conn = tl.get();
        if (conn == null) {
            //如果沒有,則從連接池中取出
            conn = ds.getConnection();
            //再將connection對象綁定到當前線程中
            tl.set(conn);
        }
        return conn;
    }

    //釋放資源
    public static void release(AutoCloseable... ios) {
        for (AutoCloseable io : ios) {
            if (io != null) {
                try {
                    io.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void commitAndClose() {
        try {
            Connection conn = getConnection();
            //提交事務
            conn.commit();
            //解除綁定
            tl.remove();
            //釋放連接
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static void rollbackAndClose() {
        try {
            Connection conn = getConnection();
            //回滾事務
            conn.rollback();
            //解除綁定
            tl.remove();
            //釋放連接
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

可以看出使用ThreadLocal的好處:

  1. 傳遞數據 : 保存每個線程綁定的數據,在需要的地方可以直接獲取, 避免參數直接傳遞帶來的代碼耦合問題
  2. 線程隔離 : 各線程之間的數據相互隔離卻又具備並發性,避免同步方式帶來的性能損失

ThreadLocal的內部結構

jdk8以前:

jdk8之前使用ThreadLocal來維護一個ThreadLocalMap,以線程作為key

jdk8以後:

jdk8之後使用Thread來維護一個ThreadLocalMap,以ThreadLocal作為key

這樣涉及的好處:

(1) 每個Map存儲的Entry數量就會變少,因為jdk8之前的存儲數量由Thread的數量決定,現在是由ThreadLocal的數量決定。

(2) 當Thread銷毀之後,對應的ThreadLocalMap也會隨之銷毀,能減少內存的使用。

ThreadLocal核心方法的源碼

方法聲明 描述
protected T initialValue() 返回當前線程局部變量的初始值
public void set( T value) 設置當前線程綁定的局部變量
public T get() 獲取當前線程綁定的局部變量
public void remove() 移除當前線程綁定的局部變量

get()

/**
     * 返回當前線程中保存ThreadLocal的值
     * 如果當前線程沒有此ThreadLocal變量,
     * 則它會通過調用{@link #initialValue} 方法進行初始化值
     *
     * @return 返回當前線程對應此ThreadLocal的值
     */
public T get() {
    // 獲取當前線程對象
    Thread t = Thread.currentThread();
    // 獲取此線程對象中維護的ThreadLocalMap對象
    ThreadLocalMap map = getMap(t);
    // 如果此map存在
    if (map != null) {
        // 以當前的ThreadLocal 為 key,調用getEntry獲取對應的存儲實體e
        ThreadLocalMap.Entry e = map.getEntry(this);
        // 找到對應的存儲實體 e 
        if (e != null) {
            @SuppressWarnings("unchecked")
            // 獲取存儲實體 e 對應的 value值
            // 即為我們想要的當前線程對應此ThreadLocal的值
            T result = (T)e.value;
            return result;
        }
    }
    // 如果map不存在,則證明此線程沒有維護的ThreadLocalMap對象
    // 調用setInitialValue進行初始化
    return setInitialValue();
}

/**
     * set的變樣實現,用於初始化值initialValue,
     * 用於代替防止用戶重寫set()方法
     *
     * @return the initial value 初始化後的值
     */
private T setInitialValue() {
    // 調用initialValue獲取初始化的值
    T value = initialValue();
    // 獲取當前線程對象
    Thread t = Thread.currentThread();
    // 獲取此線程對象中維護的ThreadLocalMap對象
    ThreadLocalMap map = getMap(t);
    // 如果此map存在
    if (map != null)
        // 存在則調用map.set設置此實體entry
        map.set(this, value);
    else
        // 1)當前線程Thread 不存在ThreadLocalMap對象
        // 2)則調用createMap進行ThreadLocalMap對象的初始化
        // 3)並將此實體entry作為第一個值存放至ThreadLocalMap中
        createMap(t, value);
    // 返回設置的值value
    return value;
}

/**
     * 獲取當前線程Thread對應維護的ThreadLocalMap 
     * 
     * @param  t the current thread 當前線程
     * @return the map 對應維護的ThreadLocalMap 
     */
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}
/**
     *創建當前線程Thread對應維護的ThreadLocalMap 
     *
     * @param t 當前線程
     * @param firstValue 存放到map中第一個entry的值
     */
void createMap(Thread t, T firstValue) {
    //這裡的this是調用此方法的threadLocal
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

首先調用Thread.currentThread()方法獲取當前線程對象,然後根據當前線程獲取維護的ThreadLocalMap對象;如果獲取的Map不為空,則在Map中以ThreadLocal的引用作為key,調用getEntry獲取對應的存儲實體,如果Entry不為空,獲取對應的 value值。如果Map為空或者Entry為空,則調用setInitialValue()方法。setInitialValue()方法里,調用initialValue()方法獲取初始化值value,然後判斷當前線程是否有ThreadLocalMap,map存在,調用set設置Entry;map不存在則調用createMap()進行ThreadLocalMap對象的初始化,並將此entry作為第一個值存放至ThreadLocalMap中。

set()

/**
     * 設置當前線程對應的ThreadLocal的值
     *
     * @param value 將要保存在當前線程對應的ThreadLocal的值
     */
public void set(T value) {
    // 獲取當前線程對象
    Thread t = Thread.currentThread();
    // 獲取此線程對象中維護的ThreadLocalMap對象
    ThreadLocalMap map = getMap(t);
    // 如果此map存在
    if (map != null)
        // 存在則調用map.set設置此實體entry
        map.set(this, value);
    else
        // 1)當前線程Thread 不存在ThreadLocalMap對象
        // 2)則調用createMap進行ThreadLocalMap對象的初始化
        // 3)並將此實體entry作為第一個值存放至ThreadLocalMap中
        createMap(t, value);
}

​ A. 首先獲取當前線程,並根據當前線程獲取一個ThreadLocalMap

​ B. 如果獲取的Map不為空,則將參數設置到Map中(當前ThreadLocal的引用作為key)

​ C. 如果Map為空,則調用createMap給該線程創建 Map,並設置初始值

remove()

/**
     * 刪除當前線程中保存的ThreadLocal對應的實體entry
     */
public void remove() {
    // 獲取當前線程對象中維護的ThreadLocalMap對象
    ThreadLocalMap m = getMap(Thread.currentThread());
    // 如果此map存在
    if (m != null)
        // 存在則調用map.remove
        // 以當前ThreadLocal為key刪除對應的實體entry
        m.remove(this);
}

A. 首先獲取當前線程,並根據當前線程獲取一個ThreadLocalMap

B. 如果獲取的Map不為空,則移除當前ThreadLocal對象對應的entry

initialValue()

protected T initialValue() {
    return null;
}

(1) 這個方法是一個延遲調用方法,在set方法還未調用而先調用了get方法時才執行,並且僅執行1次。

(2)這個方法直接返回一個null

(3)如果想要一個除null之外的初始值,可以重寫此方法。(備註: 該方法是一個protected的方法,顯然是為了讓子類覆蓋而設計的)

ThreadLocalMap

ThreadLocalMap是ThreadLocal的內部類,沒有實現Map接口,用獨立的方式實現了Map的功能,其內部的Entry也是獨立實現。

  1. 成員變量

    /**
         * 初始容量 —— 必須是2的整次冪
         */
        private static final int INITIAL_CAPACITY = 16;
    
        /**
         * 存放數據的table
         * 同樣,數組長度必須是2的冪。
         */
        private Entry[] table;
    
        /**
         * 數組裏面entrys的個數,可以用於判斷table當前使用量是否超過負載因子。
         */
        private int size = 0;
    
        /**
         * 進行擴容的閾值,表使用量大於它的時候進行擴容。
         */
        private int threshold; // Default to 0
        
        /**
         * 閾值設置為長度的2/3
         */
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }
    
  2. Entry

    static class Entry extends WeakReference<ThreadLocal> {
        /** The value associated with this ThreadLocal. */
        Object value;
    
        Entry(ThreadLocal k, Object v) {
            super(k);
            value = v;
        }
    }
    

    在ThreadLocalMap中,也是用Entry來保存K-V結構數據的。但是Entry中key只能是ThreadLocal對象,這點被Entry的構造方法已經限定死了;
    另外,Entry繼承WeakReference,使用弱引用,可以將ThreadLocal對象的生命周期和線程生命周期解綁,持有對ThreadLocal的弱引用,可以使得ThreadLocal在沒有其他強引用的時候被回收掉,這樣可以避免因為線程得不到銷毀導致ThreadLocal對象無法被回收

  3. hash衝突的解決

    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        //初始化table
        table = new ThreadLocal.ThreadLocalMap.Entry[INITIAL_CAPACITY]; //16
        //計算索引
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        //設置值
        table[i] = new ThreadLocal.ThreadLocalMap.Entry(firstKey, firstValue);
        size = 1;
        //設置閾值
        setThreshold(INITIAL_CAPACITY);
    }
    
    • & (INITIAL_CAPACITY - 1),這是取模的一種方式,對於2的冪取模,用此代替%(2^n),這也就是為啥容量必須為2的冪

    • firstKey.threadLocalHashCode

      private final int threadLocalHashCode = nextHashCode();
          
      private static int nextHashCode() {
          return nextHashCode.getAndAdd(HASH_INCREMENT);
      }
      private static AtomicInteger nextHashCode =  new AtomicInteger();
      
      private static final int HASH_INCREMENT = 0x61c88647;
      

      這裡定義了一個AtomicInteger類型,每次獲取當前值並加上HASH_INCREMENT,HASH_INCREMENT = 0x61c88647,這個值是32位整型上限2^32-1乘以黃金分割比例0.618….的值2654435769,用有符號整型表示就是-1640531527,去掉符號後16進制表示為0x61c88647,目的就是為了讓哈希碼能均勻的分佈在2的n次方的數組Entry[] table中。

    • 線性探測法:

      該方法一次探測下一個地址,直到有空的地址後插入,若整個空間都找不到空餘的地址,則產生溢出。假設當前table長度為16,也就是說如果計算出來key的hash值為14,如果table[14]上已經有值,並且其key與當前key不一致,那麼就發生了hash衝突,這個時候將14加1得到15,取table[15]進行判斷,這個時候如果還是衝突會回到0,取table[0],以此類推,直到可以插入。可以把table看成一個環形數組

       /**
           * 獲取環形數組的下一個索引
           */
          private static int nextIndex(int i, int len) {
              return ((i + 1 < len) ? i + 1 : 0);
          }
      
          /**
           * 獲取環形數組的上一個索引
           */
          private static int prevIndex(int i, int len) {
              return ((i - 1 >= 0) ? i - 1 : len - 1);
          }
      
    • ThreadLocalMap的set():

      private void set(ThreadLocal<?> key, Object value) {
          ThreadLocal.ThreadLocalMap.Entry[] tab = table;
          int len = tab.length;
          //計算索引
          int i = key.threadLocalHashCode & (len-1);
          /**
               * 根據獲取到的索引進行循環,如果當前索引上的table[i]不為空,在沒有return的情況下,
               * 就使用nextIndex()獲取下一個(線性探測法)。
               */
          for (ThreadLocal.ThreadLocalMap.Entry e = tab[i];
               e != null;
               e = tab[i = nextIndex(i, len)]) {
              ThreadLocal<?> k = e.get();
              //table[i]上key不為空,並且和當前key相同,更新value
              if (k == key) {
                  e.value = value;
                  return;
              }
              /**
                   * table[i]上的key為空,說明被回收了
                   * 這個時候table[i]可以重新使用,用新的key-value將其替換,並刪除其他無效的entry
                   */
              if (k == null) {
                  replaceStaleEntry(key, value, i);
                  return;
              }
          }