Java多線程總結

  • 2020 年 1 月 21 日
  • 筆記

線程和進程

對於進程和線程的概念可以簡單的理解成一個包含關係,例如:一個人個體可以稱為社會的一個進程,人可以同時做很多事情,這個稱之為線程

CPU一次只能執行一個指令,操作系統為了保證同一時刻多個程序同時執行, 把每次執行的指令過程分成若干時間片(timeslice),每一個程序都會在指定的時間片上運行一段時間後,然後保存運行的上下文資源,來保證下次執行。

由於進程對於資源的需求比較多,保存和恢復都會需要很多時間,CPU每次執行的單位都是線程

所以單核的CPU的執行其實本質都是單線程.

例如我們同時運行A、B、C三個程序:

疑問:如果是多線程本質還是單線程執行為什麼我們還要使用多線程?

因為在程序執行的過程的中,CPU的執行速度大於內存,也遠遠大於磁盤IO的運算,如果一個程序CPU執行完成後,要等待磁盤和內存的讀取。在等待期間,CPU處於空閑的狀態,這樣就導致的資源的浪費。

多線程的引入是在CPU存在空閑的時間片的時候,能夠有指令被執行,不必再等待其他的執行。

疑問: 如何控制線程的執行先後?

CPU的實行被劃分成時間片來執行,所以線程能否被調度,本質是能否搶到時間片。

既然是搶時間片,就存在隨機性,所以線程本身的調度時間我們無法完全控制。(可以採用讓出時間片來控制,但也不是根本上解決調度順序)

Java中的線程


Thread使用

在Java中,使用Thread來創建線程,使用start的方法來啟動線程(此處並不是真正的啟動)。我們可以簡單的使用:

public static void main(String[] args) {       Thread th=new Thread(()->{            System.out.println("a");       });       th.start();         try {            Thread.currentThread().join();       } catch (InterruptedException e) {            e.printStackTrace();       }  }

Thread中有stop,interrupt,join等方法。其中stop不推薦使用。interrupt表示中斷線程執行,join等待當前線程執行完成。

Fork/Join

Fork/Join框架是Java7提供的一個用於並行執行任務的框架,利用遞歸把總任務分割成若干個小任務,然後把每個任務的執行結果匯總到總任務

我們使用forkJoin框架計算1000的加和,具體使用代碼:

     private static final Integer MAX = 200;  	static class SumForkJoinTask extends RecursiveTask<Integer> {  		// 子任務開始計算的值  		private Integer startValue;    		// 子任務結束計算的值  		private Integer endValue;    		public SumForkJoinTask(Integer startValue , Integer endValue) {  			this.startValue = startValue;  			this.endValue = endValue;  		}  		@Override  		protected Integer compute() {  			if(endValue - startValue < MAX) {  				System.out.println(String.format("02.執行任務=>start:%s,end:%s",startValue,endValue));  				Integer totalValue = 0;  				for(int index = this.startValue ; index <= this.endValue  ; index++) {  					totalValue += index;  				}  				return totalValue;  			}  			else {  				SumForkJoinTask subTask1 = new SumForkJoinTask(startValue, (startValue + endValue) / 2);  				subTask1.fork();  				SumForkJoinTask subTask2 = new SumForkJoinTask((startValue + endValue) / 2 + 1 , endValue);  				subTask2.fork();  				System.out.println(String.format("01.拆分任務=>start:%s,end:%s",startValue,endValue));  				return subTask1.join() + subTask2.join();  			}  		}  	}  	public static void main(String[] args) {  		ForkJoinPool pool = new ForkJoinPool();  		ForkJoinTask<Integer> taskFuture =  pool.submit(new SumForkJoinTask(1,1001));  		try {  			Integer result = taskFuture.get();  			System.out.println("result = " + result);  		} catch (InterruptedException | ExecutionException e) {  			e.printStackTrace(System.out);  		}  	}

打印結果:

01.拆分任務=>start:1,end:1001  01.拆分任務=>start:1,end:251  02.執行任務=>start:1,end:126  02.執行任務=>start:127,end:251  01.拆分任務=>start:252,end:501  02.執行任務=>start:252,end:376  02.執行任務=>start:377,end:501  01.拆分任務=>start:502,end:1001  01.拆分任務=>start:502,end:751  01.拆分任務=>start:1,end:501  01.拆分任務=>start:752,end:1001  02.執行任務=>start:502,end:626  02.執行任務=>start:752,end:876  02.執行任務=>start:627,end:751  02.執行任務=>start:877,end:1001  result = 501501

wait和notity

waitnotify是線程的阻塞和通知,可以實現線程間的通信。具體的流程圖如下:

具體使用代碼如下:

 public class LockWait {    	static volatile List<String> itemContainer = new ArrayList<>();  	static Object obj = new Object();    	public static void main(String[] args) {  		Thread th1 = new Thread(() -> {  			synchronized (obj) {  				for (int i = 0; i < 10; i++) {  					System.out.println("th1添加元素");  					itemContainer.add(String.valueOf(i));  					if (itemContainer.size() == 5) {  						System.out.println("th1線程發出通知");  						obj.notify();  					}  				}  			}  		});    		Thread th2 = new Thread(() -> {  			synchronized (obj) {  				System.out.println("進入th2線程");  				if (itemContainer.size() != 5) {  					try {  						System.out.println("th2線程開始等待");  						obj.wait();  						System.out.println("th2線程等待結束");  					} catch (InterruptedException e) {  						e.printStackTrace();  					}  					System.out.println("th2線程結束");  				}  			}    		});    		th2.start();  		th1.start();  	}  }

具體可以查看Java多線程通信lock和wait

waitnotify釋放鎖的情況,wait 不釋放鎖,notify釋放鎖。

線程池

創建和銷毀線程需要耗費CPU的資源,為了不必要的浪費,可以把線程進行池化管理,這就是線程池。

在Java中有四個類型的線程池,分別是:

newFixedThreadPool:初始化一個固定線程數的線程池,即使沒有任務線程也會駐留在內存中。

newCachedThreadPool: 初始化一個緩存線程池,不控制線程數據量,當沒有任務執行的,超時會自動釋放。在使用時,要注意線程數量和創建線程的開銷。

newSingleThreadExecutor:初始化只有一個線程的線程池, 如果該線程異常結束,會重新創建一個新的線程繼續執行任務,唯一的線程可以保證所提交任務的順序執行。

newScheduledThreadPool: 初始化的線程池可以在指定的時間內周期性的執行所提交的任務。

下面newFixedThreadPool使用的方法,其他的用法類似。

ExecutorService executorService = Executors.newFixedThreadPool(10);       executorService.execute(()->{         });

Future和FutureTask的使用

Future是一個interface,FutureTask是其中的一個實現類, 多用於耗時的計算,主線程可以在完成自己的任務後,再去獲取結果。

具體使用方法:

public class FutureTaskObj {  	public static void main(String[] args) {  		TaskObj task = new TaskObj();  		FutureTask<Integer> futureTask = new FutureTask<Integer>(task);  		Thread thread = new Thread(futureTask);  		thread.start();  		System.out.println("創建Task完成");  		System.out.println("主線程繼續執行");    		try {  			System.out.println("運行結果" + futureTask.get());  		} catch (InterruptedException e) {  			e.printStackTrace();  		} catch (ExecutionException e) {  			e.printStackTrace();  		}    		System.out.println("所有任務執行完畢");  	}  }    class TaskObj implements Callable<Integer> {  	@Override  	public Integer call() throws Exception {  		System.out.println("子線程在進行計算");  		Thread.sleep(1000);  		int sum = 0;  		for (int i = 0; i < 100; i++)  			sum += i;  		return sum;  	}  }

Future也可以使用線程池的方法啟動,具體代碼如下:

ExecutorService executor = Executors.newCachedThreadPool();  TaskObj task = new TaskObj();  FutureTask<Integer> futureTask = new FutureTask<Integer>(task);  executor.submit(futureTask);  executor.shutdown();

其他幾個類的使用


CountDownLatch

CountDownLatch 是等待線程執行完,在進行執行,具體的執行邏輯:

具體的執行代碼:

public static void main(String[] args) {       try {            CountDownLatch countDownLatch = new CountDownLatch(5);              for (int i = 0; i < 5; i++) {                 int finalI = i;                 Thread th=new Thread(()->{                      System.out.println(String.format("執行第:%s個線程",finalI));                      countDownLatch.countDown();                 });                 th.start();            }              countDownLatch.await();            System.out.println("執行完成");       } catch (InterruptedException e) {            e.printStackTrace();       }  }

打印結果:

執行第:0個線程  執行第:1個線程  執行第:3個線程  執行第:2個線程  執行第:4個線程  執行完成

CyclicBarrier

CyclicBarrier柵欄的意思,線程數達到某個值時,再繼續執行。

具體代碼:

public static void main(String[] args) {  	try {  			CyclicBarrier cyclicBarrier = new CyclicBarrier(3);  			for (int i = 0; i < 3; i++) {  				int finalI = i;  				Thread th = new Thread(() -> {  					try {  						System.out.println(String.format("執行第%s個線程", finalI));  						cyclicBarrier.await();  						System.out.println(String.format("第%s個線程執行完成", finalI));  					} catch (InterruptedException e) {  						e.printStackTrace();  					} catch (BrokenBarrierException e) {  						e.printStackTrace();  					}  				});  				th.start();  			}  		} catch (Exception e) {  			e.printStackTrace();  		}  }

打印結果是:

執行第0個線程  執行第1個線程  執行第2個線程  第0個線程執行完成  第2個線程執行完成  第1個線程執行完成

Semaphore

Semaphore 稱為信號量,是指定幾個數量線程通過。

具體代碼如下:

public static void main(String[] args) {       Semaphore semaphore = new Semaphore(2);       for (int i = 0; i < 10; i++) {            try {                 System.out.println(String.format("獲得第%s個許可", i));                 semaphore.acquire();                 System.out.println(String.format("第%s個許可獲得成功", i));                 int finalI = i;                 Thread th = new Thread(() -> {                      System.out.println(String.format("執行第%s個線程", finalI));                      semaphore.release();                      System.out.println(String.format("第%s個線程執行完成", finalI));                 });                 th.start();            } catch (InterruptedException e) {                 e.printStackTrace();            }       }  }

執行打印結果:

獲得第0個許可  第0個許可獲得成功  獲得第1個許可  第1個許可獲得成功  獲得第2個許可  執行第0個線程  第0個線程執行完成  第2個許可獲得成功  獲得第3個許可  執行第1個線程  第1個線程執行完成  ....

分析上面的結果,可以發現只有兩個線程同時執行,等一個線程釋放了,另一個線程才能執行完成。

LockSupport

LockSupportSemaphore類似,相當於只有一個許可的信號量Semaphore semaphore = new Semaphore(1),具體的實現邏輯:

public static void main(String[] args) {       for (int i = 0; i < 10; i++) {            try {                 int finalI = i;                 Thread th = new Thread(() -> {                      System.out.println(String.format("執行第%s個線程", finalI));                      LockSupport.park();                      System.out.println(String.format("第%s個線程執行完成", finalI));                 });                 th.start();                 Thread.sleep(1000);                 LockSupport.unpark(th);                 Thread.sleep(1000);                 System.out.println("主線程執行完成");            } catch (Exception e) {                 e.printStackTrace();            }       }  }

打印結果:

執行第0個線程  第0個線程執行完成  主線程執行完成  執行第1個線程  第1個線程執行完成  主線程執行完成  執行第2個線程  第2個線程執行完成