java多線程並發執行demo,主線程阻塞
其中有四個知識點我單獨羅列了出來,屬於多線程編程中需要知道的知識:
知識點1:X,T為泛型,為什麼要用泛型,泛型和Object的區別請看://www.cnblogs.com/xiaoxiong2015/p/12705815.html
知識點2:線程池://www.cnblogs.com/xiaoxiong2015/p/12706153.html
知識點3:隊列:@author Doug Lea //www.cnblogs.com/xiaoxiong2015/p/12825636.html
知識點4:計數器,還是並發包大神 @author Doug Lea 編寫。是一個原子安全的計數器,可以利用它實現發令槍
Doug Lea真是大神,編程不識Doug Lea,寫盡Java也枉然,concurrent包點進去,都是他寫的。可能是需要什麼東西就寫了吧,信手拈來的感覺。
主類:MultiThread,執行並發類
package java8test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; /** * @param <H> 為被處理的數據類型 * @param <T>返回數據類型 * 知識點1:X,T為泛型,為什麼要用泛型,泛型和Object的區別請看://www.cnblogs.com/xiaoxiong2015/p/12705815.html */ public abstract class MultiThread<X, T> { public static int i = 0; // 知識點2:線程池://www.cnblogs.com/xiaoxiong2015/p/12706153.html private final ExecutorService exec; // 線程池 // 知識點3:@author Doung Lea 隊列://www.cnblogs.com/xiaoxiong2015/p/12825636.html private final BlockingQueue<Future<T>> queue = new LinkedBlockingQueue<>(); // 知識點4:計數器,還是並發包大神 @author Doug Lea 編寫。是一個原子安全的計數器,可以利用它實現發令槍 private final CountDownLatch startLock = new CountDownLatch(1); // 啟動門,當所有線程就緒時調用countDown private final CountDownLatch endLock; // 結束門 private final List<X> listData;// 被處理的數據 /** * @param list list.size()為多少個線程處理,list裏面的H為被處理的數據 */ public MultiThread(List<X> list) { if (list != null && list.size() > 0) { this.listData = list; exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 創建線程池,線程池共有nThread個線程 endLock = new CountDownLatch(list.size()); // 設置結束門計數器,當一個線程結束時調用countDown } else { listData = null; exec = null; endLock = null; } } /** * * @return 獲取每個線程處理結速的數組 * @throws InterruptedException * @throws ExecutionException */ public List<T> getResult() throws InterruptedException, ExecutionException { List<T> resultList = new ArrayList<>(); if (listData != null && listData.size() > 0) { int nThread = listData.size(); // 線程數量 for (int i = 0; i < nThread; i++) { X data = listData.get(i); Future<T> future = exec.submit(new Task(i, data) { @Override public T execute(int currentThread, X data) { return outExecute(currentThread, data); } }); // 將任務提交到線程池 queue.add(future); // 將Future實例添加至隊列 } startLock.countDown(); // 所有任務添加完畢,啟動門計數器減1,這時計數器為0,所有添加的任務開始執行 endLock.await(); // 主線程阻塞,直到所有線程執行完成 for (Future<T> future : queue) { resultList.add(future.get()); } exec.shutdown(); // 關閉線程池 } return resultList; } /** * 每一個線程執行的功能,需要調用者來實現 * @param currentThread 線程號 * @param data 每個線程被處理的數據 * @return T返回對象 */ public abstract T outExecute(int currentThread, X data); /** * 線程類 */ private abstract class Task implements Callable<T> { private int currentThread;// 當前線程號 private X data; public Task(int currentThread, X data) { this.currentThread = currentThread; this.data = data; } @Override public T call() throws Exception { // startLock.await(); // 線程啟動後調用await,當前線程阻塞,只有啟動門計數器為0時當前線程才會往下執行 T t = null; try { t = execute(currentThread, data); } finally { endLock.countDown(); // 線程執行完畢,結束門計數器減1 } return t; } /** * 每一個線程執行的功能 * @param currentThread 線程號 * @param data 每個線程被處理的數據 * @return T返回對象 */ public abstract T execute(int currentThread, X data); } }
結果類:ResultVO,保存返回結果,根據實際情況替換成自己的
package java8test; public class ResultVo { int i; public ResultVo(int i) { this.i = i; } public ResultVo() { // TODO Auto-generated constructor stub } }
參數類:ParamVO,傳入參數類,根據實際情況替換成自己的
package java8test; public class ParamVo { private int i; ParamVo(int i) { this.i = i; } public int getI() { return i; } @Override public String toString() { return String.valueOf(i) + " " + hashCode(); } }
測試類:new兩個MultiThread,可以看到MultiThread這個類不存在線程安全問題。
package java8test; import java.util.ArrayList; import java.util.List; public class Test { public static void main(String[] args) { try { List<ParamVo> splitList = new ArrayList<ParamVo>(); for (int i = 0; i < 100; i++) { splitList.add(new ParamVo(i)); } List<ParamVo> splitList1 = new ArrayList<ParamVo>(); for (int i = 200; i < 300; i++) { splitList1.add(new ParamVo(i)); } MultiThread<ParamVo, ResultVo> multiThread = new MultiThread<ParamVo, ResultVo>(splitList) { @Override public ResultVo outExecute(int currentThread, ParamVo data) { System.out.println("當前線程名稱:" + Thread.currentThread().getName() + "當前線程號=" + currentThread + " data=" + data); i--; return new ResultVo(data.getI()); } }; MultiThread<ParamVo, ResultVo> multiThread1 = new MultiThread<ParamVo, ResultVo>(splitList1) { @Override public ResultVo outExecute(int currentThread, ParamVo data) { System.out.println("當前線程名稱:" + Thread.currentThread().getName() + "當前線程號=" + currentThread + " data=" + data); i--; return new ResultVo(data.getI()); } }; List<ResultVo> list = multiThread.getResult(); List<ResultVo> list1 = multiThread1.getResult(); // 獲取每一批次處理結果 System.out.println("獲取處理結果........................"); for (ResultVo vo : list) { System.out.println(vo.i); } System.out.println("獲取1處理結果........................"); for (ResultVo vo : list1) { System.out.println(vo.i); } } catch (Exception e) { e.printStackTrace(); } } }
這個類也用在了生產當中,用來並發插入數據。但是事務不能被管控,需要自己保證最終事務一致。需要注意。