Android線程池使用介紹

本文主要使用kotlin,討論Android開發中的線程池用法。

我們想使用線程的時候,可以直接創建子線程並啟動

Thread { Log.d("rfDev", "rustfisher said: hello") }.start()

不想每次都創建新的子線程
如果有大量的異步任務,不想每次都創建子線程。有沒有什麼把子線程統一管理的方法?

遇到這樣的情況,我們可以考慮線程池。線程池解決兩個問題:需要執行大量異步任務的時候,減輕每個異步任務的調用開銷,提高性能。另外它還能夠限制和管理子線程。每個ThreadPoolExecutor都維護了一些統計數據,例如已執行的任務數量。

有大量異步任務的時候,可以考慮使用線程池。

預置線程池

代碼參考 Android API 29

ThreadPoolExecutor提供了很多參數,方便開發者調控。線程池的設計者建議開發者使用以下幾個工廠方法,Android中主要有5種

  • newCachedThreadPool() 不限制數量的線程池,能自動回收線程
  • newFixedThreadPool(int nThreads) 固定數量的線程池
  • newSingleThreadExecutor() 單一的子線程
  • newScheduledThreadPool(int corePoolSize) 能執行延時任務或者周期性任務
  • newWorkStealingPool() 工作竊取線程池

實際上我們在Android Studio里輸入Executors.new的時候,會跳出很多個提示選項。

Executors.new 的智能提示

可緩存線程池

Executors.newCachedThreadPool獲得一個可緩存線程池對象,然後讓它執行任務。

val tp: ExecutorService = Executors.newCachedThreadPool()
tp.submit { Log.d(TAG, "rustfisher: cached線程池執行任務 3") }

可緩存線程池會在需要的時候創建新的子線程。當原有的線程可用的時候,會復用現有線程。
這個機制適用於執行多個短期異步任務。任務比較小,但是數量大。

調用execute方法會先嘗試復用已有的可用線程。如果當前沒有線程,會新建一個線程並把它添加到池裡。
超過60秒沒有使用的線程會被停止並移除。因此即便長時間不用這個線程池,也不會造成多大的開銷。

定長線程池

使用newFixedThreadPool(int nThreads)示例

val fixedTp: ExecutorService = Executors.newFixedThreadPool(4)
fixedTp.submit { Log.d(TAG, "rustfisher 定長線程池執行任務") }

靜態方法里傳入了一個int參數nThreads,表示最大線程數量。
如果當前所有線程都在忙,又有新的任務添加進來。那麼任務會在隊列中等待,直到有可用的線程來處理任務。

如果有的線程遇到錯誤而停止了,要執行任務的話,會創建新的線程補上位置。

池裡的線程會一直存活,直到線程池停止(ExecutorService#shutdown)。

單一線程池

val singleTp: ExecutorService = Executors.newSingleThreadExecutor()
singleTp.submit { Log.d(TAG, "單一線程池執行任務") }

只擁有1個子線程。任務隊列不限制任務數量。如果線程遇到問題停止了,接下來又要處理任務時,會新建一個線程來處理。

它能保證任務會按順序處理,同一時間只能處理1個任務。

單一線程池創建後,不能動態修改線程數量。不像newFixedThreadPool(1)的定長線程池可以修改線程數。

計劃任務線程池

val scheduleTp: ScheduledExecutorService = Executors.newScheduledThreadPool(3)

計劃任務線程池能夠執行延遲任務和周期任務。

延遲任務

需要設定延時與時間單位

scheduleTp.schedule({ Log.d(TAG, "計劃任務1 runnable") }, 300, TimeUnit.MILLISECONDS)
scheduleTp.schedule(Callable { Log.d(TAG, "計劃任務2 callable") }, 400, TimeUnit.MILLISECONDS)

周期任務

主要涉及到2個方法scheduleAtFixedRatescheduleWithFixedDelay

假設任務時間小於周期時間,則按給定周期時間來進行。這兩個方法表現一致。

假設任務執行時間大於周期時間,這兩個方法有點不同

  • scheduleAtFixedRate執行完上一個任務後,用時超過了周期時間,會立刻執行下一個任務。
  • scheduleWithFixedDelay在上一個任務執行完畢後,還會等待周期時間,再去執行下一個任務。

工作竊取線程池

Android SDK 大於等於24,有一種新的線程池,暫且稱為「工作竊取線程池」,或者叫「靈活調度線程池」。

if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.N) {
    Executors.newWorkStealingPool()
}

線程池維護足夠的線程來支持給定的並行度(parallelism level),可能會用多個隊列來減少爭用。
並行度對應的是活躍的線程最大數,或者能處理任務的線程最大數。

線程的實際數量可能會動態增減。工作竊取線程池不保證按提交順序來處理任務。

執行任務

執行任務的時候可以傳入RunnableCallable,前面用的都是Runnable

Callable的例子

tp.submit(Callable { "OK" })

無返回值任務的調用

無返回值任務用CallableRunnable都行。

val tp: ExecutorService = Executors.newCachedThreadPool()
tp.submit { Log.d(TAG, "rustfisher: cached線程池submit runnable") }
tp.execute { Log.d(TAG, "rustfisher: cached線程池execute runnable") }
tp.submit(Callable { Log.d(TAG, "rustfisher: cached線程池submit callable") })

tp.shutdown() // 最後記得用完後停掉線程池

有返回值任務的調用

有返回值的任務需要Callable接口。

submit

調用submit方法時會返回一個Future對象。通過Futureget()方法可拿到返回值。這裡需要注意get()是阻塞的,完成任務後,能拿到返回值。

val tp: ExecutorService = Executors.newCachedThreadPool()
val future = tp.submit(Callable {
    return@Callable "callable的返回值"
})
Log.d(TAG, "future get之前 isDone: ${future.isDone}, isCancelled: ${future.isCancelled}")
val res = future.get()
Log.d(TAG, "future get之後 isDone: ${future.isDone}, isCancelled: ${future.isCancelled}")
Log.d(TAG, "future get: $res")

運行log

future get之前 isDone: false, isCancelled: false
future get之後 isDone: true, isCancelled: false
future get: callable的返回值

invokeAll

對於列表裡的任務,可以使用invokeAll(Collection<? extends Callable<T>> tasks),返回一個Future的列表。
作為對比,給其中一個任務加上延時。

invokeAll示例

    val tp: ExecutorService = Executors.newFixedThreadPool(5)
    val callList = arrayListOf<Callable<String>>(
            Callable {
                Log.d(TAG, "task1 ${Thread.currentThread()}")
                return@Callable "rust"
            },
            Callable {
                Log.d(TAG, "task2 ${Thread.currentThread()}")
                Thread.sleep(1500) // 加上延時
                return@Callable "fisher"
            },
            Callable {
                Log.d(TAG, "task3 ${Thread.currentThread()}")
                return@Callable "列表裏面的任務"
            },
    )
    Log.d(TAG, "invokeAll 準備提交任務")
    val futureList = tp.invokeAll(callList)
    Log.d(TAG, "invokeAll 已提交任務")
    futureList.forEach { f ->
        Log.d(TAG, "任務列表執行結果 ${f.get()}") // 這裡會阻塞 別在ui線程里get
    }

運行log,可以看到提交任務後,經過延時,拿到了運行結果。注意看invokeAll前後的時間。invokeAll會阻塞當前線程。使用的時候必須小心,不要在ui線程中調用。

    2021-09-11 14:40:07.062 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: invokeAll 準備提交任務
    2021-09-11 14:40:07.063 16914-19230/com.rustfisher.tutorial2020 D/rfDevTp: task1 Thread[pool-4-thread-1,5,main]
    2021-09-11 14:40:07.063 16914-19231/com.rustfisher.tutorial2020 D/rfDevTp: task2 Thread[pool-4-thread-2,5,main]
    2021-09-11 14:40:07.063 16914-19232/com.rustfisher.tutorial2020 D/rfDevTp: task3 Thread[pool-4-thread-3,5,main]
    2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: invokeAll 已提交任務
    2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: 任務列表執行結果 rust
    2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: 任務列表執行結果 fisher
    2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: 任務列表執行結果 列表裏面的任務

提交了3個任務,在3個不同的子線程中執行。

invokeAny

invokeAny(Collection<? extends Callable<T>> tasks)也是接收Callable集合。
然後返回最先執行結束的任務的值,其它未完成的任務將被正常取消掉不會有異常。

invokeAny示例

    val tp: ExecutorService = Executors.newCachedThreadPool()
    val callList = arrayListOf<Callable<String>>(
            Callable {
                Thread.sleep(1000) // 設計延時
                return@Callable "rust"
            },
            Callable {
                Thread.sleep(400)
                return@Callable "fisher"
            },
            Callable {
                Thread.sleep(2000)
                return@Callable "列表裏面的任務"
            },
    )
    Log.d(TAG, "invokeAny 提交任務")
    val res = tp.invokeAny(callList)
    Log.d(TAG, "執行結果 $res")
    2021-09-11 14:04:55.253 14066-14066/com.rustfisher.tutorial2020 D/rfDevTp: invokeAny 提交任務
    2021-09-11 14:04:55.654 14066-14066/com.rustfisher.tutorial2020 D/rfDevTp: 執行結果 fisher

觀察log可以看到,最後執行的是「fisher」這個任務。

停止線程池

使用完畢後,記得終止線程池

/*ExecutorService*/ shutdown()
shutdownNow()

shutdown()在已提交的任務後面創建一個停止命令,並且不再接受新的任務。如果線程池已經停止了,調用這個方法將不生效。

shutdownNow()方法嘗試停止所有執行中的任務,停下等待中的任務。並且返回等待執行的任務列表List<Runnable>