Java 多執行緒:執行緒池
Java 多執行緒:執行緒池
作者:Grey
原文地址:
工作原理
執行緒池內部是通過隊列結合執行緒實現的,當我們利用執行緒池執行任務時:
-
如果此時執行緒池中的執行緒數量小於
corePoolSize
,即使執行緒池中的執行緒都處於空閑狀態,也要創建新的執行緒來處理被添加的任務。 -
如果此時執行緒池中的執行緒數量等於
corePoolSize
,但是緩衝隊列workQueue
未滿,那麼任務被放入緩衝隊列。 -
如果此時執行緒池中的執行緒數量大於等於
corePoolSize
,緩衝隊列workQueue
已滿,並且執行緒池中的執行緒數量小於maximumPoolSize
,建新的執行緒來處理被添加的任務。 -
如果此時線裎池中的線數量大於
corePoolSize
,快取沖隊列workQueue
已滿, 並且執行緒池中的數量等於maximumPoolSize
,那麼過handler
所指定的策略來處理此任務。 -
當執行緒池中的執行緒數量大於
corePoolSize
時,如果某執行緒空閑時間超過keepAliveTime
, 線將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。
相關配置
corePoolSize
:核心執行緒數
maximumPoolSize
:最大執行緒數 【包括核心執行緒數】
keepAliveTime
:生存時間【執行緒長時間不幹活了,歸還給作業系統,核心執行緒不用歸還,可以指定是否參與歸還過程】
生存時間單位
任務隊列:等待隊列,如果不指定,最大值是Integer.MAX_VALUE
【各種各樣的BlockingQueue
】
執行緒工廠【默認設置優先順序是普通優先順序,非守護執行緒】,最好自定義執行緒名稱,方便回溯
拒絕策略,包括以下四種:
ThreadPoolExecutor.AbortPolicy
:丟棄任務並拋出RejectedExecutionException
異常。
ThreadPoolExecutor.DiscardPolicy
:丟棄任務,但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy
:丟棄隊列最前面的任務,然後重新提交被拒絕的任務
ThreadPoolExecutor.CallerRunsPolicy
:由調用執行緒(提交任務的執行緒)處理該任務
執行流程:先佔滿核心執行緒-> 再佔滿任務隊列-> 再佔滿(最大執行緒數-核心執行緒數)-> 最後執行拒絕策略
一般自定義拒絕策略:將相關資訊保存到redis,kafka,日誌,MySQL記錄 實現RejectedExecutionHandler並重寫rejectedExecution方法
自定義拒絕策略程式碼示例:
package git.snippets.juc;
import java.util.concurrent.*;
/**
* 自定義拒絕策略
*/
public class MyRejectedHandler {
public static void main(String[] args) {
ExecutorService service = new ThreadPoolExecutor(4, 4,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
Executors.defaultThreadFactory(),
new MyHandler());
}
static class MyHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//log("r rejected")
//save r kafka mysql redis
//try 3 times
if (executor.getQueue().size() < 10000) {
//try put again();
}
}
}
}
SingleThreadPool
-
保證執行緒按順序執行
-
為什麼要有單執行緒的執行緒池?這個主要是用來做任務隊列和執行緒生命周期管理
-
使用LinkedBlockingQueue作為任務隊列,上界為:Integer.MAX_VALUE(2147483647) 約等於無界。
示例程式碼見:
package git.snippets.juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.concurrent.TimeUnit.SECONDS;
public class SingleThreadPoolUsage {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int j = i;
service.submit(() -> System.out.println("current thread " + Thread.currentThread() + " " + j));
}
service.shutdown();
service.awaitTermination(60, SECONDS);
}
}
CachedThreadPool
-
corePoolSize:0
-
maxiumPoolSize:Integer.MAX_VALUE(2147483647)
-
keepAliveTime 60秒
-
使用
SynchronousQueue
作為任務隊列 必須馬上執行
使用示例:
package git.snippets.juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CachedThreadPoolUsage {
public static void main(String[] args) throws InterruptedException {
System.out.println("cached thread pool usage...");
ExecutorService service = Executors.newCachedThreadPool();
System.out.println(service);
for (int i = 0; i < 2; i++) {
service.execute(() -> {
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
});
}
System.out.println(service);
TimeUnit.SECONDS.sleep(80);
System.out.println(service);
}
}
FixedThreadPool
-
最大執行緒數等於核心執行緒數
-
使用
LinkedBlockingQueue
作為任務隊列,上界為:Integer.MAX_VALUE(2147483647)
使用示例見:
package git.snippets.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* 多執行緒和單執行緒計算某個範圍內的所有素數
*/
public class FixedThreadPoolUsage {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
getPrime(1, 200000);
long end = System.currentTimeMillis();
System.out.println("use single thread...cost: " + (end - start));
final int cpuCoreNum = 4;
ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
MyTask t2 = new MyTask(80001, 130000);
MyTask t3 = new MyTask(130001, 170000);
MyTask t4 = new MyTask(170001, 200000);
Future<List<Integer>> f1 = service.submit(t1);
Future<List<Integer>> f2 = service.submit(t2);
Future<List<Integer>> f3 = service.submit(t3);
Future<List<Integer>> f4 = service.submit(t4);
System.out.println();
start = System.currentTimeMillis();
f1.get();
f2.get();
f3.get();
f4.get();
end = System.currentTimeMillis();
System.out.println("use fixed thread pool...cost: " + (end - start));
service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
}
static boolean isPrime(int num) {
for (int i = 2; i <= num / 2; i++) {
if (num % i == 0) {
return false;
}
}
return true;
}
static List<Integer> getPrime(int start, int end) {
List<Integer> results = new ArrayList<>();
for (int i = start; i <= end; i++) {
if (isPrime(i)) results.add(i);
}
return results;
}
static class MyTask implements Callable<List<Integer>> {
int startPos, endPos;
MyTask(int s, int e) {
this.startPos = s;
this.endPos = e;
}
@Override
public List<Integer> call() {
List<Integer> r = getPrime(startPos, endPos);
return r;
}
}
}
程式碼說明:本實例演示了多執行緒和單執行緒計算某個範圍內的所有素數。輸出結果如下
use single thread...cost: 1733
use fixed thread pool...cost: 505
ScheduledThreadPool
使用DelayWorkQueue
,包括了如下兩個主要方法
scheduleAtFixedRate()
當前任務執行時間小於間隔時間,每次到點即執行;
當前任務執行時間大於等於間隔時間,任務執行後立即執行下一次任務。相當於連續執行了。
scheduleWithFixedDelay()
每當上次任務執行完畢後,間隔一段時間執行。不管當前任務執行時間大於、等於還是小於間隔時間,執行效果都是一樣的。
使用示例:
package git.snippets.juc;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.SECONDS;
public class ScheduleThreadPoolUsage {
static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public static void main(String[] args) {
test1();
test2();
test3();
}
/**
* 任務執行時間(8s)小於間隔時間(10s)
*/
public static void test1() {
scheduler.scheduleAtFixedRate(() -> {
System.out.println("Start: scheduleAtFixedRate: " + new Date());
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("End : scheduleAtFixedRate: " + new Date());
}, 0, 10, SECONDS);
}
/**
* 任務執行時間(12s)大於間隔時間(10s)
*/
public static void test2() {
scheduler.scheduleAtFixedRate(() -> {
System.out.println("Start: scheduleAtFixedRate: " + new Date());
try {
Thread.sleep(12000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("End : scheduleAtFixedRate: " + new Date());
}, 0, 10, SECONDS);
}
/**
* 任務執行時間(8s)小於間隔時間(10s)
*/
public static void test3() {
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("Start: scheduleWithFixedDelay: " + new Date());
try {
Thread.sleep(12000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("End : scheduleWithFixedDelay: " + new Date());
}, 0, 10, SECONDS);
}
}
ForkJoinPool
Java SE 1.7 以後新增的執行緒池,包括以下兩個核心類
第一個是:RecursiveAction
它是一種沒有任何返回值的任務。只是做一些工作,比如寫數據到磁碟,然後就退出了。 一個RecursiveAction
可以把自己的工作分割成更小的幾塊, 這樣它們可以由獨立的執行緒或者 CPU 執行。
我們可以通過繼承來實現一個RecursiveAction
。
第二個是:RecursiveTask
它是一種會返回結果的任務。可以將自己的工作分割為若干更小任務,並將這些子任務的執行合併到一個集體結果。 可以有幾個水平的分割和合併。
使用示例:
package git.snippets.juc;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;
/**
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/25
* @since 1.7
*/
public class ForkJoinPoolUsage implements Calculator {
private ForkJoinPool pool;
public ForkJoinPoolUsage() {
// 也可以使用公用的 ForkJoinPool:
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
public static void useRecursiveAction() throws InterruptedException {
// 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的並行執行緒的ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 提交可分解的PrintTask任務
forkJoinPool.submit(new MyRecursiveAction(0, 1000));
while (!forkJoinPool.isTerminated()) {
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
}
// 關閉執行緒池
forkJoinPool.shutdown();
}
public static void useRecursiveTask() {
long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
Calculator calculator = new ForkJoinPoolUsage();
System.out.println(calculator.sumUp(numbers)); // 列印結果500500
}
public static void main(String[] args) throws InterruptedException {
useRecursiveTask();
useRecursiveAction();
}
@Override
public long sumUp(long[] numbers) {
return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
}
private static class MyRecursiveAction extends RecursiveAction {
/**
* 每個"小任務"最多只列印20個數
*/
private static final int MAX = 20;
private int start;
private int end;
public MyRecursiveAction(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
//當end-start的值小於MAX時,開始列印
if ((end - start) < MAX) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + "-i的值" + i);
}
} else {
// 將大任務分解成兩個小任務
int middle = (start + end) / 2;
MyRecursiveAction left = new MyRecursiveAction(start, middle);
MyRecursiveAction right = new MyRecursiveAction(middle, end);
left.fork();
right.fork();
}
}
}
private static class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
// 當需要計算的數字小於6時,直接計算結果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
// 否則,把任務一分為二,遞歸計算
} else {
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
}
interface Calculator {
long sumUp(long[] numbers);
}
此外,Java 的流式 API 底層也是 ForkJoinPool 實現的。
更多內容參考如下兩篇文章
WorkStealingPool
每個執行緒都有單獨的隊列,每個執行緒隊列執行完畢後,就會去其他的執行緒隊列裡面拿過來執行, 底層是
ForkJoinPool
-
Java SE 1.8 新增
-
會自動啟動 CPU 核數個執行緒去執行任務
使用示例:
/**
*
*/
package git.snippets.juc;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @since 1.8
*/
public class WorkStealingPoolUsage {
public static void main(String[] args) throws IOException {
int core = Runtime.getRuntime().availableProcessors();
// 會自動啟動cpu核數個執行緒去執行任務 ,其中第一個是1s執行完畢,其餘都是2s執行完畢,
// 有一個任務會進行等待,當第一個執行完畢後,會再次偷取最後一個任務執行
ExecutorService service = Executors.newWorkStealingPool();
service.execute(new R(1000));
for (int i = 0; i < core; i++) {
service.execute(new R(2000));
}
//由於產生的是精靈執行緒(守護執行緒、後台執行緒),主執行緒不阻塞的話,看不到輸出
System.in.read();
}
static class R implements Runnable {
int time;
R(int t) {
this.time = t;
}
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(time + " " + Thread.currentThread().getName());
}
}
}
CompletableFuture
-
Java SE 1.8 新增
-
anyOf()
可以實現「任意個 CompletableFuture 只要一個成功」,allOf()
可以實現「所有 CompletableFuture 都必須成功」,這些組合操作可以實現非常複雜的非同步流程式控制制。
使用示例:
package git.snippets.juc;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* 假設你能夠提供一個服務
* 這個服務查詢各大電商網站同一類產品的價格並匯總展示
*/
public class CompletableFutureUsage {
public static void main(String[] args) throws ExecutionException, InterruptedException {
way1();
way2();
}
public static void way1() {
long start = System.currentTimeMillis();
System.out.println("p1 " + priceOfJD());
System.out.println("p2 " + priceOfTB());
System.out.println("p3 " + priceOfTM());
long end = System.currentTimeMillis();
System.out.println("串列執行,耗時(ms):" + (end - start));
}
public static void way2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CompletableFuture<Double> p1 = CompletableFuture.supplyAsync(() -> priceOfJD());
CompletableFuture<Double> p2 = CompletableFuture.supplyAsync(() -> priceOfTB());
CompletableFuture<Double> p3 = CompletableFuture.supplyAsync(() -> priceOfTM());
CompletableFuture.allOf(p1, p2, p3).join();
System.out.println("p1 " + p1.get());
System.out.println("p2 " + p2.get());
System.out.println("p3 " + p3.get());
long end = System.currentTimeMillis();
System.out.println("使用CompletableFuture並行執行,耗時(ms): " + (end - start));
}
private static double priceOfTM() {
delay();
return 1.00;
}
private static double priceOfTB() {
delay();
return 2.00;
}
private static double priceOfJD() {
delay();
return 3.00;
}
private static void delay() {
int time = new Random().nextInt(500);
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
證明原子操作類比synchronized更高效
示例程式碼如下
package git.snippets.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 證明原子操作類比synchronized更高效
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/26
*/
public class AtomVSSync {
public static void main(String[] args) {
test1();
}
AtomicInteger atomicCount = new AtomicInteger(0);
int count = 0;
final static int TIMES = 80000000;
void m() {
for (int i = 0; i < TIMES; i++) {
atomicCount.incrementAndGet(); //原子操作
}
}
void m2() {
for (int i = 0; i < TIMES; i++) {
synchronized (this) {
count++;
}
}
}
public static void test1() {
AtomVSSync t1 = new AtomVSSync();
AtomVSSync t2 = new AtomVSSync();
long time1 = time(t1::m);
System.out.println("使用原子類得到的結果是:" + t1.atomicCount);
long time2 = time(t2::m2);
System.out.println("使用synchronized得到的結果是:" + t2.count);
System.out.println("使用原子類花費的時間是:" + time1);
System.out.println("使用 synchronized 花費的時間是 :" + time2);
}
private static long time(Runnable runnable) {
List<Thread> threads = new ArrayList<>();
long startTime = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
threads.add(new Thread(runnable, "thread-" + i));
}
threads.forEach(Thread::start);
threads.forEach(o -> {
try {
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
}
Java SE 11 下運行上述程式碼,程式碼輸出結果如下:
使用原子類得到的結果是:800000000
使用synchronized得到的結果是:800000000
使用原子類花費的時間是:12111
使用 synchronized 花費的時間是 :16471
AtomXXX類可以保證可見性嗎?
可以。
程式碼如下
package git.snippets.juc;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* AtomXXX類可以保證可見性嗎?請寫一個程式來證明
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/26
*/
public class AtomVisible {
public static void main(String[] args) {
test2();
}
AtomicBoolean running = new AtomicBoolean(true);
void m3() {
System.out.println("m1 start");
while (running.get()) { //死循環。只有running=false時,才能執行後面的語句
}
System.out.println("m2 end");
}
public static void test2() {
AtomVisible t = new AtomVisible();
new Thread(t::m3, "t1").start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t.running.getAndSet(false);
}
}
輸出結果
m1 start
m2 end
寫一個程式證明AtomXXX類的多個方法並不構成原子性
package git.snippets.juc;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 寫一個程式證明AtomXXX類的多個方法並不構成原子性
*
* @author <a href="mailto:[email protected]">Grey</a>
* @date 2021/4/26
*/
public class MultiAtomMethod {
public static void main(String[] args) {
test3();
}
AtomicInteger count = new AtomicInteger(0);
void m4() {
for (int i = 0; i < 10000; i++) {
if (count.get() < 999 && count.get() >= 0) { //如果未加鎖,之間還會有其他執行緒插進來
count.incrementAndGet();
}
}
}
public static void test3() {
MultiAtomMethod t = new MultiAtomMethod();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 100; i++) {
threads.add(new Thread(t::m4, "thread" + i));
}
threads.forEach(Thread::start);
threads.forEach((o) -> {
try {
//join()方法阻塞調用此方法的執行緒,直到執行緒t完成,此執行緒再繼續。通常用於在main()主執行緒內,等待其它執行緒完成再結束main()主執行緒。
o.join(); //相當於在main執行緒中同步o執行緒,o執行完了,main執行緒才有執行的機會
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println(t.count);
}
}
說明
本文涉及到的所有程式碼和圖例
更多內容見:Java 多執行緒
參考資料
理解ScheduledExecutorService中scheduleAtFixedRate和scheduleWithFixedDelay的區別