Java進階專題(十五) 從電商系統角度研究多線程(下)
前言
本章節繼上章節繼續梳理:線程相關的基礎理論和工具、多線程程序下的性能調優和電商場景下多線程的使用。
多線程J·U·C
ThreadLocal
概念
ThreadLocal類並不是用來解決多線程環境下的共享變量問題,而是用來提供線程內部的共享變量。在多線程環境下,可以保證各個線程之間的變量互相隔離、相互獨立。
使用
ThreadLocal實例一般定義為private static類型的,在一個線程內,該變量共享一份,類似上下文作用,可以用來上下傳遞信息。
public class ThreadLocalDemo implements Runnable{
private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
public void run(){
for (int i = 0; i < 3; i++) {
threadLocal.set(i);
System.out.println(Thread.currentThread().getName()+",value="+threadLocal.get()
);
}
}
public static void main(String[] args) {
ThreadLocalDemo demo = new ThreadLocalDemo();
new Thread(demo).start();
new Thread(demo).start();
}
}
結果分析:
同一個demo實例,不同的thread嵌套
結果打印了各自的變量值,線程內上下文被傳遞,不同線程間被隔離
應用場景
數據庫連接,session管理
下面的基於日誌平台的訪問鏈路追蹤中,會用到
使用中遇到的坑
參與過一個項目,電商商鋪詳情頁凌晨調度生成。需要上下傳遞shopid,為每個商鋪重新生成一下。在商鋪詳情頁里因為是按麵包屑分片生成,比如商鋪信息、熱賣商品、最多好評、店主推薦、最新上架等。
其他信息全部生成ok,唯獨商品列表多個列表出現問題。經查,在商品部分的查詢中用到了ThreadLocal,造成當前商鋪id丟失。
源碼解析
ThreadLocalMap是ThreadLocal內部類,由ThreadLocal創建,每個Thread里維護一個
ThreadLocal. ThreadLocalMap類型的屬性threadLocals。所有的value值其實是存儲在
ThreadLocalMap中。
這個存儲結構的思路是反轉的…
set方法
public void set(T value) {
//取到當前線程
Thread t = Thread.currentThread();
//從當前線程中拿出Map
ThreadLocalMap map = getMap(t);
if (map != null)
//如果非空,說明之前創建過了
//以當前創建的ThreadLocal對象為key,需要存儲的值為value,寫入Map
//因為每個線程Thread里有自己獨自的Map,所以起到了隔離作用
map.set(this, value);
else
//如果沒有,那就創建
createMap(t, value);
}
get方法
public T get() {
Thread t = Thread.currentThread();
//獲取到當前線程下的Map
ThreadLocalMap map = getMap(t);
if (map != null) {
//如果非空,根據當前ThreadLocal為key,取出對應的value即可
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//如果map是空的,往往返回一個初始值,這是一個protect方法
//這就是為什麼創建ThreadLocal的時候往往要求實現這個方法
return setInitialValue();
}
remove方法
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
//很簡單,獲取到map後,調用remove移除掉
if (m != null)
m.remove(this);
}
ThreadLocal是如何避免內存泄漏的
在上述的get方法中,Entry類繼承了WeakReference,即每個Entry對象都有一個ThreadLocal的弱引用,GC對於弱引用的對象採取積極的內存回收策略,避免無人搭理時發生內存泄露。
Fork/Join
概念
ForkJoin是由JDK1.7後提供多線並發處理框架。ForkJoinPool由Java大師Doug Lea主持編寫,處理邏輯大概分為兩步。
1.任務分割:Fork(分岔),先把大的任務分割成足夠小的子任務,如果子任務比較大的話還要對子任務進行繼續分割。
2.合併結果:join,分割後的子任務被多個線程執行後,再合併結果,得到最終的完整輸出。
組成
- ForkJoinT ask:主要提供fork和join兩個方法用於任務拆分與合併;多數使用。RecursiveAction(無返回值的任務)和RecursiveT ask(需要返回值)來實現compute方法。
- ForkJoinPool:調度ForkJoinT ask的線程池;
- ForkJoinWorkerThread:Thread的子類,存放於線程池中的工作線程(Worker);
- WorkQueue:任務隊列,用於保存任務;
基本使用
一個典型的例子:計算1-1000的和
public class SumTask {
private static final Integer MAX = 100;
static class SubTask extends RecursiveTask<Integer> {
// 子任務開始計算的值
private Integer start;
// 子任務結束計算的值
private Integer end;
public SubTask(Integer start , Integer end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if(end - start < MAX) {
//小於邊界,開始計算
System.out.println("start = " + start + ";end = " + end);
Integer totalValue = 0;
for(int index = this.start ; index <= this.end ; index++) {
totalValue += index;
}
return totalValue;
}else {
//否則,中間劈開繼續拆分
SubTask subTask1 = new SubTask(start, (start + end) / 2);
subTask1.fork();
SubTask subTask2 = new SubTask((start + end) / 2 + 1 , end);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
Future<Integer> taskFuture = pool.submit(new SubTask(1,1000));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}
}
設計思想
- 普通線程池內部有兩個重要集合:工作線程集合,和任務隊列。
- ForkJoinPool也類似,工作集合里放的是特殊線程ForkJoinWorkerThread,任務隊列里放的是特殊任務ForkJoinTask
- 不同之處在於,普通線程池只有一個隊列。而ForkJoinPool的工作線程ForkJoinWorkerThread每個線程內都綁定一個雙端隊列。
- 在fork的時候,也就是任務拆分,將拆分的task會被當前線程放到自己的隊列中。
- 隊列中的任務被線程執行時,有兩種模式,默認是同步模式(asyncMode==false)從隊尾取任務(LIFO)
- 竊取:當自己隊列中執行完後,工作線程會到其他隊列的隊首獲取任務(FIFO),取到後如果任務再次fork,拆分會被放入當前線程的隊列,依次擴張
注意
使用ForkJoin將相同的計算任務通過多線程執行。但是在使用中需要注意:
注意任務切分的粒度,也就是fork的界限。並非越小越好
判斷要不要使用ForkJoin。任務量不是太大的話,串行可能優於並行。因為多線程會涉及到上下文的切換
Volatile
概念
回顧Java 內存模型中的可見性、原子性和有序性:
- 可見性,是指線程之間的可見性,一個線程修改的狀態對另一個線程是可見的
- 原子性,指的是這個操作是原子不可拆分的,不允許別的線程中間插隊操作
- 有序性指的是你寫的代碼的順序要和最終執行的指令保持一致。因為在Java內存模型中,允許編譯器和處理器對指令進行重排序,重排序過程不會影響到單線程程序的執行,卻會影響到多線程並發執行的正確性。
volatile要解決的就是可見性和有序性問題。
原理
Java內存模型分為主內存和線程工作內存兩大類。
主內存:多個線程共享的內存。方法區和堆屬於主內存區域。
線程工作內存:每個線程獨享的內存。虛擬機棧、本地方法棧、程序計數器屬於線程獨享的工作內存
Java內存模型規定,所有變量都需要存儲在主內存中,線程需要時,在自己的工作內存保存變量的副本,線程對變量的所有操作都在工作內存中進行,執行結束後再同步到主內存中去。這裡必然會存在時間差,在這個時間差內,該線程對副本的操作,對於其他線程是不見的,從而造成了可見性問題。
但是,當對volatile變量進行寫操作的時候,JVM會向處理器發送一條lock前綴的指令,將這個緩存中的變量回寫到系統主存中。
同時,在多處理器下,為了保證各個處理器的緩存是一致的,就會實現緩存一致性協議。每個處理器通過嗅探在總線上傳播的數據來檢查自己緩存的值是不是過期,一旦發現過期就會將當前處理器的緩存行設置成無效狀態,強制從主內存讀取,這就保障了可見性。
而volatile變量,通過內存屏障可以禁止指令重排。從而實現指令的有序性。
注意
volatile不能保證鎖的原子性。
案例:給前面的計數器案例里加上volatile試試
public class BadVolatile {
private static volatile int i=0;
public int get(){
return i;
}
public void inc(){
int j=get();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
i=j+1;
}
public static void main(String[] args) throws InterruptedException {
final BadVolatile counter = new BadVolatile();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
counter.inc();
}
}).start();
}
Thread.sleep(3000);
//理論上10才對。可是....
System.out.println(counter.i);
}
}
達不到目的。說明原子性無法保障。
ConcurrentHashMap
基本使用
public static void main(String[] args) throws InterruptedException {
//定義ConcurrentHashMap
Map map = new ConcurrentHashMap();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//多線程下的put可以放心使用
map.put(UUID.randomUUID().toString(), "1");
}
}).start();
}
Thread.sleep(3000);
System.out.println(map);
}
原理
jdk1.7是分段鎖,1.8使用的cas+sychronized操作,具體看代碼
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//計算hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//自旋,確保插入成功
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();//表為空的話,初始化表
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//否則,插入元素,看下面的 casTabAt 方法
//cas 在這裡!
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
//...
else {
V oldVal = null;
//其他情況下,加鎖保持
//synchronized 在這裡!
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
//compareAndSetObject,比較並插入,典型CAS操作
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//get取值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
//判斷table是不是空的,當前桶上是不是空的
//如果為空,返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//找到對應hash槽的第一個node,如果key相等,返回value
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果正在擴容,不影響,繼續順着node找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//其他情況,逐個便利,比對key,找到後返回value
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
總結:
put過程:
1.根據key的hash值定位到桶位置
2.如果table為空if,先初始化table。
3.如果table當前桶里沒有node,cas添加元素。成功則跳出循環,失敗則進入下一輪for循環。
4.判斷是否有其他線程在擴容,有則幫忙擴容,擴容完成再添加元素。
5.如果桶的位置不為空,遍歷該桶的鏈表或者紅黑樹,若key已存在,則覆蓋,不存在則將key插入到鏈表或紅黑樹的尾部。
get過程:
1.根據key的hash值定位到桶位置。
2.map是否初始化,沒有初始化則返回null
3.定位的桶是否有頭結點,沒有返回null
4.是否有其他線程在擴容,有的話調用find方法沿node指針往後查找。擴容與find可以並行,因為node的next指針不會變
5.若沒有其他線程在擴容,則遍歷桶對應的鏈表或者紅黑樹,使用equals方法進行比較。key相同則返回value,不存在則返回null
並發容器
1.ConcurrentHashMap
對應:HashMap
目標:代替Hashtable、synchronizedMap,使用最多,前面詳細介紹過
原理:JDK7中採用Segment分段鎖,JDK8中採用CAS+synchronized
2.CopyOnWriteArrayList
對應:ArrayList
目標:代替Vector、synchronizedList
原理:高並發往往是讀多寫少的特性,讀操作不加鎖,而對寫操作加Lock獨享鎖,先複製一份新的集
合,在新的集合上面修改,然後將新集合賦值給舊的引用,並通過volatile 保證其可見性。
查看源碼:volatile array,lock加鎖,數組複製
3.CopyOnWriteArraySet
對應:HashSet
目標:代替synchronizedSet
原理:與CopyOnWriteArrayList實現原理類似。
4.ConcurrentSkipListMap
對應:TreeMap
目標:代替synchronizedSortedMap(TreeMap)
原理:基於Skip list(跳錶)來代替平衡樹,按照分層key上下鏈接指針來實現。
5.ConcurrentSkipListSet
對應:TreeSet
目標:代替synchronizedSortedSet(TreeSet)
原理:內部基於ConcurrentSkipListMap實現,原理一致
6.ConcurrentLinkedQueue
對應:LinkedList
對應:無界線程安全隊列
原理:通過隊首隊尾指針,以及Node類元素的next實現FIFO隊列
7.BlockingQueue
對應:Queue
特點:拓展了Queue,增加了可阻塞的插入和獲取等操作
原理:通過ReentrantLock實現線程安全,通過Condition實現阻塞和喚醒
實現類:
LinkedBlockingQueue:基於鏈表實現的可阻塞的FIFO隊列
ArrayBlockingQueue:基於數組實現的可阻塞的FIFO隊列
PriorityBlockingQueue:按優先級排序的隊列
性能調優
鎖優化
Synchronized
使用synchronized時注意鎖粒度
public synchronized void test(){
// TODO
}
public void test(){
synchronized (this) {
// TODO
}
}
public static synchronized void test(){
// TODO
}
public static void test(){
synchronized (TestSynchronized.class) {
// TODO
}
}
Lock鎖優化
舉個例子:電商系統中記錄首頁被用戶瀏覽的次數,以及最後一次操作的時間(含讀或寫)。
public class TotalLock {
//類創建的時間
final long start = System.currentTimeMillis();
//總耗時
AtomicLong totalTime = new AtomicLong(0);
//緩存變量
private Map<String,Long> map = new HashMap(){{put("count",0L);}};
ReentrantLock lock = new ReentrantLock();
//查看map被寫入了多少次
public Map read(){
lock.lock();
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
//最後操作完成的時間
map.put("time",end);
lock.unlock();
System.out.println(Thread.currentThread().getName()+",read="+(endstart));
totalTime.addAndGet(end - start);
return map;
}
//寫入
public Map write(){
lock.lock();
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//寫入計數
map.put("count",map.get("count")+1);
long end = System.currentTimeMillis();
map.put("time",end);
lock.unlock();
System.out.println(Thread.currentThread().getName()+",write="+(endstart));
totalTime.addAndGet(end - start);
return map;
}
public static void main(String[] args) throws InterruptedException {
TotalLock count = new TotalLock();
//讀
for (int i = 0; i < 4; i++) {
new Thread(()->{
count.read();
}).start();
}
//寫
for (int i = 0; i < 1; i++) {
new Thread(()->{
count.write();
}).start();
}
Thread.sleep(3000);
System.out.println(count.map);
System.out.println("讀寫總共耗時:"+count.totalTime.get());
}
}
查看後,我們發現查看次數這裡其實是可以並行讀取的,我們關注的業務是寫入次數,也就是count,至於讀取發生的時間time的寫入操作,只是一個put,不需要原子性保障,對這個加互斥鎖沒有必要。改成讀寫鎖試試……
public class ReadAndWrite {
//類創建的時間
final long start = System.currentTimeMillis();
//總耗時
AtomicLong totalTime = new AtomicLong(0);
//緩存變量,注意!因為read並發,這裡換成ConcurrentHashMap
private Map<String,Long> map = new ConcurrentHashMap(){{put("count",0L);}};
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
//查看map被寫入了多少次
public Map read(){
lock.readLock().lock();
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
//最後操作完成的時間
map.put("time",end);
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName()+",read="+(endstart));
totalTime.addAndGet(end - start);
return map;
}
//寫入
public Map write(){
lock.writeLock().lock();
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//寫入計數
map.put("count",map.get("count")+1);
long end = System.currentTimeMillis();
map.put("time",end);
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName()+",write="+(endstart));
totalTime.addAndGet(end - start);
return map;
}
public static void main(String[] args) throws InterruptedException {
ReadAndWrite rw = new ReadAndWrite();
//讀
for (int i = 0; i < 4; i++) {
new Thread(()->{
rw.read();
}).start();
}
//寫
for (int i = 0; i < 1; i++) {
new Thread(()->{
rw.write();
}).start();
}
Thread.sleep(3000);
System.out.println(rw.map);
System.out.println("讀寫總共耗時:"+rw.totalTime.get());
}
}
再來看讀的時間變化和總執行時間。當read遠大於write時,這個差距會更明顯
CAS樂觀鎖優化
舉例,直接用synchronized
public class NormalSync implements Runnable{
Long start = System.currentTimeMillis();
int i=0;
public synchronized void run() {
int j = i;
//實際業務中可能會有一堆的耗時操作,這裡等待100ms模擬
try {
//做一系列操作
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//業務結束後,增加計數
i = j+1;
System.out.println(Thread.currentThread().getId()+
" ok,time="+(System.currentTimeMillis() - start));
}
public static void main(String[] args) throws InterruptedException {
NormalSync test = new NormalSync();
new Thread(test).start();
new Thread(test).start();
Thread.currentThread().sleep(1000);
System.out.println("last value="+test.i);
}
}
//線程二最終耗時會在200ms+,總耗時300ms,原因是悲觀鎖卡在了read後的耗時操作上,但是保證了
//最終結果是2
使用cas
public class CasSync implements Runnable{
long start = System.currentTimeMillis();
int i=0;
public void run() {
int j = i;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//CAS處理,在這裡理解思想,實際中不推薦大家使用!
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
Unsafe unsafe = (Unsafe) f.get(null);
long offset =
unsafe.objectFieldOffset(CasSync.class.getDeclaredField("i"));
while (!unsafe.compareAndSwapInt(this,offset,j,j+1)){
j = i;
}
} catch (Exception e) {
e.printStackTrace();
}
//實際開發中,要用atomic包,或者while+synchronized自旋
// synchronized (this){
// //注意這裡!
// while (j != i){
// j = i;
// }
// i = j+1;
// }
System.out.println(Thread.currentThread().getName()+
" ok,time="+(System.currentTimeMillis() - start));
}
public static void main(String[] args) throws InterruptedException {
CasSync test = new CasSync();
new Thread(test).start();
new Thread(test).start();
Thread.currentThread().sleep(1000);
System.out.println("last value="+test.i);
}
}
//線程一、二均在100ms+,總耗時200ms,最終結果還是2
總結
減少鎖的時間 不需要同步執行的代碼,能不放在同步快裏面執行就不要放在同步快內,可以讓鎖儘快釋放
減少鎖的粒度 將物理上的一個鎖,拆成邏輯上的多個鎖,增加並行度,從而降低鎖競爭,典型如分段鎖
鎖的粒度 拆鎖的粒度不能無限拆,最多可以將一個鎖拆為當前cup數量相等
減少加減鎖的次數 假如有一個循環,循環內的操作需要加鎖,我們應該把鎖放到循環外面,否則每次進出循環,都要加鎖
使用讀寫鎖 業務細分,讀操作加讀鎖,可以並發讀,寫操作使用寫鎖,只能單線程寫,參考計數器案例
善用volatile volatile的控制比synchronized更輕量化,在某些變量上可以加以運用,如單例模式中
線程池參數優化
一些經驗
1)corePoolSize
核心線程數,一旦有任務進來,在core範圍內會立刻創建線程進入工作。所以這個值應該參考業務並發量在絕大多數時間內的並發情況。同時分析任務的特性。
高並發,執行時間短的,要儘可能小的線程數,如配置CPU個數+1,減少線程上下文的切換。因為它不怎麼占時間,讓少量線程快跑幹活。
並發不高、任務執行時間長的要分開看:如果時間都花在了IO上,那就調大,如配置兩倍CPU個數+1。不能讓CPU閑下來,線程多了並行處理更快。如果時間都花在了運算上,運算的任務還很重,本身就很佔cpu,那盡量減少cpu,減少切換時間。參考第一條
如果高並發,執行時間還很長……
2)workQueue
任務隊列,用於傳輸和保存等待執行任務的阻塞隊列。這個需要根據你的業務可接受的等待時間。是一個需要權衡時間還是空間的地方,如果你的機器cpu資源緊張,jvm內存夠大,同時任務又不是那麼緊迫,減少coresize,加大這裡。如果你的cpu不是問題,對內存比較敏感比較害怕內存溢出,同時任務又要求快點響應。那麼減少這裡。
3)maximumPoolSize
線程池最大數量,這個值和隊列要搭配使用,如果你採用了無界隊列,那很大程度上,這個參數沒有意義。同時要注意,隊列盛滿,同時達到max的時候,再來的任務可能會丟失(下面的handler會講)。
如果你的任務波動較大,同時對任務波峰來的時候,實時性要求比較高。也就是來的很突然並且都是着急的。那麼調小隊列,加大這裡。如果你的任務不那麼著急,可以慢慢做,那就扔隊列吧。
隊列與max是一個權衡。隊列空間換時間,多花內存少佔cpu,輕視任務緊迫度。max捨得cpu線程開銷,少佔內存,給任務最快的響應。
4)keepaliveTime
線程存活保持時間,超出該時間後,線程會從max下降到core,很明顯,這個決定了你養閑人所花的代價。如果你不缺cpu,同時任務來的時間沒法琢磨,波峰波谷的間隔比較短。經常性的來一波。那麼實當的延長銷毀時間,避免頻繁創建和銷毀線程帶來的開銷。如果你的任務波峰出現後,很長一段時間不再出現,間隔比較久,那麼要適當調小該值,讓閑着不幹活的線程儘快銷毀,不要佔據資源。
5)threadFactory(自定義展示實例)
線程工廠,用於創建新線程。threadFactory創建的線程也是採用new Thread()方式,threadFactory創建的線程名都具有統一的風格:pool-m-thread-n(m為線程池的編號,n為線程池內的線程編號)。如果需要自己定義線程的某些屬性,如個性化的線程名,可以在這裡動手。一般不需要折騰它。
6)handler
線程飽和策略,當線程池和隊列都滿了,再加入線程會執行此策略。默認不處理的話會扔出異常,打進日誌。這個與任務處理的數據重要程度有關。如果數據是可丟棄的,那不需要額外處理。如果數據極其重要,那需要在這裡採取措施防止數據丟失,如扔消息隊列或者至少詳細打入日誌文件可追蹤。
並發容器選擇
案例一:電商網站中記錄一次活動下各個商品售賣的數量。
場景分析:需要頻繁按商品id做get和set,但是商品id(key)的數量相對穩定不會頻繁增刪
初級方案:選用HashMap,key為商品id,value為商品購買的次數。每次下單取出次數,增加後再寫入
問題:HashMap線程不安全!在多次商品id寫入後,如果發生擴容,在JDK1.7 之前,在並發場景下HashMap 會出現死循環,從而導致CPU 使用率居高不下。JDK1.8 中修復了HashMap 擴容導致的死循環問題,但在高並發場景下,依然會有數據丟失以及不準確的情況出現。
選型:Hashtable 不推薦,鎖太重,選ConcurrentHashMap 確保高並發下多線程的安全性
案例二:在一次活動下,為每個用戶記錄瀏覽商品的歷史和次數。
場景分析:每個用戶各自瀏覽的商品量級非常大,並且每次訪問都要更新次數,頻繁讀寫
初級方案:為確保線程安全,採用上面的思路,ConcurrentHashMap
問題:ConcurrentHashMap 內部機制在數據量大時,會把鏈錶轉換為紅黑樹。而紅黑樹在高並發情況下,刪除和插入過程中有個平衡的過程,會牽涉到大量節點,因此競爭鎖資源的代價相對比較高
選型:用跳錶,ConcurrentSkipListMap將key值分層,逐個切段,增刪效率高於ConcurrentHashMap
結論:如果對數據有強一致要求,則需使用Hashtable;在大部分場景通常都是弱一致性的情況下,使用ConcurrentHashMap 即可;如果數據量級很高,且存在大量增刪改操作,則可以考慮使用
ConcurrentSkipListMap。
案例三:在活動中,創建一個用戶列表,記錄凍結的用戶。一旦凍結,不允許再下單搶購,但是可
以瀏覽。
場景分析:違規被凍結的用戶不會太多,但是絕大多數非凍結用戶每次搶單都要去查一下這個列表。低頻寫,高頻讀。
初級方案:ArrayList記錄要凍結的用戶id
問題:ArrayList對凍結用戶id的插入和讀取操作在高並發時,線程不安全。Vector可以做到線程安全,但並發性能差,鎖太重。
選型:綜合業務場景,選CopyOnWriteArrayList,會佔空間,但是也僅僅發生在添加新凍結用戶的時候。絕大多數的訪問在非凍結用戶的讀取和比對上,不會阻塞。