多執行緒編程(1)
- 2019 年 10 月 21 日
- 筆記
1. 進程與執行緒
通常,一個任務就是一個進程(Process),比如打開一個瀏覽器就是啟動一個瀏覽器進程,打開一個Word就啟動了一個Word進程。大多時候一個進程需要同時干很多件事情,比如Word,它可以同時進行打字、拼寫檢查、列印等事情。在一個進程內部,要同時干多件事,就需要同時運行多個“子任務”,我們把進程內的這些“子任務”稱為執行緒(Thread)。即一個程式至少有一個進程,一個進程至少有一個執行緒。
我們大學學作業系統的時候,都知道進程是資源分配的基本單位,執行緒是執行和調度的基本單位,執行緒本身不擁有資源,資源來自於它的進程。
執行緒分為兩種:用戶執行緒和守護執行緒。守護 執行緒,是指在程式運行的時候在後台提供一種通用服務的執行緒,比如垃圾回收執行緒就是一個很稱職的守護者,並且這種執行緒並不屬於程式中不可或缺的部分。守護執行緒是用來服務用戶執行緒的,一旦用戶執行緒全部運行結束,程式會終止,守護執行緒也會隨之退出。
在進入多執行緒之前,可以先看看執行緒的幾種狀態:
- 1. 新建(NEW):新創建了一個執行緒對象。
- 2. 就緒(RUNNABLE或READY) :執行緒正在參與競爭CPU的使用權。
- 3. 運行(RUNNING):執行緒取到了CPU的使用權,正在執行。
- 4. 阻塞(BLOCKED):阻塞狀態是執行緒因為某種原因放棄CPU使用權,暫時停止運行。直到滿足條件(比如超時等待、喚醒)時,該執行緒重新回到就緒態,參與競爭CPU使用權。
- 5. 等待(WAITING):執行緒無限等待某個對象的鎖,或等待另一個執行緒結束的狀態。
- 6. 計時等待(TIME_WAITING):執行緒在某一段時間內等待某個對象的“鎖”,或者主動休眠,亦或者等待一個執行緒結束,除非被中斷,時間一到,馬上回到就緒狀態,被中斷的方法則拋出異常。
- 7. 終止(Terminated):即執行緒終止(執行緒的的程式碼被執行完畢)和執行過程出現異常或者被外界強制中斷。
狀態的轉換的具體轉換如下圖所示:
2. Thread類和Runnable介面
通過繼承Thread類,實run方法即可實現一個執行緒類,常用的API如下:
方法描述 | |
---|---|
start() | 從新建狀態轉化為就緒狀態,開始參與CPU使用權的競爭。 |
run() | 直接調用該 Runnable 對象的 run 方法時直接取得CPU的使用權 |
interrupt() | 中斷執行緒。在程式程式碼中搭配while (!Thread.interrupted()){..}使用。 |
isDaemon() | 判斷當前執行緒是否是守護執行緒。 |
setDaemon(boolean true) | 將當前執行緒設置為守護執行緒,必須在調用start()之後才有效。 |
setPriority(int priority) | 更改執行緒的優先順序。 |
interrupt() | 中斷執行緒。 |
isAlive() | 測試執行緒是否處於活動狀態。 |
join(long millisec) | 等待該執行緒終止的時間最長為 millis 毫秒。 |
Thread.yield() | 暫停當前正在執行的執行緒對象(讓出當前執行緒的CPU,轉為就緒狀態),並執行其他執行緒。 |
Thread.currentThread() | 返回對當前正在執行的執行緒對象的引用。 |
Thread.sleep(long millisec) | 在指定的毫秒數內讓當前正在執行的執行緒休眠(暫停執行),此操作受到系統計時器和調度程式精度和準確性的影響。 |
由於java中的類是單繼承的,而介面可以多繼承。一個類實現多個介面的情況,因為介面只有抽象方法,具體方法只能由實現介面的類實現,在調用的時候始終只會調用實現類的方法(不存在歧義),因此在開發中通常使用Runnable。
public class Thread1 implements Runnable { @Override public void run() { System.out.println("iii"); } public static void main(String[] args) { Thread1 rt = new Thread1(); Thread t = new Thread(rt); t.start(); } }
3. 執行緒池
當執行緒的在某一時刻大量的創建與銷毀會消耗很多資源,我們可以提前創建好一些執行緒,將他們集中管理起來,形成一個執行緒池,需要使用的時候直接拿過來用,使用完後,放回執行緒池。
Executor框架
在 java.util.cocurrent 包下,通過該框架來控制執行緒的啟動、執行和關閉,可以簡化並發編程的操作,由Executors類的五個靜態工廠方法創建,其常用方法如下。
3.1 執行緒池的創建
-
newFixedThreadPool:創建固定大小的執行緒池。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。
public class Executors { /* 函數功能:創建一個固定長度的的執行緒池,用於保存任務的阻塞隊列為無限制長度的LinkedBlockingQueue。 執行緒池中的執行緒將會一直存在除非執行緒池shutdown,即執行緒池中的執行緒沒有受到存活時間的限制。 */ public static ExecutorService newFixedThreadPool(int nThreads) { /* 參數一 核心執行緒數大小(最小執行緒數),當執行緒數 < 參數一 ,會創建執行緒執行 runnable * 參數二 最大執行緒數, 當執行緒數 >= 參數二,會把runnable放入workQueue(參數5)中 * 參數三 保持存活時間,空閑執行緒能保持的最大時間。 * 參數四 時間單位 * 參數五 保存任務的阻塞隊列 *public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueu */ return new ThreadPoolExecutor(nThreads1, nThreads2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //... } ExecutorService es = Executors.newFixedThreadPool(20); //如果執行緒池中執行緒數過大或過小,都會影響性能
-
newCachedThreadPool:創建一個可快取空閑執行緒60秒的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閑(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的添加新執行緒來處理任務。
public class Executors { public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //... } ExecutorService es = Executors.newCachedThreadPool(); //缺點是在訪問量突然很大的時候,會創建大量執行緒
-
創建一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒串列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。
ExecutorService es = Executors.newSingleThreadExecutor(); //等同於 ExecutorService es = Executors.newFixedThreadPool(1);
-
newScheduledThreadPool:創建一個大小無限的執行緒池。此執行緒池支援定時以及周期性執行任務的需求。
public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); ses.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new Date()); } }, 1000/*第一個周期開始的時間*/, 2000/*每個周期間隔的時間*/, TimeUnit.MILLISECONDS); }
-
newSingleThreadScheduledExecutor:創建一個單執行緒的執行緒池。此執行緒池支援定時以及周期性執行任務的需求。
3.2 執行緒池中執行緒的使用
通過Executors類去獲得的執行緒池都實現了ExecutorService這個介面。可以調用execute()或者submit()方法把相應的任務提交到執行緒池中去。
1. execute(Runnable): 這個方法接收一個Runnable實例,並且非同步的執行。
public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); // Future future = es.execute(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run the thread."); } }); System.out.println("over"); } /* output: * over * run the thread. */
2. submit(Runnable): submit(Runnable)
和execute(Runnable)
區別是前者可以返回一個Future對象,通過返回的Future對象,我們可以檢查提交的任務是否執行完畢。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); Future future = es.submit(new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("run the thread."); } }); future.get(); //future.get()方法會產生阻塞,直到上面的執行緒完成,即等待一秒鐘 System.out.println("over"); } /* output: * run the thread. * over */
3. submit(Callable): submit(Callable)
和submit(Runnable)
類似,也會返回一個Future對象,但是參數Callable類中的call方法可以返回一個值,而Runable不行。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); Future future = es.submit(new Callable() { @Override public Object call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "run the thread."; } }); String rs = (String)future.get(); //future.get()方法會產生阻塞 System.out.println("over and " + rs); } /* output: * over and run the thread. */
4. invokeAny(Collection<? extends Callable<T>> tasks>): 方法輸入接受一個Callable集合類型的參數,啟動多個執行緒相互獨立的去執行對應執行緒的任務,一旦有一個執行緒執行完畢,則返回,同時其他執行緒終止。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(3); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return " first task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1000); return " second task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return " third task"; } }); String rs = es.invokeAny(callables); System.out.println(rs); } /* output * second task */
5. invokeAll(Collection<? extends Callable<T>> tasks>): 該方法則會並行的執行Callable集合類型的所有方法。
public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(3); Set<Callable<String>> callables = new HashSet<Callable<String>>(); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return " first task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1000); return " second task"; } }); callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(3000); return " third task"; } }); List<Future<String>> list= es.invokeAll(callables); for (Future<String> future:list) { String s = future.get(); System.out.println(s); } }
6. shotdown(): 調用該方法後,關閉執行緒池,已提交的方法會繼續執行,執行結束後,執行緒池全部關閉,該方法是一個非同步方法,一旦調用,立即返回。
7.shotdownNow(): 調用該方法後,關閉執行緒池,已提交的方法也會被取消,執行緒池立即全部關閉,該方法是一個非同步方法,一旦調用,立即返回。
8. awaitTermination(timeout,unit): 調用該方法阻塞當前執行緒,使得執行緒池中的執行緒執行完畢,最長等待時間為timeout,此方法需要在調用shotdown/shotdownNow後才有效。
public class ThreadSafe implements Runnable { private static int count = 0; @Override public void run() { for (int i = 0; i < 10; i++) { count++; } } public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); for (int i = 0; i < 20; i++) { es.execute(new ThreadSafe()); } es.shutdown(); //不允許添加執行緒,非同步關閉連接池 es.awaitTermination(10L, TimeUnit.SECONDS); //等待連接池的執行緒任務完成 System.out.println(count); } } /* output * 200 */
參考文獻
-
龐永華. Java多執行緒與Socket:實戰微服務框架[M].電子工業出版社.2019.3