Android線程池使用介紹
- 2021 年 9 月 13 日
- 筆記
- Android-101
本文主要使用kotlin,討論Android開發中的線程池用法。
我們想使用線程的時候,可以直接創建子線程並啟動
Thread { Log.d("rfDev", "rustfisher said: hello") }.start()
不想每次都創建新的子線程
如果有大量的異步任務,不想每次都創建子線程。有沒有什麼把子線程統一管理的方法?
遇到這樣的情況,我們可以考慮線程池。線程池解決兩個問題:需要執行大量異步任務的時候,減輕每個異步任務的調用開銷,提高性能。另外它還能夠限制和管理子線程。每個ThreadPoolExecutor都維護了一些統計數據,例如已執行的任務數量。
有大量異步任務的時候,可以考慮使用線程池。
預置線程池
代碼參考 Android API 29
ThreadPoolExecutor提供了很多參數,方便開發者調控。線程池的設計者建議開發者使用以下幾個工廠方法,Android中主要有5種
newCachedThreadPool()
不限制數量的線程池,能自動回收線程newFixedThreadPool(int nThreads)
固定數量的線程池newSingleThreadExecutor()
單一的子線程newScheduledThreadPool(int corePoolSize)
能執行延時任務或者周期性任務newWorkStealingPool()
工作竊取線程池
實際上我們在Android Studio里輸入Executors.new
的時候,會跳出很多個提示選項。
Executors.new 的智能提示
可緩存線程池
用Executors.newCachedThreadPool
獲得一個可緩存線程池對象,然後讓它執行任務。
val tp: ExecutorService = Executors.newCachedThreadPool()
tp.submit { Log.d(TAG, "rustfisher: cached線程池執行任務 3") }
可緩存線程池會在需要的時候創建新的子線程。當原有的線程可用的時候,會復用現有線程。
這個機制適用於執行多個短期異步任務。任務比較小,但是數量大。
調用execute
方法會先嘗試復用已有的可用線程。如果當前沒有線程,會新建一個線程並把它添加到池裡。
超過60秒沒有使用的線程會被停止並移除。因此即便長時間不用這個線程池,也不會造成多大的開銷。
定長線程池
使用newFixedThreadPool(int nThreads)
示例
val fixedTp: ExecutorService = Executors.newFixedThreadPool(4)
fixedTp.submit { Log.d(TAG, "rustfisher 定長線程池執行任務") }
靜態方法里傳入了一個int參數nThreads
,表示最大線程數量。
如果當前所有線程都在忙,又有新的任務添加進來。那麼任務會在隊列中等待,直到有可用的線程來處理任務。
如果有的線程遇到錯誤而停止了,要執行任務的話,會創建新的線程補上位置。
池裡的線程會一直存活,直到線程池停止(ExecutorService#shutdown
)。
單一線程池
val singleTp: ExecutorService = Executors.newSingleThreadExecutor()
singleTp.submit { Log.d(TAG, "單一線程池執行任務") }
只擁有1個子線程。任務隊列不限制任務數量。如果線程遇到問題停止了,接下來又要處理任務時,會新建一個線程來處理。
它能保證任務會按順序處理,同一時間只能處理1個任務。
單一線程池創建後,不能動態修改線程數量。不像newFixedThreadPool(1)
的定長線程池可以修改線程數。
計劃任務線程池
val scheduleTp: ScheduledExecutorService = Executors.newScheduledThreadPool(3)
計劃任務線程池能夠執行延遲任務和周期任務。
延遲任務
需要設定延時與時間單位
scheduleTp.schedule({ Log.d(TAG, "計劃任務1 runnable") }, 300, TimeUnit.MILLISECONDS)
scheduleTp.schedule(Callable { Log.d(TAG, "計劃任務2 callable") }, 400, TimeUnit.MILLISECONDS)
周期任務
主要涉及到2個方法scheduleAtFixedRate
和scheduleWithFixedDelay
。
假設任務時間小於周期時間,則按給定周期時間來進行。這兩個方法表現一致。
假設任務執行時間大於周期時間,這兩個方法有點不同
scheduleAtFixedRate
執行完上一個任務後,用時超過了周期時間,會立刻執行下一個任務。scheduleWithFixedDelay
在上一個任務執行完畢後,還會等待周期時間,再去執行下一個任務。
工作竊取線程池
Android SDK 大於等於24,有一種新的線程池,暫且稱為「工作竊取線程池」,或者叫「靈活調度線程池」。
if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.N) {
Executors.newWorkStealingPool()
}
線程池維護足夠的線程來支持給定的並行度(parallelism level),可能會用多個隊列來減少爭用。
並行度對應的是活躍的線程最大數,或者能處理任務的線程最大數。
線程的實際數量可能會動態增減。工作竊取線程池不保證按提交順序來處理任務。
執行任務
執行任務的時候可以傳入Runnable和Callable,前面用的都是Runnable。
用Callable的例子
tp.submit(Callable { "OK" })
無返回值任務的調用
無返回值任務用Callable和Runnable都行。
val tp: ExecutorService = Executors.newCachedThreadPool()
tp.submit { Log.d(TAG, "rustfisher: cached線程池submit runnable") }
tp.execute { Log.d(TAG, "rustfisher: cached線程池execute runnable") }
tp.submit(Callable { Log.d(TAG, "rustfisher: cached線程池submit callable") })
tp.shutdown() // 最後記得用完後停掉線程池
有返回值任務的調用
有返回值的任務需要Callable接口。
submit
調用submit
方法時會返回一個Future對象。通過Future的get()
方法可拿到返回值。這裡需要注意get()
是阻塞的,完成任務後,能拿到返回值。
val tp: ExecutorService = Executors.newCachedThreadPool()
val future = tp.submit(Callable {
return@Callable "callable的返回值"
})
Log.d(TAG, "future get之前 isDone: ${future.isDone}, isCancelled: ${future.isCancelled}")
val res = future.get()
Log.d(TAG, "future get之後 isDone: ${future.isDone}, isCancelled: ${future.isCancelled}")
Log.d(TAG, "future get: $res")
運行log
future get之前 isDone: false, isCancelled: false
future get之後 isDone: true, isCancelled: false
future get: callable的返回值
invokeAll
對於列表裡的任務,可以使用invokeAll(Collection<? extends Callable<T>> tasks)
,返回一個Future的列表。
作為對比,給其中一個任務加上延時。
invokeAll示例
val tp: ExecutorService = Executors.newFixedThreadPool(5)
val callList = arrayListOf<Callable<String>>(
Callable {
Log.d(TAG, "task1 ${Thread.currentThread()}")
return@Callable "rust"
},
Callable {
Log.d(TAG, "task2 ${Thread.currentThread()}")
Thread.sleep(1500) // 加上延時
return@Callable "fisher"
},
Callable {
Log.d(TAG, "task3 ${Thread.currentThread()}")
return@Callable "列表裏面的任務"
},
)
Log.d(TAG, "invokeAll 準備提交任務")
val futureList = tp.invokeAll(callList)
Log.d(TAG, "invokeAll 已提交任務")
futureList.forEach { f ->
Log.d(TAG, "任務列表執行結果 ${f.get()}") // 這裡會阻塞 別在ui線程里get
}
運行log,可以看到提交任務後,經過延時,拿到了運行結果。注意看invokeAll
前後的時間。invokeAll
會阻塞當前線程。使用的時候必須小心,不要在ui線程中調用。
2021-09-11 14:40:07.062 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: invokeAll 準備提交任務
2021-09-11 14:40:07.063 16914-19230/com.rustfisher.tutorial2020 D/rfDevTp: task1 Thread[pool-4-thread-1,5,main]
2021-09-11 14:40:07.063 16914-19231/com.rustfisher.tutorial2020 D/rfDevTp: task2 Thread[pool-4-thread-2,5,main]
2021-09-11 14:40:07.063 16914-19232/com.rustfisher.tutorial2020 D/rfDevTp: task3 Thread[pool-4-thread-3,5,main]
2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: invokeAll 已提交任務
2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: 任務列表執行結果 rust
2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: 任務列表執行結果 fisher
2021-09-11 14:40:08.563 16914-16914/com.rustfisher.tutorial2020 D/rfDevTp: 任務列表執行結果 列表裏面的任務
提交了3個任務,在3個不同的子線程中執行。
invokeAny
invokeAny(Collection<? extends Callable<T>> tasks)
也是接收Callable集合。
然後返回最先執行結束的任務的值,其它未完成的任務將被正常取消掉不會有異常。
invokeAny示例
val tp: ExecutorService = Executors.newCachedThreadPool()
val callList = arrayListOf<Callable<String>>(
Callable {
Thread.sleep(1000) // 設計延時
return@Callable "rust"
},
Callable {
Thread.sleep(400)
return@Callable "fisher"
},
Callable {
Thread.sleep(2000)
return@Callable "列表裏面的任務"
},
)
Log.d(TAG, "invokeAny 提交任務")
val res = tp.invokeAny(callList)
Log.d(TAG, "執行結果 $res")
2021-09-11 14:04:55.253 14066-14066/com.rustfisher.tutorial2020 D/rfDevTp: invokeAny 提交任務
2021-09-11 14:04:55.654 14066-14066/com.rustfisher.tutorial2020 D/rfDevTp: 執行結果 fisher
觀察log可以看到,最後執行的是「fisher」這個任務。
停止線程池
使用完畢後,記得終止線程池
/*ExecutorService*/ shutdown()
shutdownNow()
shutdown()
在已提交的任務後面創建一個停止命令,並且不再接受新的任務。如果線程池已經停止了,調用這個方法將不生效。
shutdownNow()
方法嘗試停止所有執行中的任務,停下等待中的任務。並且返回等待執行的任務列表List<Runnable>
。