ZooKeeper實現生產-消費者隊列

  • 2019 年 10 月 8 日
  • 筆記

目錄

對前續程式碼的重構

  1. 隊列的生產者
  2. 隊列的消費者
  3. 測試日誌
  4. 源程式碼

生產-消費者隊列,用於多節點的分散式數據結構,生產和消費數據。生產者創建一個數據對象,並放到隊列中;消費者從隊列中取出一個數據對象並進行處理。在ZooKeeper中,隊列可以使用一個容器節點下創建多個子節點來實現;創建子節點時,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper會自動在節點名稱後面添加唯一序列號。EPHEMERAL_SEQUENTIAL也有同樣的特點,區別在於會話結束後是否會自動刪除。

敲小黑板:*_SEQUENTIAL是ZooKeeper的一個很重要的特性,分散式鎖、選舉制度都依靠這個特性實現的。

1 對前續程式碼的重構

之前的文章,我們已經用實現了Watcher和Barrier,創建ZooKeeper連接的程式碼已經複製了一遍。後續還需要類似的工作,因此先對原有程式碼做一下重構,讓程式碼味道乾淨一點。

以下是 process(WatchedEvent)的程式碼

  final public void process(WatchedEvent event) {      if (Event.EventType.None.equals(event.getType())) {        // 連接狀態發生變化        if (Event.KeeperState.SyncConnected.equals(event.getState())) {          // 連接建立成功          connectedSemaphore.countDown();        }      } else if (Event.EventType.NodeCreated.equals(event.getType())) {        processNodeCreated(event);      } else if (Event.EventType.NodeDeleted.equals(event.getType())) {        processNodeDeleted(event);      } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {        processNodeDataChanged(event);      } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {        processNodeChildrenChanged(event);      }    }

以ZooKeeperBarrier為例,看看重構之後的構造函數和監聽Event的程式碼

ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)        throws IOException {      super(address);      this.tableSerial = createRootNode(tableSerial);      this.tableCapacity = tableCapacity;      this.customerName = customerName;    }    protected void processNodeChildrenChanged(WatchedEvent event) {      log.info("{} 接收到了通知 : {}", customerName, event.getType());      // 子節點有變化      synchronized (mutex) {        mutex.notify();      }    }

2 隊列的生產者

生產者的關鍵程式碼

String elementName = queueName + "/element";  ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;  CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;  getZooKeeper().create(elementName, value, ids, createMode);

注意,重點是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存儲直到有命令刪除,SEQUENTIAL表示自動在後面添加自增的唯一序列號。這樣,儘管elementName都一樣,但實際生成的zNode名字在 「element」後面會添加格式為%010d的10個數字,如0000000001。如一個完整的zNode名可能為/queue/element0000000021。

3 隊列的消費者

消費者嘗試從子節點列表獲取zNode名最小的一個子節點,如果隊列為空則等待NodeChildrenChanged事件。關鍵程式碼

/** 隊列的同步訊號 */    private static Integer queueMutex = Integer.valueOf(1);        @Override    protected void processNodeChildrenChanged(WatchedEvent event) {      synchronized (queueMutex) {        queueMutex.notify();      }    }        /**     * 從隊列中刪除第一個對象     *     * @return     * @throws KeeperException     * @throws InterruptedException     */    int consume() throws KeeperException, InterruptedException {      while (true) {        synchronized (queueMutex) {          List<String> list = getZooKeeper().getChildren(queueName, true);          if (list.size() == 0) {            queueMutex.wait();          } else {            // 獲取第一個子節點的名稱            String firstNodeName = getFirstElementName(list);            // 刪除節點,並返回節點的值            return deleteNodeAndReturnValue(firstNodeName);          }        }      }    }

4 測試日誌

把測試結果放源碼前面,免得大家被無聊的程式碼晃暈。

測試程式碼創建了兩個執行緒,一個執行緒是生產者,按隨機間隔往隊列中添加對象;一個執行緒是消費者,隨機間隔嘗試從隊列中取出第一個,如果當時隊列為空,會等到直到新的數據。

兩個進程都加上隨機間隔,是為了模擬生產可能比消費更快的情況。以下是測試日誌,為了更突出,生產和消費的日誌我增加了不同的文字樣式。

49:47.866 [INFO] ZooKeeperQueueTest.testQueue(29) 開始ZooKeeper隊列測試,本次將測試 10 個數據    49:48.076 [DEBUG] ZooKeeperQueue.log(201)    + Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]    |-- elapsed time                   [開始鏈接]   119.863 milliseconds.    |-- elapsed time           [等待連接成功的Event]    40.039 milliseconds.    |-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]   159.911 milliseconds.        49:48.082 [DEBUG] ZooKeeperQueue.log(201)    + Profiler [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]    |-- elapsed time                   [開始鏈接]   103.795 milliseconds.    |-- elapsed time           [等待連接成功的Event]    65.899 milliseconds.    |-- Total        [tech.codestory.zookeeper.queue.ZooKeeperQueue 連接到ZooKeeper]   170.263 milliseconds.        49:48.102 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 1 , 然後等待 1700 毫秒    49:48.134 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 1 , 然後等待 4000 毫秒    49:49.814 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 2 , 然後等待 900 毫秒    49:50.717 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 3 , 然後等待 1300 毫秒    49:52.020 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 4 , 然後等待 3700 毫秒    49:52.139 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 2 , 然後等待 2800 毫秒    49:54.947 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 3 , 然後等待 4500 毫秒    49:55.724 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 5 , 然後等待 3500 毫秒    49:59.228 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 6 , 然後等待 4200 毫秒    49:59.454 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 4 , 然後等待 2400 毫秒    50:01.870 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 5 , 然後等待 4900 毫秒    50:03.435 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 7 , 然後等待 4500 毫秒    50:06.776 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 6 , 然後等待 3600 毫秒    50:07.938 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 8 , 然後等待 1900 毫秒    50:09.846 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 9 , 然後等待 3200 毫秒    50:10.388 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 7 , 然後等待 2900 毫秒    50:13.051 [INFO] ZooKeeperQueueTest.run(51) 生產對象 : 10 , 然後等待 4900 毫秒    50:13.294 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 8 , 然後等待 300 毫秒    50:13.600 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 9 , 然後等待 4800 毫秒    50:18.407 [INFO] ZooKeeperQueueTest.run(80) 消費對象: 10 , 然後等待 2400 毫秒

5 完整源碼

5.1 ZooKeeperBase.java

  package tech.codestory.zookeeper;        import java.io.IOException;    import java.util.concurrent.CountDownLatch;    import org.apache.zookeeper.*;    import org.apache.zookeeper.data.Stat;    import org.slf4j.Logger;    import org.slf4j.LoggerFactory;    import org.slf4j.profiler.Profiler;        /**     * 為 ZooKeeper測試程式碼創建一個基類,封裝建立連接的過程     *     * @author code story     * @date 2019/8/16     */    public class ZooKeeperBase implements Watcher {      /** 日誌,不使用 @Slf4j ,是要使用子類的log */      Logger log = null;          /** 等待連接建立成功的訊號 */      private CountDownLatch connectedSemaphore = new CountDownLatch(1);      /** ZooKeeper 客戶端 */      private ZooKeeper zooKeeper = null;      /** 避免重複根節點 */      static Integer rootNodeInitial = Integer.valueOf(1);          /** 構造函數 */      public ZooKeeperBase(String address) throws IOException {        log = LoggerFactory.getLogger(getClass());            Profiler profiler = new Profiler(this.getClass().getName() + " 連接到ZooKeeper");        profiler.start("開始鏈接");        zooKeeper = new ZooKeeper(address, 3000, this);        try {          profiler.start("等待連接成功的Event");          connectedSemaphore.await();        } catch (InterruptedException e) {          log.error("InterruptedException", e);        }        profiler.stop();        profiler.setLogger(log);        profiler.log();      }          /**       * 創建測試需要的根節點       *       * @param rootNodeName       * @return       */      public String createRootNode(String rootNodeName) {        synchronized (rootNodeInitial) {          // 創建 tableSerial 的zNode          try {            Stat existsStat = getZooKeeper().exists(rootNodeName, false);            if (existsStat == null) {              rootNodeName = getZooKeeper().create(rootNodeName, new byte[0],                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            }          } catch (KeeperException e) {            log.error("KeeperException", e);          } catch (InterruptedException e) {            log.error("InterruptedException", e);          }        }        return rootNodeName;      }          /** 讀取ZooKeeper對象,供子類調用 */      protected ZooKeeper getZooKeeper() {        return zooKeeper;      }          @Override      final public void process(WatchedEvent event) {        if (Event.EventType.None.equals(event.getType())) {          // 連接狀態發生變化          if (Event.KeeperState.SyncConnected.equals(event.getState())) {            // 連接建立成功            connectedSemaphore.countDown();          }        } else if (Event.EventType.NodeCreated.equals(event.getType())) {          processNodeCreated(event);        } else if (Event.EventType.NodeDeleted.equals(event.getType())) {          processNodeDeleted(event);        } else if (Event.EventType.NodeDataChanged.equals(event.getType())) {          processNodeDataChanged(event);        } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {          processNodeChildrenChanged(event);        }      }          /**       * 處理事件: NodeCreated       *       * @param event       */      protected void processNodeCreated(WatchedEvent event) {}          /**       * 處理事件: NodeDeleted       *       * @param event       */      protected void processNodeDeleted(WatchedEvent event) {}          /**       * 處理事件: NodeDataChanged       *       * @param event       */      protected void processNodeDataChanged(WatchedEvent event) {}          /**       * 處理事件: NodeChildrenChanged       *       * @param event       */      protected void processNodeChildrenChanged(WatchedEvent event) {}    }

5.2 ZooKeeperQueue.java

  package tech.codestory.zookeeper.queue;        import java.io.IOException;    import java.nio.ByteBuffer;    import java.util.ArrayList;    import java.util.List;    import org.apache.zookeeper.CreateMode;    import org.apache.zookeeper.KeeperException;    import org.apache.zookeeper.WatchedEvent;    import org.apache.zookeeper.ZooDefs;    import org.apache.zookeeper.data.ACL;    import org.apache.zookeeper.data.Stat;    import lombok.extern.slf4j.Slf4j;    import tech.codestory.zookeeper.ZooKeeperBase;        /**     * ZooKeeper實現Queue     *     * @author code story     * @date 2019/8/16     */    @Slf4j    public class ZooKeeperQueue extends ZooKeeperBase {      /** 隊列名稱 */      private String queueName;          /** 隊列的同步訊號 */      private static Integer queueMutex = Integer.valueOf(1);          /**       * 構造函數       *       * @param address       * @param queueName       * @throws IOException       */      public ZooKeeperQueue(String address, String queueName) throws IOException {        super(address);            this.queueName = createRootNode(queueName);      }          @Override      protected void processNodeChildrenChanged(WatchedEvent event) {        synchronized (queueMutex) {          queueMutex.notify();        }      }          /**       * 將對象添加到隊列中       *       * @param i       * @return       */      boolean produce(int i) throws KeeperException, InterruptedException {        ByteBuffer b = ByteBuffer.allocate(4);        byte[] value;            // Add child with value i        b.putInt(i);        value = b.array();        String elementName = queueName + "/element";        ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;        CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;        getZooKeeper().create(elementName, value, ids, createMode);            return true;      }          /**       * 從隊列中刪除第一個對象       *       * @return       * @throws KeeperException       * @throws InterruptedException       */      int consume() throws KeeperException, InterruptedException {        while (true) {          synchronized (queueMutex) {            List<String> list = getZooKeeper().getChildren(queueName, true);            if (list.size() == 0) {              queueMutex.wait();            } else {              // 獲取第一個子節點的名稱              String firstNodeName = getFirstElementName(list);              // 刪除節點,並返回節點的值              return deleteNodeAndReturnValue(firstNodeName);            }          }        }      }          /**       * 獲取第一個子節點的名稱       *       * @param list       * @return       */      private String getFirstElementName(List<String> list) {        Integer min = Integer.MAX_VALUE;        String minNode = null;        for (String s : list) {          Integer tempValue = Integer.valueOf(s.substring(7));          if (tempValue < min) {            min = tempValue;            minNode = s;          }        }        return minNode;      }          /**       * 刪除節點,並返回節點的值       *       * @param minNode       * @return       * @throws KeeperException       * @throws InterruptedException       */      private int deleteNodeAndReturnValue(String minNode)          throws KeeperException, InterruptedException {        String fullNodeName = queueName + "/" + minNode;        Stat stat = new Stat();        byte[] b = getZooKeeper().getData(fullNodeName, false, stat);        getZooKeeper().delete(fullNodeName, stat.getVersion());        ByteBuffer buffer = ByteBuffer.wrap(b);        return buffer.getInt();      }    }

5.3 ZooKeeperQueueTest.java

package tech.codestory.zookeeper.queue;        import java.io.IOException;    import java.security.SecureRandom;    import java.util.Random;    import java.util.concurrent.CountDownLatch;    import org.apache.zookeeper.KeeperException;    import org.testng.annotations.Test;    import lombok.extern.slf4j.Slf4j;        /**     * ZooKeeperQueue測試     *     * @author code story     * @date 2019/8/16     */    @Slf4j    public class ZooKeeperQueueTest {      final String address = "192.168.5.128:2181";      final String queueName = "/queue";      final Random random = new SecureRandom();      // 隨機生成10-20之間的個數      final int count = 10 + random.nextInt(10);      /** 等待生產者和消費者執行緒都結束 */      private CountDownLatch connectedSemaphore = new CountDownLatch(2);          @Test      public void testQueue() {        log.info("開始ZooKeeper隊列測試,本次將測試 {} 個數據", count);        new QueueProducer().start();        new QueueConsumer().start();        try {          connectedSemaphore.await();        } catch (InterruptedException e) {          log.error("InterruptedException", e);        }      }          /**       * 隊列的生產者       */      class QueueProducer extends Thread {        @Override        public void run() {          try {            ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName);            for (int i = 0; i < count; i++) {              int elementValue = i + 1;                  long waitTime = random.nextInt(50) * 100;              log.info("生產對象 : {} , 然後等待 {} 毫秒", elementValue, waitTime);              queue.produce(elementValue);              Thread.sleep(waitTime);            }          } catch (IOException e) {            log.error("IOException", e);          } catch (InterruptedException e) {            log.error("InterruptedException", e);          } catch (KeeperException e) {            log.error("KeeperException", e);          }          connectedSemaphore.countDown();        }      }          /**       * 隊列的消費者       */      class QueueConsumer extends Thread {        @Override        public void run() {          try {            ZooKeeperQueue queue = new ZooKeeperQueue(address, queueName);                for (int i = 0; i < count; i++) {              try {                int elementValue = queue.consume();                    long waitTime = random.nextInt(50) * 100;                log.info("消費對象: {} , 然後等待 {} 毫秒", elementValue, waitTime);                Thread.sleep(waitTime);              } catch (KeeperException e) {                i--;                log.error("KeeperException", e);              } catch (InterruptedException e) {                log.error("InterruptedException", e);              }            }            connectedSemaphore.countDown();          } catch (IOException e) {            log.error("IOException", e);          }        }      }    }