­

生產者消費者模型在項目中的實際應用

  • 2019 年 10 月 27 日
  • 筆記

前言

  不知道大家有沒有遇到過這種情況,某個接口的響應時間會隨着請求量的變大而越來越慢,明明CPU已經馬力全開了怎麼還會越來越
慢。又或者是客戶端調用http接口,對於客戶端來說只是一個入庫操作就可以直接返回結果繼續處理了,而你還要比較尷尬的拿着傳過
來的數據做一堆比較耗時的操作,導致這個接口的整體吞吐量上不去。業務耦合、沒有控制業務的處理量導致cup狂飆線程池瘋狂阻塞
是造成問題的元兇,而生產者消費者模型則是這方面的專家了。
  生產者消費者的概念網上有很多,如果有不清楚的朋友可以在網上搜一下,這裡主要說一下生產者消費者模型的作用和在實際項目中
的具體應用。生產者消費者模型的作用有三個:
(1)解耦,這是生產者消費者模型附帶的作用,解耦意味着生產者和消費者之間的聯繫少,聯繫越少越可以獨自發展而不需要收到相互
的制約。
(2)異步,生產者只需要將消息放到隊列中,直接就可以進行其他的操作。而消費者則只負責從消息隊列中取出消息進行後續的邏輯處
理,當然在實際的場景中線程池也可以滿足異步並發的功能,這個也不算是主要作用。
(3)通過平衡生產者的生產能力和消費者的消費能力來提升整個系統的運行效率,這個才是生產者消費者模型最重要的作用。
  生產者消費者模型通過控制消息隊列的大小可以達到流量削峰的效果,可以在自己的系統內部達到一種MQ的效果。當然除了正常的
功能實現外如何保證消息不會丟一定會被百分百消費、在集群的生產環境中怎麼保證宕機的系統任務可以分配到其他健康的系統繼續
消費、整個系統在重啟時如何自動消費上次沒消費完的任務、數據庫的主從不同步會不會對整個模型的健壯性有所影響,這些都是我們
在實際的開發使用中需要考慮的問題。

生產者消費者模型的一種實現

  在開發之前我們先要結合自己項目的業務場景設計出一個臨時任務表,以保證任務的“安全”,然後在開始代碼的編寫,以下代碼里可
能會包含一些偽代碼,但是整體的實現步驟和一些容錯處理都會有所體現,下面看一下具體的實現:

  臨時任務表的大概結構:

@Data  public class TestTempMo {      /**       *       */      private Integer id;        /**       * 待處理業務的Id       */      private Integer logicId;        /**       * 本機ip       */      private String ip;        /**       * 是否塞入任務隊列       */      private Boolean isTask;        /**       * 創建時間       */      private Date createDate;    }  

 

  單例獲取阻塞隊列方法:

public class BlockingQueueUtils {        public static BlockingQueue<TestTempMo> testTaskQueue;        private BlockingQueueUtils() {        }        public BlockingQueue<TestTempMo> getInstance() {          if (Objects.isNull(testTaskQueue)) {              synchronized (this) {                  if (Objects.isNull(testTaskQueue)) {                      int cpuCores = Runtime.getRuntime().availableProcessors();                      testTaskQueue = new ArrayBlockingQueue<>(cpuCores * 10);                  }              }          }          return ocrScanTaskQueue;      }    }  

 

  任務生產者:

/**   * 每台機器只負責自己的任務(負載均衡由Nginx處理)   * Created by lcy on 2019/10/27.   */  @Service  public class TestProducer implements Runnable{        private static Logger LOG = LoggerFactory.getLogger(TestProducer.class);      /** 機器空閑時,定時掃描test_temp表的間隔 */      private static final long SCAN_PERIOD = 1000 * 10;      private BlockingQueue<TestTempMo> testTaskQueue;        @Resource      //臨時任務表的Mapper類      private TestTempMapper testTempMapper;      @Resource      //自定義SQL類      private SelectForMasterMapper selectForMasterMapper;      @Resource      //錯誤日誌記錄類      private LogRecord logRecord;      @Resource      private BlockingQueueUtils blockingQueueUtils;        @PostConstruct      public void init() {          try {              //初始化臨時表狀態(防止機器重啟時有未處理的任務處理不掉)              initTempTaskState();              testTaskQueue = blockingQueueUtils.getInstance();              new Thread(this, "ScanTempProducer").start();          } catch (Throwable e) {              LogUtils.error(LOG, "初始化test生產者線程異常", e);              throw new ExceptionInInitializerError(e);          }        }        @Override      @Transactional(rollbackFor = Throwable.class)      public void run() {          while(true) {              /** 是否還有未執行完的任務 */              boolean hasMoreTask = false;                long start = System.currentTimeMillis();                try {                  List<TestTempMo> taskTempMoList = produceTaskBatch();                    if(CollectionUtils.isNotEmpty(taskTempMoList)) {                      for (TestTempMo taskTempMo : taskTempMoList) {                          //將任務塞入阻塞隊列                          testTaskQueue.put(taskTempMo);                            //改變臨時表狀態,防止重複塞入任務隊列                          taskTempMo.setIsTask(true);                          testTempMapper.updateByPrimaryKeySelective(taskTempMo);                      }                        /** 分頁查詢結果不止一頁,則認為還有更多的任務(強制查詢主庫) */                      Double count = selectForMasterMapper.selectScanTempCount(ExternalOcrConstant.IP);                      if(count > 1) {                          hasMoreTask = true;                      }                  }                } catch (Throwable e) {                  LogUtils.error(LOG, "test生產者線程發生異常", e);                  //記錄錯誤日誌(自定義方法,將錯誤日誌入庫發送郵件方便及時處理問題)                  logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "TestTempProducer"+"#"+"run", "test系統", (int)(System.currentTimeMillis()-start), e);              }                /** 沒有更多的任務,則休眠一段時間 */              if(!hasMoreTask) {                  waitAMoment();              }          }        }        /**       * 分頁查詢未完成的臨時表信息(根據本機IP和狀態進行查詢)       * @return       */      private List<TestTempMo> produceTaskBatch() {          try {              //這裡使用自定義SQL強制查詢主庫,防止主從不一致(根據id或時間排序保證任務執行順序)              List<TestTempMo> testTempMos = selectForMasterMapper.selectScanTempByPage(ExternalOcrConstant.IP);                return testTempMos;          } catch (Throwable e) {              LogUtils.error(LOG, "獲取優先任務列表異常", e);              throw new BusinessException(TestStatusEnum.SYSTEM_ERROR);          }      }        private void waitAMoment() {          try {              Thread.sleep(SCAN_PERIOD);          } catch (InterruptedException e) {              LogUtils.error(LOG, "生產者線程休眠異常", e);          }      }        /**       * 初始化臨時表狀態(每台機器只負責自己的任務)       */      private void initTempTaskState(){          TestTempExample example = new TestTempExample();          example.createCriteria().andIpEqualTo(ExternalOcrConstant.IP).andIsTaskEqualTo(true);          List<TestTempMo> testTempMos = testTempMapper.selectByExample(example);          //存在遺留數據          if (CollectionUtils.isNotEmpty(testTempMos)){              for (TestTempMo testTempMo : testTempMos) {                  testTempMo.setIsTask(false);                  //將臨時表狀態改為初始狀態                  testTempMapper.updateByPrimaryKeySelective(testTempMo);              }          }      }    }  

 

  任務消費者:

/**   * Created by lcy on 2019/10/27.   */  @Service  public class TestTempConsumer implements Runnable{        private static Logger LOG = LoggerFactory.getLogger(TestTempConsumer.class);      private BlockingQueue<TestTempMo> testTaskQueue;          @Resource     //錯誤日誌記錄類      private LogRecord logRecord;      @Resource      private BlockingQueueUtils blockingQueueUtils;      @Resource      //自定義SQL類      private SelectForMasterMapper selectForMasterMapper;        @PostConstruct      public void init() {          testTaskQueue = blockingQueueUtils.getInstance();          new Thread(this, "TestConsumer").start();      }        @Override      public void run() {          while(true) {              //從阻塞隊列里取出任務(如果沒有任務這裡會阻塞)              TestTempMo taskTempMo = acquireTask();              //使用線程池多線程處理任務              ThreadPoolUtil.TestPool.execute(() -> {                  //具體的消費邏輯                  consume(taskTempMo);                });          }      }        /**       * 從阻塞隊列里取出任務       * @return       */      private TestTempMo acquireTask() {            try {              TestTempMo testTemp = testTaskQueue.take();              return testTemp;          } catch (InterruptedException e) {              /** 簡單記錄異常,無需做特殊處理  */              LogUtils.error(LOG, "從隊列中獲取test任務異常", e);          }            return null;      }        /**       * 消費邏輯(這裡的所有SQl都要強制查詢主庫否則會因為主從延遲而處理失敗)       * @param taskTempMo       */      private void consume(TestTempMo taskTempMo) {          TraceUtils.beginTrace();          long start = System.currentTimeMillis();            try {              LogUtils.info(LOG, "開始處理具體的邏輯");              //開始處理具體的邏輯...              System.out.println("處理完啦");          } catch (Throwable e) {              LogUtils.error(LOG, "處理具體邏輯時發生異常", e);                //記錄錯誤日誌              logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "TestTempConsumer"+"#"+"consume", "test系統,什麼數據:"+taskTempMo.getTestId(), (int)(System.currentTimeMillis()-start), e);          } finally {              try {                  //刪除任務表數據                  selectForMasterMapper.delScanTemp(taskTempMo.getId());              } catch (Throwable e) {                  LogUtils.error(LOG, "刪除任務表數據異常", e,"id",taskTempMo.getId());              }              LogUtils.info(LOG, "處理具體邏輯完成", "耗時(ms)", (System.currentTimeMillis() - start));              TraceUtils.endTrace();          }        }    }

  當然僅僅只有上邊這些代碼這個模型還是不夠可靠的,因為如果集群中某台機器宕機的話則該台機器上的所有未處理完成的任務都會“陷入僵局”因此這個時候就需要其他的兄弟進行“接盤”操作了,這裡是使用ZK進行處理的:

 

  ZK的操作類:

/**   * ZK的連接工具類   * Created by lcy on 2019/10/27.   */  @Component  public class ZooKeeperClient {        private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);      //dubbo服務地址      @Value("${dubbo.registry.addrss}")      private String hosts;      //本機環境地址      @Value("${dubbo.provider.group}")      private String env;      //超時時間      private final int SESSION_TIMEOUT = 5000;      //根節點      private final String ROOT_NODE = "/test";        private ZooKeeper zk;      private CountDownLatch latch = new CountDownLatch(1);      @Resource      //錯誤日誌記錄類      private LogRecord logRecord;        @PostConstruct      private void init() {          try {              //鏈接ZK              initZookeeperClient();          } catch (Exception e) {              LogUtils.error(LOG, "初始化ZooKeeperClient錯誤", e);              throw new ExceptionInInitializerError("初始化ZooKeeperClient錯誤");          }      }        /**       * 鏈接ZK       * @throws Exception       */      private synchronized void initZookeeperClient() throws Exception {          LogUtils.info(LOG, "初始化Zookeeper鏈接", "hosts", hosts);          zk = new ZooKeeper(hosts, SESSION_TIMEOUT, event -> {              LogUtils.info(LOG, "處理ZooKeeper事件", "State", event.getState(), "Type", event.getType());              if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {                  LogUtils.info(LOG, "連接建立");                  latch.countDown();              }          }          );          // 等待連接建立          latch.await();          LogUtils.info(LOG, "成功建立ZooKeeper連接");          //判斷根節點是否存在          if (Objects.isNull(zk.exists(ROOT_NODE, false))){              //創建一個持久節點              zk.create(ROOT_NODE,"IP_Statistic".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);          }            //判斷環境節點是否存在          String envNode = ROOT_NODE + "/" + env;          if (Objects.isNull(zk.exists(envNode, false))){              //創建環境節點              zk.create(envNode,("environment:" + env).getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);          }            //創建IP臨時節點          String childNode = envNode + "/" + IPConstant.IP;          String create = zk.create(childNode, ExternalOcrConstant.IP.getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);          LogUtils.info(LOG, "創建IP節點成功", "create", create);      }        /**       * 關閉資源       */      private void close() {          if (Objects.nonNull(zk)) {              try {                  zk.close();                  zk = null;              } catch (InterruptedException e) {                  LogUtils.error(LOG, "關閉ZK節點失敗", e, "path", ROOT_NODE);              }          }      }        /**       * 重連ZK       * @return       */      private synchronized boolean reConnect() {          long start = System.currentTimeMillis();          //關閉鏈接          close();          try {              Thread.sleep(1000);              initZookeeperClient();                return true;          } catch (Exception e) {              LogUtils.error(LOG, "重連ZooKeeper失敗", e);              //記錄錯誤日誌              recordErroLog(e,"reConnect",start);          }            return false;      }        /**       * 獲取活躍節點       * @return       */      public synchronized List<String> fetchActiveNode() {          long start = System.currentTimeMillis();          try {              List<String> activeNodeList = zk.getChildren(ROOT_NODE + "/" + env, false);                return activeNodeList;          } catch (Throwable e) {              LogUtils.error(LOG, "獲取ZK節點列表失敗", e, "path", ROOT_NODE);              //記錄錯誤日誌              recordErroLog(e,"fetchActiveNode",start);              //重連ZK              reConnect();                return Lists.newArrayList();          }      }        /**       * 記錄錯誤日誌       * @param e       * @param methodName       * @param start       */      public void recordErroLog(Throwable e, String methodName, Long start){          logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "ZooKeeperClient"+"#" + methodName, "test系統", (int)(System.currentTimeMillis()-start), e);      }    }  

   

  服務器健康檢測:

/**   * 服務器健康檢測和未處理的任務分配類   * Created by lcy on 2019/10/27.   */  @Component  public class CheckServerProcess implements Runnable{      private static final Logger LOG = LoggerFactory.getLogger(CheckServerProcess.class);      /** 檢查ZK健康狀況的間隔 */      private static final long CHECK_ZK = 1000 * 20;        @Resource      //臨時任務表Mapper類      private TestTempMapper testTempMapper;      @Resource      //錯誤日記記錄類      private LogRecord logRecord;      @Resource      //自定義SQL類      private SelectForMasterMapper selectForMasterMapper;      @Resource      //ZK的操作類      private ZooKeeperClient zooKeeperClient;        @PostConstruct      public void init() {          new Thread(this, "CheckServerProcess").start();      }        @Override      public void run() {          while(true) {              //檢查服務器的健康狀態,分配宕機的未完成任務              checkServerHealth();                waitAMoment();          }      }        /**       * 檢查服務器的健康狀態       */      public void checkServerHealth() {          long start = System.currentTimeMillis();          List<String> taskIpList=Lists.newArrayList();            try {              //查詢任務列表裡的全部Ip              taskIpList = selectForMasterMapper.selectIpForOcrScanTemp();              //當前沒有臨時任務              if (CollectionUtils.isEmpty(taskIpList)){                  return;              }                /** 從Zookeeper找到當前活動的機器 */              List<String> activeNodeList = zooKeeperClient.fetchActiveNode();              //活躍ip比任務ip數大於或等於則認為機器正常              if(activeNodeList.containsAll(taskIpList)) {                  return;              }                /** 全部IP去掉在線的IP,剩下的就是離線的IP */              taskIpList.removeAll(activeNodeList);              LogUtils.info(LOG, "存在離線機器", "serverIp", taskIpList);              //獲取離線機器的未完成任務              TestTempExample testTempExample =new TestTempExample();              testTempExample.createCriteria().andIpIn(taskIpList);              List<TestTempMo> unDealTestTemp = testTempMapper.selectByExample(testTempExample);              if(CollectionUtils.isEmpty(unDealOcrScanTemp)){                  //沒有未完成的處理任務                  return;              }                if (CollectionUtils.isNotEmpty(activeNodeList)){                  //平均分配未完成的任務                  List<TestTempMo> pendTestTempList = allotTask(unDealTestTemp, activeNodeList);                  //批量更新臨時表                  batchUpdateTemp(pendTestTempList);                  LogUtils.info(LOG, "分配未處理test任務結束","deadIp", taskIpList, "task:", pendTestTempList);              }else {                  LogUtils.error(LOG, "獲取ZK節點列表為空");              }            }catch (Exception e){              LogUtils.error(LOG, "分配未處理test任務失敗", e,"serverIpMos",taskIpList);              logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "CheckServerProcess"+"#"+"checkServerHealth", "test系統", (int)(System.currentTimeMillis()-start), e);          }      }        /**       * 平均分配未完成的任務       * @param unDealTestTemp       * @param activeNodeList       */      public static List<TestTempMo> allotTask(List<TestTempMo> unDealTestTemp, List<String> activeNodeList) {          List<TestTempMo> testTemp=Lists.newArrayList();          //每台機器分配的任務數(平均分配)          int taskCount = unDealTestTemp.size() / activeNodeList.size();          //分配個數奇偶判斷          int type = unDealTestTemp.size() % activeNodeList.size();          int count=0;            for (String ip : activeNodeList) {              Iterator<TestTempMo> it = unDealTestTemp.iterator();              while(it.hasNext()){                  TestTempMo testTempMo = it.next();                  testTempMo.setIp(ip);                  //初始化任務狀態                  testTempMo.setIsTask(false);                  testTemp.add(testTempMo);                  it.remove();                    count++;                  //如果任務數大於平均任務數任務數,則分配到下台機器機器                  if (type == 0){                      if (count == taskCount){                          count=0;                          break;                      }                  }else {                      if (count>taskCount){                          count=0;                          break;                      }                  }                }          }            return testTemp;      }        /**       * 批量更新臨時表數據       * @param unDealTestTemp       */      public void batchUpdateTemp(List<TestTempMo> unDealTestTemp){          for (TestTempMo testTempMo : unDealTestTemp) {              testTempMapper.updateByPrimaryKeySelective(testTempMo);          }      }        private void waitAMoment() {          try {              Thread.sleep(CHECK_ZK);          } catch (InterruptedException e) {              LogUtils.error(LOG, "生產者線程休眠異常", e);          }      }    }  

  以上就是生產者消費者模型的全部思路了,1024程序員節的時候在公眾號上看到了一句話突然心裏感覺很暖:

    我們在鍵盤上留下的餘溫,也將隨時代傳遞到更遠的將來。共勉!