Java 多執行緒:執行緒池

Java 多執行緒:執行緒池

作者:Grey

原文地址:

部落格園:Java 多執行緒:執行緒池

CSDN:Java 多執行緒:執行緒池

工作原理

執行緒池內部是通過隊列結合執行緒實現的,當我們利用執行緒池執行任務時:

  1. 如果此時執行緒池中的執行緒數量小於corePoolSize,即使執行緒池中的執行緒都處於空閑狀態,也要創建新的執行緒來處理被添加的任務。

  2. 如果此時執行緒池中的執行緒數量等於corePoolSize,但是緩衝隊列workQueue未滿,那麼任務被放入緩衝隊列。

  3. 如果此時執行緒池中的執行緒數量大於等於corePoolSize,緩衝隊列workQueue已滿,並且執行緒池中的執行緒數量小於maximumPoolSize,建新的執行緒來處理被添加的任務。

  4. 如果此時線裎池中的線數量大於corePoolSize,快取沖隊列workQueue已滿, 並且執行緒池中的數量等於maximumPoolSize,那麼過handler所指定的策略來處理此任務。

  5. 當執行緒池中的執行緒數量大於corePoolSize時,如果某執行緒空閑時間超過keepAliveTime, 線將被終止。這樣,執行緒池可以動態的調整池中的執行緒數。

相關配置

corePoolSize:核心執行緒數

maximumPoolSize:最大執行緒數 【包括核心執行緒數】

keepAliveTime:生存時間【執行緒長時間不幹活了,歸還給作業系統,核心執行緒不用歸還,可以指定是否參與歸還過程】

生存時間單位

任務隊列:等待隊列,如果不指定,最大值是Integer.MAX_VALUE【各種各樣的BlockingQueue

執行緒工廠【默認設置優先順序是普通優先順序,非守護執行緒】,最好自定義執行緒名稱,方便回溯

拒絕策略,包括以下四種:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:丟棄任務,但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新提交被拒絕的任務

ThreadPoolExecutor.CallerRunsPolicy:由調用執行緒(提交任務的執行緒)處理該任務

執行流程:先佔滿核心執行緒-> 再佔滿任務隊列-> 再佔滿(最大執行緒數-核心執行緒數)-> 最後執行拒絕策略
一般自定義拒絕策略:將相關資訊保存到redis,kafka,日誌,MySQL記錄 實現RejectedExecutionHandler並重寫rejectedExecution方法

自定義拒絕策略程式碼示例:

package git.snippets.juc;

import java.util.concurrent.*;

/**
 * 自定義拒絕策略
 */
public class MyRejectedHandler {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(4, 4,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyHandler());
    }

    static class MyHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //log("r rejected")
            //save r kafka mysql redis
            //try 3 times
            if (executor.getQueue().size() < 10000) {
                //try put again();
            }
        }
    }
}

SingleThreadPool

  • 保證執行緒按順序執行

  • 為什麼要有單執行緒的執行緒池?這個主要是用來做任務隊列和執行緒生命周期管理

  • 使用LinkedBlockingQueue作為任務隊列,上界為:Integer.MAX_VALUE(2147483647) 約等於無界。

示例程式碼見:

package git.snippets.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.util.concurrent.TimeUnit.SECONDS;

public class SingleThreadPoolUsage {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int j = i;
            service.submit(() -> System.out.println("current thread " + Thread.currentThread() + "  " + j));
        }
        service.shutdown();
        service.awaitTermination(60, SECONDS);
    }
}

CachedThreadPool

  • corePoolSize:0

  • maxiumPoolSize:Integer.MAX_VALUE(2147483647)

  • keepAliveTime 60秒

  • 使用SynchronousQueue作為任務隊列 必須馬上執行

使用示例:

package git.snippets.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CachedThreadPoolUsage {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("cached thread pool usage...");
        ExecutorService service = Executors.newCachedThreadPool();
        System.out.println(service);
        for (int i = 0; i < 2; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);
        TimeUnit.SECONDS.sleep(80);
        System.out.println(service);
    }
}

FixedThreadPool

  • 最大執行緒數等於核心執行緒數

  • 使用LinkedBlockingQueue作為任務隊列,上界為:Integer.MAX_VALUE(2147483647)

使用示例見:

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 多執行緒和單執行緒計算某個範圍內的所有素數
 */
public class FixedThreadPoolUsage {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long start = System.currentTimeMillis();
        getPrime(1, 200000);
        long end = System.currentTimeMillis();
        System.out.println("use single thread...cost: " + (end - start));

        final int cpuCoreNum = 4;

        ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

        MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
        MyTask t2 = new MyTask(80001, 130000);
        MyTask t3 = new MyTask(130001, 170000);
        MyTask t4 = new MyTask(170001, 200000);

        Future<List<Integer>> f1 = service.submit(t1);
        Future<List<Integer>> f2 = service.submit(t2);
        Future<List<Integer>> f3 = service.submit(t3);
        Future<List<Integer>> f4 = service.submit(t4);
        System.out.println();
        start = System.currentTimeMillis();
        f1.get();
        f2.get();
        f3.get();
        f4.get();
        end = System.currentTimeMillis();
        System.out.println("use fixed thread pool...cost: " + (end - start));
        service.shutdown();
        service.awaitTermination(1, TimeUnit.MINUTES);
    }

    static boolean isPrime(int num) {
        for (int i = 2; i <= num / 2; i++) {
            if (num % i == 0) {
                return false;
            }
        }
        return true;
    }

    static List<Integer> getPrime(int start, int end) {
        List<Integer> results = new ArrayList<>();
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) results.add(i);
        }

        return results;
    }

    static class MyTask implements Callable<List<Integer>> {
        int startPos, endPos;

        MyTask(int s, int e) {
            this.startPos = s;
            this.endPos = e;
        }

        @Override
        public List<Integer> call() {
            List<Integer> r = getPrime(startPos, endPos);
            return r;
        }

    }
}

程式碼說明:本實例演示了多執行緒和單執行緒計算某個範圍內的所有素數。輸出結果如下

use single thread...cost: 1733

use fixed thread pool...cost: 505

ScheduledThreadPool

使用DelayWorkQueue,包括了如下兩個主要方法

scheduleAtFixedRate()

當前任務執行時間小於間隔時間,每次到點即執行;

當前任務執行時間大於等於間隔時間,任務執行後立即執行下一次任務。相當於連續執行了。

scheduleWithFixedDelay()

每當上次任務執行完畢後,間隔一段時間執行。不管當前任務執行時間大於、等於還是小於間隔時間,執行效果都是一樣的。

使用示例:

package git.snippets.juc;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class ScheduleThreadPoolUsage {
    static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);


    public static void main(String[] args) {
        test1();
        test2();
        test3();
    }

    /**
     * 任務執行時間(8s)小於間隔時間(10s)
     */
    public static void test1() {
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Start: scheduleAtFixedRate:    " + new Date());
            try {
                Thread.sleep(8000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End  : scheduleAtFixedRate:    " + new Date());
        }, 0, 10, SECONDS);
    }

    /**
     * 任務執行時間(12s)大於間隔時間(10s)
     */
    public static void test2() {
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("Start: scheduleAtFixedRate:    " + new Date());
            try {
                Thread.sleep(12000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End  : scheduleAtFixedRate:    " + new Date());
        }, 0, 10, SECONDS);
    }

    /**
     * 任務執行時間(8s)小於間隔時間(10s)
     */
    public static void test3() {
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("Start: scheduleWithFixedDelay: " + new Date());
            try {
                Thread.sleep(12000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("End  : scheduleWithFixedDelay: " + new Date());
        }, 0, 10, SECONDS);
    }
}

ForkJoinPool

Java SE 1.7 以後新增的執行緒池,包括以下兩個核心類

第一個是:RecursiveAction

它是一種沒有任何返回值的任務。只是做一些工作,比如寫數據到磁碟,然後就退出了。 一個RecursiveAction可以把自己的工作分割成更小的幾塊, 這樣它們可以由獨立的執行緒或者 CPU 執行。
我們可以通過繼承來實現一個RecursiveAction

第二個是:RecursiveTask

它是一種會返回結果的任務。可以將自己的工作分割為若干更小任務,並將這些子任務的執行合併到一個集體結果。 可以有幾個水平的分割和合併。

使用示例:

package git.snippets.juc;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.LongStream;

/**
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/25
 * @since 1.7
 */
public class ForkJoinPoolUsage implements Calculator {
    private ForkJoinPool pool;

    public ForkJoinPoolUsage() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    public static void useRecursiveAction() throws InterruptedException {
        // 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的並行執行緒的ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 提交可分解的PrintTask任務
        forkJoinPool.submit(new MyRecursiveAction(0, 1000));

        while (!forkJoinPool.isTerminated()) {
            forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        }
        // 關閉執行緒池
        forkJoinPool.shutdown();
    }

    public static void useRecursiveTask() {
        long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
        Calculator calculator = new ForkJoinPoolUsage();
        System.out.println(calculator.sumUp(numbers)); // 列印結果500500
    }

    public static void main(String[] args) throws InterruptedException {
        useRecursiveTask();
        useRecursiveAction();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
    }

    private static class MyRecursiveAction extends RecursiveAction {

        /**
         * 每個"小任務"最多只列印20個數
         */
        private static final int MAX = 20;

        private int start;
        private int end;

        public MyRecursiveAction(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            //當end-start的值小於MAX時,開始列印
            if ((end - start) < MAX) {
                for (int i = start; i < end; i++) {
                    System.out.println(Thread.currentThread().getName() + "-i的值" + i);
                }
            } else {
                // 將大任務分解成兩個小任務
                int middle = (start + end) / 2;
                MyRecursiveAction left = new MyRecursiveAction(start, middle);
                MyRecursiveAction right = new MyRecursiveAction(middle, end);
                left.fork();
                right.fork();
            }
        }


    }

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }


        @Override
        protected Long compute() {

            // 當需要計算的數字小於6時,直接計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
                // 否則,把任務一分為二,遞歸計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle + 1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }
}

interface Calculator {
    long sumUp(long[] numbers);
}

此外,Java 的流式 API 底層也是 ForkJoinPool 實現的。

更多內容參考如下兩篇文章

ForkJoinPool 的使用以及原理

聊聊並發(八)——Fork/Join 框架介紹

WorkStealingPool

每個執行緒都有單獨的隊列,每個執行緒隊列執行完畢後,就會去其他的執行緒隊列裡面拿過來執行, 底層是ForkJoinPool

  • Java SE 1.8 新增

  • 會自動啟動 CPU 核數個執行緒去執行任務

使用示例:

/**
 *
 */
package git.snippets.juc;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @since 1.8
 */
public class WorkStealingPoolUsage {
    public static void main(String[] args) throws IOException {
        int core = Runtime.getRuntime().availableProcessors();
        //  會自動啟動cpu核數個執行緒去執行任務 ,其中第一個是1s執行完畢,其餘都是2s執行完畢,
        //  有一個任務會進行等待,當第一個執行完畢後,會再次偷取最後一個任務執行
        ExecutorService service = Executors.newWorkStealingPool();
        service.execute(new R(1000));
        for (int i = 0; i < core; i++) {
            service.execute(new R(2000));
        }
        //由於產生的是精靈執行緒(守護執行緒、後台執行緒),主執行緒不阻塞的話,看不到輸出
        System.in.read();
    }

    static class R implements Runnable {

        int time;

        R(int t) {
            this.time = t;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(time + " " + Thread.currentThread().getName());
        }
    }
}

CompletableFuture

  • Java SE 1.8 新增

  • anyOf()可以實現「任意個 CompletableFuture 只要一個成功」,allOf()可以實現「所有 CompletableFuture 都必須成功」,這些組合操作可以實現非常複雜的非同步流程式控制制。

使用示例:

package git.snippets.juc;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * 假設你能夠提供一個服務
 * 這個服務查詢各大電商網站同一類產品的價格並匯總展示
 */
public class CompletableFutureUsage {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        way1();
        way2();
    }

    public static void way1() {
        long start = System.currentTimeMillis();
        System.out.println("p1 " + priceOfJD());
        System.out.println("p2 " + priceOfTB());
        System.out.println("p3 " + priceOfTM());
        long end = System.currentTimeMillis();
        System.out.println("串列執行,耗時(ms):" + (end - start));
    }

    public static void way2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<Double> p1 = CompletableFuture.supplyAsync(() -> priceOfJD());
        CompletableFuture<Double> p2 = CompletableFuture.supplyAsync(() -> priceOfTB());
        CompletableFuture<Double> p3 = CompletableFuture.supplyAsync(() -> priceOfTM());
        CompletableFuture.allOf(p1, p2, p3).join();
        System.out.println("p1 " + p1.get());
        System.out.println("p2 " + p2.get());
        System.out.println("p3 " + p3.get());
        long end = System.currentTimeMillis();
        System.out.println("使用CompletableFuture並行執行,耗時(ms): " + (end - start));
    }

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }

    private static void delay() {
        int time = new Random().nextInt(500);
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

證明原子操作類比synchronized更高效

示例程式碼如下

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 證明原子操作類比synchronized更高效
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/26
 */
public class AtomVSSync {
    public static void main(String[] args) {
        test1();
    }

    AtomicInteger atomicCount = new AtomicInteger(0);
    int count = 0;
    final static int TIMES = 80000000;

    void m() {
        for (int i = 0; i < TIMES; i++) {
            atomicCount.incrementAndGet(); //原子操作
        }
    }

    void m2() {
        for (int i = 0; i < TIMES; i++) {
            synchronized (this) {
                count++;
            }
        }
    }


    public static void test1() {
        AtomVSSync t1 = new AtomVSSync();
        AtomVSSync t2 = new AtomVSSync();
        long time1 = time(t1::m);
        System.out.println("使用原子類得到的結果是:" + t1.atomicCount);
        long time2 = time(t2::m2);
        System.out.println("使用synchronized得到的結果是:" + t2.count);

        System.out.println("使用原子類花費的時間是:" + time1);
        System.out.println("使用 synchronized 花費的時間是 :" + time2);
    }

    private static long time(Runnable runnable) {
        List<Thread> threads = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            threads.add(new Thread(runnable, "thread-" + i));
        }
        threads.forEach(Thread::start);
        threads.forEach(o -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long endTime = System.currentTimeMillis();
        return endTime - startTime;
    }
}

Java SE 11 下運行上述程式碼,程式碼輸出結果如下:

使用原子類得到的結果是:800000000
使用synchronized得到的結果是:800000000
使用原子類花費的時間是:12111
使用 synchronized 花費的時間是 :16471

AtomXXX類可以保證可見性嗎?

可以。

程式碼如下

package git.snippets.juc;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * AtomXXX類可以保證可見性嗎?請寫一個程式來證明
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/26
 */
public class AtomVisible {
    public static void main(String[] args) {
        test2();
    }

    AtomicBoolean running = new AtomicBoolean(true);

    void m3() {
        System.out.println("m1 start");
        while (running.get()) {  //死循環。只有running=false時,才能執行後面的語句

        }
        System.out.println("m2 end");
    }

    public static void test2() {
        AtomVisible t = new AtomVisible();
        new Thread(t::m3, "t1").start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t.running.getAndSet(false);

    }
}

輸出結果

m1 start
m2 end

寫一個程式證明AtomXXX類的多個方法並不構成原子性

package git.snippets.juc;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 寫一個程式證明AtomXXX類的多個方法並不構成原子性
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2021/4/26
 */
public class MultiAtomMethod {
    public static void main(String[] args) {
        test3();
    }

    AtomicInteger count = new AtomicInteger(0);

    void m4() {
        for (int i = 0; i < 10000; i++) {
            if (count.get() < 999 && count.get() >= 0) { //如果未加鎖,之間還會有其他執行緒插進來
                count.incrementAndGet();
            }
        }
    }

    public static void test3() {
        MultiAtomMethod t = new MultiAtomMethod();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            threads.add(new Thread(t::m4, "thread" + i));
        }
        threads.forEach(Thread::start);
        threads.forEach((o) -> {
            try {
                //join()方法阻塞調用此方法的執行緒,直到執行緒t完成,此執行緒再繼續。通常用於在main()主執行緒內,等待其它執行緒完成再結束main()主執行緒。
                o.join(); //相當於在main執行緒中同步o執行緒,o執行完了,main執行緒才有執行的機會
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t.count);
    }
}

說明

本文涉及到的所有程式碼和圖例

圖例

程式碼

更多內容見:Java 多執行緒

參考資料

工作執行緒數究竟要設置為多少 | 架構師之路

實戰Java高並發程式設計(第2版)

深入淺出Java多執行緒

多執行緒與高並發-馬士兵

Java並發編程實戰

理解ScheduledExecutorService中scheduleAtFixedRate和scheduleWithFixedDelay的區別

ForkJoinPool 的使用以及原理

聊聊並發(八)——Fork/Join 框架介紹

使用CompletableFuture

圖解Java多執行緒設計模式