How a thread pool adds threads, as seen through a producer-consumer bug

0 Background

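One idle afternoon I was looking at a legacy project and noticed that a task-scheduling path was entirely synchronous: flow A could not finish until flow B did, flow B depended on flow C, and so on.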

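As a producer of garbage code, a QA nightmare, and a serial author of incident reports, I consider it my duty to dig up things that can be "optimized".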

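Everything lives in a single project, and none of its components use message-queue software (such as Kafka or RocketMQ). Pulling one in just for this feature would bring a fairly high maintenance cost, and my tech lead would never agree to it. No choice, then: do it myself. Before long I had written the classes below.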

The overall design is simple: a traditional producer-consumer setup that uses blocking queues as buffers.

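  • Inside the producer, a thread keeps running and forwards the messages in the queue to the consumers. The producer gets a dedicated thread of its own.
  • Each consumer also has its own blocking queue for receiving the messages the producer hands over. Since not every topic has messages at every moment, the consumers simply share a thread pool.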

1 Code implementation


public interface IEvent {

    String getTopic();

}

// Message entity
public class Event<T> implements IEvent{

    /**
     * Creation timestamp
     */
    private long ts = System.currentTimeMillis();

    /**
     * Payload carried by the event
     */
    private T entity;


    /**
     * topic
     */
    private String topic;


    // setters and getters omitted
}

// How a message is handled
public interface ConfigListener {

    String ALL = "all";

    /**
     * Hands the event to the listener for processing
     *
     * @param event
     */
    void handler(IEvent event);


    /**
     * Priority order
     * @return
     */
    int getOrder();

    /**
     * Topic this listener subscribes to
     * @return the topic name, or ALL to handle events of every topic
     */
    String getTopic();

}

// Four message-handling classes were created; the rest are omitted and only one is shown
public class RandomSleepConfigListener implements ConfigListener {

    @Override
    public void handler(IEvent event) {
        logger.info("execute " + this.getClass().getSimpleName());
        // sleep for 5-9 ms
        long t = (long) (Math.random() * 5) + 5L;
        try {
            TimeUnit.MILLISECONDS.sleep(t);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}




// Thread pool holder class
public class ScheduleThreadPool {

    private static final AtomicInteger atomic = new AtomicInteger();

    // Thread used exclusively by the producer
    public static final ExecutorService EVENT_POOL = Executors.newFixedThreadPool(1, r -> new Thread(r, "EVENT-PRODUCER-" + atomic.incrementAndGet()));

    /**
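     * 2 core threads, at most 8 threads, at most 128 queued tasks; once the queue is full and all 8 threads are busy, the submitting thread runs the task itself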
     */
    public static final ExecutorService EVENT_CONSUMER_POOL =
            new ThreadPoolExecutor(2, 8, 50L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(128),
                    r -> new Thread(r, "EVENT-CONSUMER-" + atomic.incrementAndGet()),
                    new ThreadPoolExecutor.CallerRunsPolicy());
}



// ############################### Preparation is done; next come the producer and the consumer     ###########################################


public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Messages submitted from outside are placed into this queue
     */
    private static final ArrayBlockingQueue<IEvent> blockingQueue = new ArrayBlockingQueue<>(128);

    /**
     *  topic, consumer
     */
    private static Map<String, Consumer> topic2ConsumerMap = Maps.newHashMap();



    // Some initialization work
    static {
        logger.info("Producer init start...");
        // Plugin-style loading via SPI; swap in whatever class-loading approach you are familiar with
        Iterator<ConfigListener> configListenerIterator = ServiceBootstrap.loadAll(ConfigListener.class);

        // Walk through all listeners once and group them by topic
        while (configListenerIterator.hasNext()) {
            ConfigListener configListener = configListenerIterator.next();
            String topic = configListener.getTopic();
            // Listeners without an explicit topic are skipped
            if (null == topic) {
                continue;
            }

            logger.info("we init {} topic", topic);

            if (topic2ConsumerMap.containsKey(topic)) {
                topic2ConsumerMap.get(topic).addListener(configListener);
            } else {
                topic2ConsumerMap.put(topic, new Consumer(topic).addListener(configListener));
            }
        }

        // If listeners that apply to every topic are defined, add them to each topic's listener list
        if (topic2ConsumerMap.containsKey(ConfigListener.ALL)) {
            Consumer consumer = topic2ConsumerMap.get(ConfigListener.ALL);
            topic2ConsumerMap.remove(ConfigListener.ALL);

            for (Map.Entry<String, Consumer> entry : topic2ConsumerMap.entrySet()) {
                entry.getValue().addAllListener(consumer.getPriorityList());
            }
        }

        // Start the dispatch thread
        ScheduleThreadPool.EVENT_POOL.execute(() -> {
            //noinspection InfiniteLoopStatement
            int i = 0;
            while (true) {
                try {
                    // Take the next event from the queue; blocks while the queue is empty
                    IEvent iEvent = blockingQueue.take();
                    logger.info("from producer queue take a message {} {}", iEvent.getTopic(), (i++));
                    topic2ConsumerMap.get(iEvent.getTopic()).addEvent(iEvent);
                } catch (InterruptedException e) {
                    //
                }
            }
        });

        logger.info("Producer init end...");
    }


    /**
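     * Adds an event to the blocking queue for dispatch
     * @param iEvent the event to publish
     * @throws InterruptedException if interrupted while waiting for free space in the queue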
     */
    public static void publish(IEvent iEvent) throws InterruptedException {
        logger.info("publish start...");
        // This call blocks while the queue is full
        blockingQueue.put(iEvent);
        logger.info("publish over...");
    }

}



public class Consumer {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Listeners, kept sorted
     */
    private List<ConfigListener> priorityList = Lists.newArrayListWithCapacity(16);

    /**
     * Descending order (Integer.compare avoids overflow from subtraction)
     */
    private Comparator<ConfigListener> comparator = (o1, o2) -> Integer.compare(o2.getOrder(), o1.getOrder());


    /**
     * Events waiting to be handled
     */
    private LinkedBlockingQueue<IEvent> waitEvent = new LinkedBlockingQueue<>(32);

    /**
     * Number of events handled so far
     */
    private AtomicInteger count = new AtomicInteger();

    /**
     * Topic this consumer handles
     */
    private String topic;

//    //CODE-B This is the block that later caused the problem, and the one that made me curious about how a thread pool creates threads
//    {
//        logger.info("non-static invoke--------");
//        // Create the task and submit it
//        ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> {
//            // Note the loop here
//            for (;;) {
//                try {
//                    logger.info("take event");
//                    IEvent take = waitEvent.take();
//                    priorityList.forEach(c -> c.handler(take));
//                    int t = count.incrementAndGet();
//                    logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ",
//                            topic, waitEvent.size(), waitEvent.remainingCapacity(), t);
//                } catch (InterruptedException e) {
//                    // Record the failure
//                }
//            }
//        });
//    }

    public Consumer(String topic) {
        this.topic = topic;
    }


    public List<ConfigListener> getPriorityList() {
        return priorityList;
    }

    public Consumer addListener(ConfigListener listener) {
        priorityList.add(listener);
        priorityList.sort(comparator);
        return this;
    }

    public void addAllListener(Collection<? extends ConfigListener> c) {
        priorityList.addAll(c);
        priorityList.sort(comparator);
    }

    public void addEvent(IEvent iEvent) {
        try {
            logger.info(" topic {} queueSize {} finish {}", this.topic, waitEvent.size(), count.get());
            waitEvent.put(iEvent);
        } catch (InterruptedException e) {
            //
        }


        // CODE-A
        ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> {
            // Note: unlike the dispatching producer, no loop is used here
            try {
                logger.info("take event");
                IEvent take = waitEvent.take();
                priorityList.forEach(c -> c.handler(take));
                int t = count.incrementAndGet();
                logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ",
                        topic, waitEvent.size(), waitEvent.remainingCapacity(), t);
            } catch (InterruptedException e) {
                // Record the failure
            }
        });

    }

}


// Test class
public class ProductTest{
    // I created 4 message-handling classes myself; their topics are listed below
    String[] topics = {"random1","random2","random3","random4"};

    @Test(timeout = 30000L)
    public void publish() throws InterruptedException {
        
        for (int i = 0; i < 720; i++) {
            int j = i & 0x3;
            System.out.println(i);
            Producer.publish(new Event<String>("hello", topics[j]));
        }

        TimeUnit.SECONDS.sleep(60L);
    }

}

2 Let's go

With the code ready, I started running it. The debug output matched what I expected.

4 topics, 720 tasks, 180 handled per topic.

2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO  - TOPIC[random1] size 0, remainingCapacity 32 finish 180 
2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO  - TOPIC[random4] size 1, remainingCapacity 31 finish 179 
2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO  - take event
2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO  - take event
2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO  - execute RandomSleepConfigListener2
2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener3
2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO  - TOPIC[random2] size 0, remainingCapacity 32 finish 180 
2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO  - take event
2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO  - execute RandomSleepConfigListener4
2021-01-17 16:27:56.217 [EVENT-CONSUMER-2] INFO  - TOPIC[random3] size 0, remainingCapacity 32 finish 180 
2021-01-17 16:27:56.221 [EVENT-CONSUMER-3] INFO  - TOPIC[random4] size 0, remainingCapacity 32 finish 180

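So far it all looked perfect. Then I looked at the Consumer class: every time an event is pushed into the blocking queue, a pool task is submitted just to pull one message back out of that same queue. That doesn't fit my reckless style, so I changed it.
It turned into CODE-B: submit one task to the pool when the consumer is created, and let it loop forever pulling messages.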

    {
        logger.info("non-static invoke--------");
        // Create the task and submit it
        ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> {
            // Note the loop here
            for (;;) {
                try {
                    logger.info("take event");
                    IEvent take = waitEvent.take();
                    priorityList.forEach(c -> c.handler(take));
                    int t = count.incrementAndGet();
                    logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ",
                            topic, waitEvent.size(), waitEvent.remainingCapacity(), t);
                } catch (InterruptedException e) {
                    // Record the failure
                }
            }
        });
    }

Then I commented out CODE-A, and something magical happened: it died on the very first shot.

2021-01-17 16:32:49.539 [Time-limited test] INFO  - Producer init start...
2021-01-17 16:32:49.562 [Time-limited test] INFO  - we init all topic
2021-01-17 16:32:49.806 [Time-limited test] INFO  - non-static invoke--------   ##########
2021-01-17 16:32:49.819 [Time-limited test] INFO  - we init random1 topic
2021-01-17 16:32:49.819 [Time-limited test] INFO  - non-static invoke--------   ##########
2021-01-17 16:32:49.819 [EVENT-CONSUMER-1] INFO  - take event                   ##########
2021-01-17 16:32:49.820 [EVENT-CONSUMER-2] INFO  - take event                   ##########
2021-01-17 16:32:49.821 [Time-limited test] INFO  - we init random2 topic
2021-01-17 16:32:49.821 [Time-limited test] INFO  - non-static invoke--------
2021-01-17 16:32:49.824 [Time-limited test] INFO  - we init random3 topic
2021-01-17 16:32:49.824 [Time-limited test] INFO  - non-static invoke--------
2021-01-17 16:32:49.826 [Time-limited test] INFO  - we init random4 topic
2021-01-17 16:32:49.880 [Time-limited test] INFO  - non-static invoke--------
2021-01-17 16:32:49.884 [Time-limited test] INFO  - Producer init end...
2021-01-17 16:32:49.884 [Time-limited test] INFO  - publish start...
2021-01-17 16:32:49.884 [Time-limited test] INFO  - publish over...



2021-01-17 16:32:49.885 [ **EVENT-PRODUCER-3** ] INFO  -  topic random1 queueSize 0 finish 0     ##########
2021-01-17 16:32:49.885 [Time-limited test] INFO  - publish over...

2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random2 1
2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish start...
2021-01-17 16:32:49.886 [ **EVENT-PRODUCER-3** ] INFO  -  topic random2 queueSize 0 finish 0      ##########
2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish over...

2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random3 2
2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish start...
2021-01-17 16:32:49.886 [**EVENT-PRODUCER-3**] INFO  -  topic random3 queueSize 0 finish 0        ##########
2021-01-17 16:32:49.886 [Time-limited test] INFO  - publish over...

2021-01-17 16:32:49.886 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random4 3
2021-01-17 16:32:49.886 [**EVENT-PRODUCER-3**] INFO  -  topic random4 queueSize 0 finish 0        ##########

....

2021-01-17 16:32:50.031 [EVENT-PRODUCER-3] INFO  -  topic random1 queueSize 27 finish 5
2021-01-17 16:32:50.031 [EVENT-PRODUCER-3] INFO  - from producer queue take a message random2 129
2021-01-17 16:32:50.031 [EVENT-PRODUCER-3] INFO  -  topic random2 queueSize 32 finish 0
.
.
.

2021-01-17 16:32:50.275 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
2021-01-17 16:32:50.283 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 4, remainingCapacity 28 finish 29 
2021-01-17 16:32:50.283 [EVENT-CONSUMER-2] INFO  - take event
2021-01-17 16:32:50.283 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
2021-01-17 16:32:50.289 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 3, remainingCapacity 29 finish 30 
2021-01-17 16:32:50.290 [EVENT-CONSUMER-2] INFO  - take event
2021-01-17 16:32:50.290 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
2021-01-17 16:32:50.299 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 2, remainingCapacity 30 finish 31 
2021-01-17 16:32:50.299 [EVENT-CONSUMER-2] INFO  - take event
2021-01-17 16:32:50.299 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
2021-01-17 16:32:50.305 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 1, remainingCapacity 31 finish 32 
2021-01-17 16:32:50.305 [EVENT-CONSUMER-2] INFO  - take event
2021-01-17 16:32:50.306 [EVENT-CONSUMER-2] INFO  - execute RandomSleepConfigListener
2021-01-17 16:32:50.315 [EVENT-CONSUMER-2] INFO  - TOPIC[random1] size 0, remainingCapacity 32 finish 33 
2021-01-17 16:32:50.316 [EVENT-CONSUMER-2] INFO  - take event

The log shows that only random1 was consumed; none of the other topics were.

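The first and second log segments show that the producer dispatched the messages one by one, just as intended; my test program publishes to topics 1 through 4 in order.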

The EVENT-CONSUMER thread numbers only go up to 2; number 3 belongs to the producer thread. That struck me as odd: why did the thread pool not go on creating threads?

3 Analyzing the cause

I started by reading the thread pool's execute() method.

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)    // ------------  debugging showed this condition is never satisfied
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

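The English comment explains it clearly: when it comes to creating threads, execute() considers three cases.

1 If the number of worker threads is below corePoolSize, a new thread is created right away.

2 If the pool is running, the task is put into the work queue, which is the queue we passed in when constructing the pool (capacity 128 in my case). After enqueuing, a new thread is started only if the re-check finds that no worker is alive.

3 Reaching the third step means the pool has been shut down or the queue is full. If the pool has been shut down, addWorker fails and the task is rejected. If the queue is full, a new non-core thread is created as long as the worker count is still below maximumPoolSize; only when that limit has been reached as well is the task rejected (with CallerRunsPolicy, the submitting thread then runs it).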
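The three cases can be observed directly on a small pool. The following is a minimal sketch rather than the pool from this article: the class name PoolGrowthDemo, the sizes (2 core threads, 4 maximum, queue capacity 2) and the use of AbortPolicy instead of CallerRunsPolicy are all assumptions chosen so that every step, including saturation, is visible. Each task blocks on a latch so that no thread ever becomes free while tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthDemo {

    /** Submits 7 blocking tasks and records "poolSize/queueSize" after each submit. */
    public static List<String> observe() {
        // Hypothetical sizes: 2 core threads, 4 max threads, work queue of 2.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 50L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(2),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<String> snapshots = new ArrayList<>();
        try {
            for (int i = 0; i < 7; i++) {
                try {
                    // Every task parks its thread until the latch is released.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    snapshots.add(pool.getPoolSize() + "/" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    // Queue full and maximumPoolSize reached: the pool is saturated.
                    snapshots.add("rejected");
                }
            }
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
        return snapshots;
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

Under these assumptions the snapshots read 1/0, 2/0 (step 1, core threads), 2/1, 2/2 (step 2, queued), 3/2, 4/2 (step 3, extra threads) and finally rejected.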

4 Oh, so that's how it is

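Next I brought out the debugger. The first 2 consumer threads were created without any trouble, but the tasks submitted after them never caused a new thread to be created.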

Looking closely, the check if (workerCountOf(recheck) == 0) is not satisfied, so execute() goes no further and creates nothing.

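Why is that? Because of the infinite loop. Even though the queue is a blocking one, a looping task never returns, so the thread running it is occupied for good. The worker count is therefore never 0, and since the work queue (capacity 128) is nowhere near full, step 3 is never reached either. The loop tasks submitted later just sit in the work queue. Judging from the log, the two core threads were taken by the loops of the "all" consumer and the random1 consumer, which is why only one topic was ever consumed.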
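The starvation can be reproduced in isolation. This is a minimal sketch that assumes the same pool sizes as EVENT_CONSUMER_POOL (2 core, 8 max, queue of 128) but uses a hypothetical class LoopStarvationDemo with plain String events instead of the article's Consumer and IEvent types. Five looping tasks are submitted, then an event is offered to the last one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LoopStarvationDemo {

    /** Returns {poolSize, queuedTasks, handledEvents} after submitting 5 looping tasks. */
    public static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 8, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(128));
        List<BlockingQueue<String>> inboxes = new ArrayList<>();
        AtomicInteger handled = new AtomicInteger();

        for (int i = 0; i < 5; i++) {
            BlockingQueue<String> inbox = new LinkedBlockingQueue<>(32);
            inboxes.add(inbox);
            // Same shape as CODE-B: one never-ending task per consumer.
            pool.execute(() -> {
                try {
                    for (;;) {
                        inbox.take();
                        handled.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit on shutdownNow()
                }
            });
        }

        // This event belongs to the 5th loop, which is still waiting in the work queue.
        inboxes.get(4).offer("hello");
        try {
            TimeUnit.MILLISECONDS.sleep(200L); // give the pool time to (not) react
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        int[] result = {pool.getPoolSize(), pool.getQueue().size(), handled.get()};
        pool.shutdownNow(); // interrupts the two looping workers, drops the queued loops
        return result;
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " handled=" + r[2]);
    }
}
```

The first two loops occupy the two core threads forever, the other three wait in the work queue, and the event offered to the fifth inbox is never handled: the result is poolSize=2, queued=3, handled=0.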

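The reason everything froze is simple too: with blocking queues, one of them must have blocked. From what I observed afterwards, the buffer queues had filled up. Progress stopped at 32 because each consumer's buffer queue has a capacity of exactly 32, and 4 of them add up to the length of the producer's queue (128). Once the first batch of 128 had been dispatched, from message 129 on the target topic's queue was already full, so put blocked. With the dispatch thread stuck, the producer's own queue filled up and publish blocked as well, leaving producers and consumers alike completely dumbfounded.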
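The arithmetic behind "message 129" can be checked with a small single-threaded model. This is only a sketch under simplifying assumptions: the class DispatchStallDemo is hypothetical, topic 0 (random1) is modelled as always having free space because it is the only one being consumed, and offer() stands in for put() so that a full queue shows up as a return value instead of a blocked thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DispatchStallDemo {

    /** Returns the index of the first message whose target queue is already full. */
    public static int firstBlockedIndex() {
        List<BlockingQueue<Integer>> inboxes = new ArrayList<>();
        for (int t = 0; t < 4; t++) {
            inboxes.add(new LinkedBlockingQueue<>(32)); // same capacity as waitEvent
        }
        for (int i = 0; ; i++) {
            int topic = i & 0x3; // same round-robin as the test class
            if (topic == 0) {
                continue; // assumption: random1 is drained, so it never fills up
            }
            // offer() returns false where put() would block.
            if (!inboxes.get(topic).offer(i)) {
                return i;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("dispatch would block at message " + firstBlockedIndex());
    }
}
```

With nobody draining random2, its queue receives messages 1, 5, ..., 125 (32 in total), so the model stalls at index 129, which matches the log line "from producer queue take a message random2 129" followed by queueSize 32.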

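The original code without the infinite loop behaves like the multithreaded code we usually write: every task competes for a thread on its own merits, so every consumer gets a chance to run.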

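One last question, then: what does it take to make the pool create more than corePoolSize threads? You can tell from the length of each comment which conditions are the simpler ones. To satisfy condition 3 we only need to submit more tasks, or shrink the pool's work queue. (I chose to resize the queue, changing it to 16, and new threads were created very quickly.)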
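How the queue capacity and the number of submitted tasks interact can be sketched as follows. The class QueueCapacityDemo and its method are hypothetical; the pool keeps 2 core and 8 maximum threads as in EVENT_CONSUMER_POOL, and every task simply blocks on a latch so that no thread is ever freed during submission.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueCapacityDemo {

    /** Returns the pool size after submitting the given number of blocking tasks. */
    public static int poolSizeAfter(int queueCapacity, int tasks) {
        if (tasks > 8 + queueCapacity) {
            // More than this would be rejected by the default AbortPolicy.
            throw new IllegalArgumentException("too many tasks for this sketch");
        }
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 8, 50L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // occupy the thread, like a looping consumer
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(poolSizeAfter(128, 5)); // queue absorbs everything
        System.out.println(poolSizeAfter(16, 20)); // queue overflows by 2 tasks
        System.out.println(poolSizeAfter(1, 5));   // queue overflows by 2 tasks
    }
}
```

In this model the pool only grows past corePoolSize once more than corePoolSize + queueCapacity tasks are outstanding. Note that tasks still sitting in the queue do not start until a thread frees up, so for never-ending loops it is probably safer to give each loop a thread of its own than to rely on the queue overflowing.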

5 Conclusion

So what is the philosophy behind how a thread pool creates threads?

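First, hire corePoolSize good employees who work around the clock.
Next, pile up the work they cannot get to somewhere, to wait its turn.
Finally, when the core employees cannot keep up, quickly bring in temporary workers to tackle it together.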

I hope this article helps you understand thread pools.