java多線程並發執行demo,主線程阻塞

其中有四個知識點我單獨羅列了出來,屬於多線程編程中需要知道的知識:

         知識點1:X,T為泛型,為什麼要用泛型,泛型和Object的區別請看://www.cnblogs.com/xiaoxiong2015/p/12705815.html
         知識點2:線程池://www.cnblogs.com/xiaoxiong2015/p/12706153.html
         知識點3:隊列:@author Doug Lea //www.cnblogs.com/xiaoxiong2015/p/12825636.html
         知識點4:計數器,還是並發包大神 @author Doug Lea 編寫。是一個原子安全的計數器,可以利用它實現發令槍
Doug Lea真是大神,編程不識Doug Lea,寫盡Java也枉然,concurrent包點進去,都是他寫的。可能是需要什麼東西就寫了吧,信手拈來的感覺。

主類:MultiThread,執行並發類
複製代碼
package java8test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @param <H> 為被處理的數據類型
 * @param <T>返回數據類型
 * 知識點1:X,T為泛型,為什麼要用泛型,泛型和Object的區別請看://www.cnblogs.com/xiaoxiong2015/p/12705815.html
 */
public abstract class MultiThread<X, T> {

    public static int i = 0;

    // 知識點2:線程池://www.cnblogs.com/xiaoxiong2015/p/12706153.html
    private final ExecutorService exec; // 線程池

    // 知識點3:@author Doung Lea 隊列://www.cnblogs.com/xiaoxiong2015/p/12825636.html
    private final BlockingQueue<Future<T>> queue = new LinkedBlockingQueue<>();

    // 知識點4:計數器,還是並發包大神 @author Doug Lea 編寫。是一個原子安全的計數器,可以利用它實現發令槍
    private final CountDownLatch startLock = new CountDownLatch(1); // 啟動門,當所有線程就緒時調用countDown

    private final CountDownLatch endLock; // 結束門

    private final List<X> listData;// 被處理的數據

    /**
     * @param list list.size()為多少個線程處理,list裏面的H為被處理的數據
     */
    public MultiThread(List<X> list) {
        if (list != null && list.size() > 0) {
            this.listData = list;
            exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 創建線程池,線程池共有nThread個線程
            endLock = new CountDownLatch(list.size()); // 設置結束門計數器,當一個線程結束時調用countDown
        } else {
            listData = null;
            exec = null;
            endLock = null;
        }
    }

    /**
     * 
     * @return 獲取每個線程處理結速的數組
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public List<T> getResult() throws InterruptedException, ExecutionException {

        List<T> resultList = new ArrayList<>();
        if (listData != null && listData.size() > 0) {
            int nThread = listData.size(); // 線程數量
            for (int i = 0; i < nThread; i++) {
                X data = listData.get(i);
                Future<T> future = exec.submit(new Task(i, data) {

                    @Override
                    public T execute(int currentThread, X data) {

                        return outExecute(currentThread, data);
                    }
                }); // 將任務提交到線程池
                queue.add(future); // 將Future實例添加至隊列
            }
            startLock.countDown(); // 所有任務添加完畢,啟動門計數器減1,這時計數器為0,所有添加的任務開始執行
            endLock.await(); // 主線程阻塞,直到所有線程執行完成
            for (Future<T> future : queue) {
                resultList.add(future.get());
            }
            exec.shutdown(); // 關閉線程池
        }
        return resultList;
    }

    /**
     * 每一個線程執行的功能,需要調用者來實現
     * @param currentThread 線程號
     * @param data 每個線程被處理的數據
     * @return T返回對象
     */
    public abstract T outExecute(int currentThread, X data);

    /**
     * 線程類
     */
    private abstract class Task implements Callable<T> {

        private int currentThread;// 當前線程號

        private X data;

        public Task(int currentThread, X data) {
            this.currentThread = currentThread;
            this.data = data;
        }

        @Override
        public T call() throws Exception {

            // startLock.await(); // 線程啟動後調用await,當前線程阻塞,只有啟動門計數器為0時當前線程才會往下執行
            T t = null;
            try {
                t = execute(currentThread, data);
            } finally {
                endLock.countDown(); // 線程執行完畢,結束門計數器減1
            }
            return t;
        }

        /**
         * 每一個線程執行的功能
         * @param currentThread 線程號
         * @param data 每個線程被處理的數據
         * @return T返回對象
         */
        public abstract T execute(int currentThread, X data);
    }
}
複製代碼

 

結果類:ResultVO,保存返回結果,根據實際情況替換成自己的
複製代碼
package java8test;

public class ResultVo {

    int i;

    public ResultVo(int i) {
        this.i = i;
    }

    public ResultVo() {
        // TODO Auto-generated constructor stub
    }

}
複製代碼

參數類:ParamVO,傳入參數類,根據實際情況替換成自己的
複製代碼
package java8test;

public class ParamVo {

    private int i;

    ParamVo(int i) {
        this.i = i;
    }

    public int getI() {

        return i;
    }

    @Override
    public String toString() {

        return String.valueOf(i) + "  " + hashCode();
    }
}
複製代碼

測試類:new兩個MultiThread,可以看到MultiThread這個類不存在線程安全問題。
複製代碼
package java8test;

import java.util.ArrayList;
import java.util.List;

public class Test {

    public static void main(String[] args) {

        try {
            List<ParamVo> splitList = new ArrayList<ParamVo>();
            for (int i = 0; i < 100; i++) {
                splitList.add(new ParamVo(i));
            }
            List<ParamVo> splitList1 = new ArrayList<ParamVo>();
            for (int i = 200; i < 300; i++) {
                splitList1.add(new ParamVo(i));
            }
            MultiThread<ParamVo, ResultVo> multiThread = new MultiThread<ParamVo, ResultVo>(splitList) {

                @Override
                public ResultVo outExecute(int currentThread, ParamVo data) {

                    System.out.println("當前線程名稱:" + Thread.currentThread().getName() + "當前線程號=" + currentThread
                            + " data=" + data);
                    i--;
                    return new ResultVo(data.getI());
                }
            };

            MultiThread<ParamVo, ResultVo> multiThread1 = new MultiThread<ParamVo, ResultVo>(splitList1) {

                @Override
                public ResultVo outExecute(int currentThread, ParamVo data) {

                    System.out.println("當前線程名稱:" + Thread.currentThread().getName() + "當前線程號=" + currentThread
                            + " data=" + data);
                    i--;
                    return new ResultVo(data.getI());
                }
            };
            List<ResultVo> list = multiThread.getResult();
            List<ResultVo> list1 = multiThread1.getResult();
            // 獲取每一批次處理結果
            System.out.println("獲取處理結果........................");
            for (ResultVo vo : list) {
                System.out.println(vo.i);
            }
            System.out.println("獲取1處理結果........................");
            for (ResultVo vo : list1) {
                System.out.println(vo.i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

}
複製代碼

 

 

 

 

 

這個類也用在了生產當中,用來並發插入數據。但是事務不能被管控,需要自己保證最終事務一致。需要注意。

Tags: