Redis (11): An Analysis of the Sentinel Mode Architecture

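In the early days of a business, a single Redis instance may well meet the requirements, but that does not stay true forever.

Three problems come up: a single node's read/write capacity, a single node's availability, and a single node's data safety. Many internet applications run into them. Each has established approaches, and there are many of them.

Sentinel is one of Redis's solutions to the high-availability problem. Let's look at how Redis implements it. Note that this approach is offered only as a reference for ideas, not as a standard solution.

The master-slave replication covered earlier already solves the data-safety problem to a degree: we have backup copies of the data and can separate reads from writes. Availability, however, is still unsolved. When the master crashes or fails in some other way, the whole write service becomes unavailable. The remedy is manual: either restart the master to restore service, or switch the master role over to one of the slaves.

If keeping the service available requires human intervention, it can hardly be called high availability. We need an automatic mechanism, and that is Sentinel mode.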

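I. Introduction to the Sentinel System

The core problem the Sentinel system addresses is, naturally, high availability. How it solves that problem is a matter of design. What it ultimately presents to users is a set of functional units, that is, the capabilities it provides:

- Monitoring: Sentinel continuously checks whether your master and slave servers are working as expected.
- Notification: when a monitored Redis server has a problem, Sentinel can notify administrators or other applications through an API.
- Automatic failover: when a master stops working properly, Sentinel starts an automatic failover. It promotes one of the failed master's slaves to be the new master and makes the failed master's other slaves replicate from the new master.
- Configuration provider: Sentinel acts as the source of authority for client service discovery. Clients connect to Sentinels to ask for the address of the current Redis master responsible for a given service. If a failover occurs, Sentinels report the new address. (This is also the clients' entry point.)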

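The architecture of the Sentinel system is shown below.

(1) Server-side architecture

[Figure: server-side architecture of the Sentinel system]

(2) Request-processing flow

[Figure: request-processing flow]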
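II. Steps to Set Up the Sentinel System

Sentinel can run on the same machines as the Redis servers or on separate machines.

1. Have several running Redis master/slave instances.

   For setting up the master/slave instances and the slaveof setting, see the earlier article on master-slave configuration.

2. Write the Sentinel configuration file: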

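```
# Example sentinel.conf

# Port the Sentinel service listens on
port 26379

# When Sentinel runs behind port mapping, announce this ip:port instead
# sentinel announce-ip <ip>
# sentinel announce-port <port>

# Working directory
dir /tmp

# Redis master to monitor; several masters can be monitored, each with a distinct master-name.
# quorum: number of Sentinels that must agree the master is unreachable before it is marked objectively down
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 2

# Password of the master/slaves; every instance in the same master/slave group must use the same password
# sentinel auth-pass <master-name> <password>

# How many milliseconds the master must be unreachable before it is considered down (default 30 s)
sentinel down-after-milliseconds mymaster 30000

# sentinel parallel-syncs <master-name> <numslaves>
# Number of slaves that may resync with the new master at the same time during a failover
sentinel parallel-syncs mymaster 1

# Failover timeout (default 3 minutes)
sentinel failover-timeout mymaster 180000

# Notification script, called for warning-level Sentinel events such as a failover.
# It receives two arguments: eventType, eventDescription
# sentinel notification-script mymaster /var/redis/notify.sh

# Script called when the master changes.
# It receives the following arguments:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
```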
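The following simplified Java model makes the roles of `down-after-milliseconds` and `quorum` more concrete. It is a sketch, not Sentinel's actual code: the real logic lives in the Redis C source and checks more conditions. `SentinelThresholds` and its methods are hypothetical names.

The rules it encodes are:

- A Sentinel marks the master subjectively down (SDOWN) once it has had no valid reply for longer than `down-after-milliseconds`.
- The master becomes objectively down (ODOWN) when at least `quorum` Sentinels agree.
- The Sentinel that runs the failover must also be voted leader by a majority of all Sentinels, and by at least `quorum` of them if `quorum` is larger.

```java
import java.util.List;

// Simplified, hypothetical model of the failure-detection thresholds in sentinel.conf.
final class SentinelThresholds {
    private SentinelThresholds() {}

    // SDOWN: no valid reply for longer than down-after-milliseconds.
    static boolean isSubjectivelyDown(long lastValidReplyMillis, long nowMillis,
                                      long downAfterMillis) {
        return nowMillis - lastValidReplyMillis > downAfterMillis;
    }

    // ODOWN: at least `quorum` Sentinels (this one included) report the master as down.
    static boolean isObjectivelyDown(List<Boolean> sdownReports, int quorum) {
        long agreeing = sdownReports.stream().filter(Boolean::booleanValue).count();
        return agreeing >= quorum;
    }

    // Failover leader: needs votes from a majority of all Sentinels and at least `quorum` votes.
    static boolean canLeadFailover(int votes, int totalSentinels, int quorum) {
        int majority = totalSentinels / 2 + 1;
        return votes >= majority && votes >= quorum;
    }

    public static void main(String[] args) {
        // Numbers from the example config: down-after-milliseconds 30000, quorum 2, three Sentinels.
        System.out.println(isSubjectivelyDown(0, 31_000, 30_000));
        System.out.println(isObjectivelyDown(List.of(true, true, false), 2));
        System.out.println(canLeadFailover(2, 3, 2));
    }
}
```

With three Sentinels and a quorum of 2, two agreeing Sentinels are enough both to declare ODOWN and to elect a failover leader. Lowering the quorum to 1 makes detection more eager, but it does not lower the majority needed to actually fail over.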

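3. Start the Sentinel nodes:

```
# Start with the redis-sentinel binary; it may not be present, in which case you need to build it yourself
redis-sentinel /path/to/sentinel.conf

# Start with redis-server, which is always available
# For testing, you can add --protected-mode no to allow access without a password
redis-server /path/to/sentinel.conf --sentinel
```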

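4. Verify that Sentinel is running

Connect to the Sentinel service with redis-cli, then run the remaining commands in that session:

```
redis-cli -p 26379                          # connect to Sentinel
info sentinel                               # show Sentinel information
SENTINEL slaves mymaster                    # show the slaves of mymaster
SENTINEL sentinels mymaster                 # list the other Sentinels monitoring mymaster
SENTINEL get-master-addr-by-name mymaster   # get the master's address
```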

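5. Simulate a failure

Shut down the master node and wait a while: at least `down-after-milliseconds`, plus the time the failover itself takes. Then query the master address again. The master has been switched: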

```
SENTINEL get-master-addr-by-name mymaster    # get the current master address
```
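Which slave gets promoted in such a failover? According to the Redis Sentinel documentation, Sentinel first discards unsuitable replicas: disconnected or down ones, and those with `replica-priority` 0, which are never promoted. Among the rest it prefers:

1. the lowest `replica-priority`,
2. then the largest processed replication offset,
3. then the lexicographically smallest run ID.

The Java sketch below encodes just that ordering. The `Replica` record and its `healthy` flag are hypothetical simplifications of the real eligibility checks.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the replica ordering Sentinel uses when promoting a new master.
final class ReplicaSelector {
    private ReplicaSelector() {}

    // Illustrative replica descriptor; `healthy` stands in for Sentinel's real eligibility checks.
    record Replica(String runId, String addr, boolean healthy, int priority, long replOffset) {}

    private static final Comparator<Replica> PROMOTION_ORDER =
            Comparator.comparingInt(Replica::priority)                                    // lower priority wins
                    .thenComparing(Comparator.comparingLong(Replica::replOffset).reversed()) // more data wins
                    .thenComparing(Replica::runId);                                        // smaller run ID breaks ties

    static Optional<Replica> pick(List<Replica> replicas) {
        return replicas.stream()
                .filter(Replica::healthy)          // skip disconnected or down replicas
                .filter(r -> r.priority() != 0)    // priority 0: never promote
                .min(PROMOTION_ORDER);
    }

    public static void main(String[] args) {
        List<Replica> replicas = List.of(
                new Replica("b", "127.0.0.1:6380", true, 100, 500),
                new Replica("c", "127.0.0.1:6381", true, 100, 900),
                new Replica("d", "127.0.0.1:6382", true, 0, 9999));
        System.out.println(pick(replicas).map(Replica::addr).orElse("no candidate"));
    }
}
```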

 

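III. Using the Sentinel System from a Client

Once the Sentinel system is set up, it can serve clients. How? The two architecture diagrams at the beginning show that Sentinel acts more or less as a configuration center. It only provides master/slave information to clients and never performs reads or writes on behalf of the Redis instances.

Sentinel mode therefore requires more work from the client. Instead of connecting to Redis directly, the client first obtains the information from Sentinel and then connects. It must also keep track of any changes to that information.

Of course, this work is usually left to the SDK, and implementations follow much the same principle. Let's use Jedis as an entry point and look in detail at how a client uses Sentinel.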

1. Add the Maven (pom) dependency

```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
```

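2. Unit test

```java
import java.util.HashSet;
import java.util.Set;

import org.junit.Test;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

public class RedisSentinelTest {

    @Test
    public void testSentinel() throws Exception {
        String masterName = "mymaster";
        // Only the Sentinel addresses are configured; the real Redis instance address is provided by Sentinel
        Set<String> sentinels = new HashSet<>();
        sentinels.add("127.0.0.1:26379");
        sentinels.add("127.0.0.1:26378");
        sentinels.add("127.0.0.1:26377");

        JedisSentinelPool pool = new JedisSentinelPool(masterName, sentinels);
        // try-with-resources returns the borrowed connection to the pool; the pool is closed in finally
        try (Jedis jedis = pool.getResource()) {
            String key = "key1";
            String value = "Value1";
            // A set/get round trip checks that the Sentinel setup works
            jedis.set(key, value);
            System.out.println("set a value to Redis over. " + key + "->" + value);
            value = jedis.get(key);
            System.out.println("get a value from Redis over. " + key + "->" + value);
        } finally {
            pool.close();
        }
    }
}
```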

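3. How Sentinel is handled under the hood

The Jedis SDK wraps Sentinel so that using it looks almost the same as talking to a plain Redis instance. To understand what actually happens, we need to look at its internals.

First, when JedisSentinelPool is initialized, it walks through the configured Sentinel list and connects to the first one that can supply the master address: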

```java
// redis.clients.jedis.JedisSentinelPool#JedisSentinelPool
public JedisSentinelPool(String masterName, Set<String> sentinels) {
  this(masterName, sentinels, new GenericObjectPoolConfig(), Protocol.DEFAULT_TIMEOUT, null,
      Protocol.DEFAULT_DATABASE);
}

public JedisSentinelPool(String masterName, Set<String> sentinels,
    final GenericObjectPoolConfig poolConfig, int timeout, final String password,
    final int database) {
  this(masterName, sentinels, poolConfig, timeout, timeout, password, database);
}

public JedisSentinelPool(String masterName, Set<String> sentinels,
    final GenericObjectPoolConfig poolConfig, final int timeout, final int soTimeout,
    final String password, final int database) {
  this(masterName, sentinels, poolConfig, timeout, soTimeout, password, database, null);
}

public JedisSentinelPool(String masterName, Set<String> sentinels,
    final GenericObjectPoolConfig poolConfig, final int connectionTimeout, final int soTimeout,
    final String password, final int database, final String clientName) {
  this.poolConfig = poolConfig;
  this.connectionTimeout = connectionTimeout;
  this.soTimeout = soTimeout;
  this.password = password;
  this.database = database;
  this.clientName = clientName;

  // Key step: fetch the master address from Sentinel
  HostAndPort master = initSentinels(sentinels, masterName);
  // Initialize the connection pool (not the focus of this article)
  initPool(master);
}

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

  HostAndPort master = null;
  boolean sentinelAvailable = false;

  log.info("Trying to find master from available Sentinels...");

  // Walk through the sentinels one by one until a usable one is found
  for (String sentinel : sentinels) {
    final HostAndPort hap = HostAndPort.parseString(sentinel);

    log.fine("Connecting to Sentinel " + hap);

    Jedis jedis = null;
    try {
      jedis = new Jedis(hap.getHost(), hap.getPort());

      // Send SENTINEL get-master-addr-by-name mymaster to get the master address
      List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

      // connected to sentinel...
      sentinelAvailable = true;

      if (masterAddr == null || masterAddr.size() != 2) {
        log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap
            + ".");
        continue;
      }

      master = toHostAndPort(masterAddr);
      log.fine("Found Redis master at " + master);
      break;
    } catch (JedisException e) {
      // resolves #1036, it should handle JedisException there's another chance
      // of raising JedisDataException
      log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e
          + ". Trying next one.");
    } finally {
      if (jedis != null) {
        jedis.close();
      }
    }
  }

  if (master == null) {
    if (sentinelAvailable) {
      // can connect to sentinel, but master name seems to not
      // monitored
      throw new JedisException("Can connect to sentinel, but " + masterName
          + " seems to be not monitored...");
    } else {
      throw new JedisConnectionException("All sentinels down, cannot determine where is "
          + masterName + " master is running...");
    }
  }

  log.info("Redis master running at " + master + ", starting Sentinel listeners...");

  // Start one listener thread per sentinel, subscribed to its +switch-master messages;
  // when the master changes, the connection pool is re-initialized
  for (String sentinel : sentinels) {
    final HostAndPort hap = HostAndPort.parseString(sentinel);
    MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
    // whether MasterListener threads are alive or not, process can be stopped
    masterListener.setDaemon(true);
    masterListeners.add(masterListener);
    masterListener.start();
  }

  return master;
}

// Each sentinel listener thread runs the following loop
// redis.clients.jedis.JedisSentinelPool.MasterListener#run
@Override
public void run() {

  running.set(true);

  while (running.get()) {

    j = new Jedis(host, port);

    try {
      // double check that it is not being shutdown
      if (!running.get()) {
        break;
      }

      // SUBSCRIBE +switch-master
      j.subscribe(new JedisPubSub() {
        @Override
        public void onMessage(String channel, String message) {
          log.fine("Sentinel " + host + ":" + port + " published: " + message + ".");

          String[] switchMasterMsg = message.split(" ");

          // Message format: <master-name> <old-ip> <old-port> <new-ip> <new-port>
          if (switchMasterMsg.length > 3) {

            if (masterName.equals(switchMasterMsg[0])) {
              initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4])));
            } else {
              log.fine("Ignoring message on +switch-master for master name "
                  + switchMasterMsg[0] + ", our master name is " + masterName);
            }

          } else {
            log.severe("Invalid message received on Sentinel " + host + ":" + port
                + " on channel +switch-master: " + message);
          }
        }
      }, "+switch-master");

    } catch (JedisConnectionException e) {

      if (running.get()) {
        log.log(Level.SEVERE, "Lost connection to Sentinel at " + host + ":" + port
            + ". Sleeping 5000ms and retrying.", e);
        try {
          Thread.sleep(subscribeRetryWaitTimeMillis);
        } catch (InterruptedException e1) {
          log.log(Level.SEVERE, "Sleep interrupted: ", e1);
        }
      } else {
        log.fine("Unsubscribing from Sentinel at " + host + ":" + port);
      }
    } finally {
      j.close();
    }
  }
}
```
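The listener above splits each `+switch-master` message on spaces. The message carries five fields, `<master-name> <old-ip> <old-port> <new-ip> <new-port>`, so a slightly stricter parser can be sketched as follows.

This is not Jedis code. `SwitchMasterMessage` is a hypothetical helper. It requires exactly five fields, whereas the quoted listener only checks `length > 3` before reading index 4.

```java
import java.util.Optional;

// Hypothetical helper: parses "<master-name> <old-ip> <old-port> <new-ip> <new-port>".
final class SwitchMasterMessage {
    final String masterName;
    final String newHost;
    final int newPort;

    private SwitchMasterMessage(String masterName, String newHost, int newPort) {
        this.masterName = masterName;
        this.newHost = newHost;
        this.newPort = newPort;
    }

    // Returns empty for malformed messages or messages about a different master.
    static Optional<SwitchMasterMessage> parse(String message, String expectedMaster) {
        String[] parts = message.trim().split("\\s+");
        if (parts.length != 5 || !parts[0].equals(expectedMaster)) {
            return Optional.empty();
        }
        try {
            return Optional.of(new SwitchMasterMessage(parts[0], parts[3], Integer.parseInt(parts[4])));
        } catch (NumberFormatException badPort) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        parse("mymaster 127.0.0.1 6379 127.0.0.1 6380", "mymaster")
                .ifPresent(m -> System.out.println("new master " + m.newHost + ":" + m.newPort));
    }
}
```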

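This flow shows how the client handles the relationship between Sentinel and Redis. In short, it runs `SENTINEL get-master-addr-by-name xxx` to obtain the master address and then connects to it. When the master changes, the client learns about it through its pub/sub subscription to Sentinel and resets the connection pool.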
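The flow just summarized can be modeled without any networking. In this sketch, the hypothetical `query` function stands in for `SENTINEL get-master-addr-by-name`. It returns `[host, port]`, returns `null` when the Sentinel does not know the master name, or throws when the Sentinel is unreachable.

`SentinelDiscovery` and its methods are illustrative names, not Jedis API. The two error messages mirror the two failure cases in `initSentinels`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical, network-free model of the Sentinel client flow.
final class SentinelDiscovery {
    // Current master as "host:port"; set by discover(), updated by onSwitchMaster().
    private final AtomicReference<String> currentMaster = new AtomicReference<>();

    // `query` stands in for SENTINEL get-master-addr-by-name against one Sentinel:
    // returns [host, port], null if the name is unknown, or throws if unreachable.
    String discover(List<String> sentinels, Function<String, List<String>> query) {
        boolean anySentinelReachable = false;
        for (String sentinel : sentinels) {
            List<String> addr;
            try {
                addr = query.apply(sentinel);
            } catch (RuntimeException unreachable) {
                continue; // try the next Sentinel
            }
            anySentinelReachable = true;
            if (addr == null || addr.size() != 2) {
                continue; // this Sentinel does not monitor the master name
            }
            String master = addr.get(0) + ":" + addr.get(1);
            currentMaster.set(master);
            return master;
        }
        throw new IllegalStateException(anySentinelReachable
                ? "Sentinels reachable, but the master name is not monitored"
                : "All Sentinels down");
    }

    // Called for a +switch-master notification; true means the pool must be reset.
    boolean onSwitchMaster(String newMaster) {
        return !newMaster.equals(currentMaster.getAndSet(newMaster));
    }

    String currentMaster() {
        return currentMaster.get();
    }

    public static void main(String[] args) {
        SentinelDiscovery d = new SentinelDiscovery();
        String master = d.discover(List.of("127.0.0.1:26379"), s -> List.of("127.0.0.1", "6379"));
        System.out.println("master at " + master + ", reset needed: " + d.onSwitchMaster("127.0.0.1:6380"));
    }
}
```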

How is the connection pool itself handled? Let's take a quick look:

```java
// redis.clients.jedis.JedisSentinelPool#initPool
private void initPool(HostAndPort master) {
  if (!master.equals(currentHostMaster)) {
    currentHostMaster = master;
    if (factory == null) {
      factory = new JedisFactory(master.getHost(), master.getPort(), connectionTimeout,
          soTimeout, password, database, clientName, false, null, null, null);
      initPool(poolConfig, factory);
    } else {
      factory.setHostAndPort(currentHostMaster);
      // although we clear the pool, we still have to check the
      // returned object
      // in getResource, this call only clears idle instances, not
      // borrowed instances
      internalPool.clear();
    }

    log.info("Created JedisPool to master at " + master);
  }
}

// redis.clients.util.Pool#initPool
public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {

  if (this.internalPool != null) {
    try {
      closeInternalPool();
    } catch (Exception e) {
    }
  }

  this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
}
```
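The comment in `initPool` matters: `internalPool.clear()` only drops idle connections, so connections that were already borrowed keep pointing at the old master. The hypothetical `SwitchablePool` below sketches that behavior, treating a connection as nothing more than the address it was opened against.

- Switching the target clears the idle queue and sends new connections to the new master.
- A connection still on loan is left untouched.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the reset in JedisSentinelPool#initPool. A "connection" is just
// the master address it was opened against plus a serial number.
final class SwitchablePool {
    record Conn(String address, int id) {}

    private final Deque<Conn> idle = new ArrayDeque<>();
    private String target;   // what new connections go to (cf. factory.setHostAndPort)
    private int serial;

    SwitchablePool(String initialMaster) {
        this.target = initialMaster;
    }

    // Acts only if the master really changed; clears idle connections, not borrowed ones.
    synchronized boolean switchMaster(String newMaster) {
        if (newMaster.equals(target)) {
            return false;
        }
        target = newMaster;
        idle.clear();
        return true;
    }

    // Reuse an idle connection if there is one, otherwise "open" a new one to the target.
    synchronized Conn borrow() {
        Conn c = idle.pollFirst();
        return c != null ? c : new Conn(target, ++serial);
    }

    synchronized void giveBack(Conn c) {
        idle.addFirst(c);
    }

    synchronized int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        SwitchablePool pool = new SwitchablePool("127.0.0.1:6379");
        Conn onLoan = pool.borrow();
        pool.switchMaster("127.0.0.1:6380");
        System.out.println("on loan: " + onLoan.address() + ", next: " + pool.borrow().address());
    }
}
```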

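To write data to Redis, the client first borrows a connection instance from the pool. Pooling relies on the generic GenericObjectPool from Apache Commons Pool, which calls JedisFactory's makeObject() method to create connections: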

```java
// redis.clients.jedis.JedisSentinelPool#getResource
@Override
public Jedis getResource() {
  while (true) {
    // Borrow an instance via the parent class
    Jedis jedis = super.getResource();
    jedis.setDataSource(this);

    // get a reference because it can change concurrently
    final HostAndPort master = currentHostMaster;
    final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient()
        .getPort());

    // Compare host:port; if it matches the current master, the connection is correct: return it
    if (master.equals(connection)) {
      // connected to the correct master
      return jedis;
    }
    // Otherwise the master has switched: release this connection and try again
    else {
      returnBrokenResource(jedis);
    }
  }
}

// redis.clients.util.Pool#getResource
public T getResource() {
  try {
    return internalPool.borrowObject();
  } catch (NoSuchElementException nse) {
    throw new JedisException("Could not get a resource from the pool", nse);
  } catch (Exception e) {
    throw new JedisConnectionException("Could not get a resource from the pool", e);
  }
}

// org.apache.commons.pool2.impl.GenericObjectPool#borrowObject()
@Override
public T borrowObject() throws Exception {
    return borrowObject(getMaxWaitMillis());
}

// org.apache.commons.pool2.impl.GenericObjectPool#borrowObject(long)
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
    assertOpen();

    final AbandonedConfig ac = this.abandonedConfig;
    if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
            (getNumIdle() < 2) &&
            (getNumActive() > getMaxTotal() - 3) ) {
        removeAbandoned(ac);
    }

    PooledObject<T> p = null;

    // Get local copy of current config so it is consistent for entire
    // method execution
    final boolean blockWhenExhausted = getBlockWhenExhausted();

    boolean create;
    final long waitTime = System.currentTimeMillis();

    while (p == null) {
        create = false;
        p = idleObjects.pollFirst();
        if (p == null) {
            // No idle connection available: try to create one
            p = create();
            if (p != null) {
                create = true;
            }
        }
        if (blockWhenExhausted) {
            if (p == null) {
                if (borrowMaxWaitMillis < 0) {
                    p = idleObjects.takeFirst();
                } else {
                    p = idleObjects.pollFirst(borrowMaxWaitMillis,
                            TimeUnit.MILLISECONDS);
                }
            }
            if (p == null) {
                throw new NoSuchElementException(
                        "Timeout waiting for idle object");
            }
        } else {
            if (p == null) {
                throw new NoSuchElementException("Pool exhausted");
            }
        }
        if (!p.allocate()) {
            p = null;
        }

        if (p != null) {
            try {
                // Activate the object (for Jedis: make sure the configured database is selected)
                factory.activateObject(p);
            } catch (final Exception e) {
                try {
                    destroy(p);
                } catch (final Exception e1) {
                    // Ignore - activation failure is more important
                }
                p = null;
                if (create) {
                    final NoSuchElementException nsee = new NoSuchElementException(
                            "Unable to activate object");
                    nsee.initCause(e);
                    throw nsee;
                }
            }
            if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                boolean validate = false;
                Throwable validationThrowable = null;
                try {
                    validate = factory.validateObject(p);
                } catch (final Throwable t) {
                    PoolUtils.checkRethrow(t);
                    validationThrowable = t;
                }
                if (!validate) {
                    try {
                        destroy(p);
                        destroyedByBorrowValidationCount.incrementAndGet();
                    } catch (final Exception e) {
                        // Ignore - validation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to validate object");
                        nsee.initCause(validationThrowable);
                        throw nsee;
                    }
                }
            }
        }
    }

    updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

    return p.getObject();
}

/**
 * Attempts to create a new wrapped pooled object.
 * <p>
 * If there are {@link #getMaxTotal()} objects already in circulation
 * or in process of being created, this method returns null.
 *
 * @return The new wrapped pooled object
 *
 * @throws Exception if the object factory's {@code makeObject} fails
 */
private PooledObject<T> create() throws Exception {
    int localMaxTotal = getMaxTotal();
    // This simplifies the code later in this method
    if (localMaxTotal < 0) {
        localMaxTotal = Integer.MAX_VALUE;
    }

    // Flag that indicates if create should:
    // - TRUE:  call the factory to create an object
    // - FALSE: return null
    // - null:  loop and re-test the condition that determines whether to
    //          call the factory
    Boolean create = null;
    while (create == null) {
        synchronized (makeObjectCountLock) {
            final long newCreateCount = createCount.incrementAndGet();
            if (newCreateCount > localMaxTotal) {
                // The pool is currently at capacity or in the process of
                // making enough new objects to take it to capacity.
                createCount.decrementAndGet();
                if (makeObjectCount == 0) {
                    // There are no makeObject() calls in progress so the
                    // pool is at capacity. Do not attempt to create a new
                    // object. Return and wait for an object to be returned
                    create = Boolean.FALSE;
                } else {
                    // There are makeObject() calls in progress that might
                    // bring the pool to capacity. Those calls might also
                    // fail so wait until they complete and then re-test if
                    // the pool is at capacity or not.
                    makeObjectCountLock.wait();
                }
            } else {
                // The pool is not at capacity. Create a new object.
                makeObjectCount++;
                create = Boolean.TRUE;
            }
        }
    }

    if (!create.booleanValue()) {
        return null;
    }

    final PooledObject<T> p;
    try {
        // Delegate to the configured factory's makeObject()
        p = factory.makeObject();
    } catch (final Exception e) {
        createCount.decrementAndGet();
        throw e;
    } finally {
        synchronized (makeObjectCountLock) {
            makeObjectCount--;
            makeObjectCountLock.notifyAll();
        }
    }

    final AbandonedConfig ac = this.abandonedConfig;
    if (ac != null && ac.getLogAbandoned()) {
        p.setLogAbandoned(true);
    }

    createdCount.incrementAndGet();
    allObjects.put(new IdentityWrapper<T>(p.getObject()), p);
    return p;
}

// JedisFactory creates a connection to the master
// redis.clients.jedis.JedisFactory#makeObject
@Override
public PooledObject<Jedis> makeObject() throws Exception {
  final HostAndPort hostAndPort = this.hostAndPort.get();
  final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
      soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

  try {
    jedis.connect();
    // If a password is configured, run AUTH xxx
    // (Redis config: requirepass xxx)
    if (null != this.password) {
      jedis.auth(this.password);
    }
    if (database != 0) {
      jedis.select(database);
    }
    if (clientName != null) {
      jedis.clientSetname(clientName);
    }
  } catch (JedisException je) {
    jedis.close();
    throw je;
  }

  return new DefaultPooledObject<Jedis>(jedis);

}

// redis.clients.jedis.JedisFactory#activateObject
@Override
public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
  final BinaryJedis jedis = pooledJedis.getObject();
  if (jedis.getDB() != database) {
    jedis.select(database);
  }

}
```
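The loop in `JedisSentinelPool#getResource` exists because `internalPool.clear()` leaves already-borrowed connections pointing at the old master. Reduced to its essence, it does three things in a loop:

1. borrow a connection,
2. compare the connection's `host:port` with the current master,
3. discard it if they differ.

In this sketch, connections are plain `host:port` strings and the borrow/discard operations are passed in as functions. `MasterCheckedBorrow` is a hypothetical name.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical reduction of the loop in JedisSentinelPool#getResource. Connections are
// represented by their "host:port" string.
final class MasterCheckedBorrow {
    private MasterCheckedBorrow() {}

    static String borrowForMaster(Supplier<String> currentMaster,
                                  Supplier<String> borrow,
                                  Consumer<String> discard) {
        while (true) {
            String conn = borrow.get();
            String master = currentMaster.get(); // read once; it may change concurrently
            if (master.equals(conn)) {
                return conn;                     // connected to the correct master
            }
            discard.accept(conn);                // stale: like returnBrokenResource(jedis)
        }
    }

    public static void main(String[] args) {
        Iterator<String> pool = List.of("10.0.0.1:6379", "10.0.0.2:6379").iterator();
        String conn = borrowForMaster(() -> "10.0.0.2:6379", pool::next,
                stale -> System.out.println("discarded " + stale));
        System.out.println("using " + conn);
    }
}
```

As in the real code, the loop terminates only because the pool's factory already targets the new master. Freshly created connections therefore eventually match.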

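Once the client has a connection, it talks to the real Redis master directly over the network. Sentinel is not on the data path, so it does not become a performance bottleneck.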

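IV. Reflections

Sentinel mode only solves the high-availability problem of a single master. It does not solve the capacity limits of a single node; cluster mode handles that. In today's internet landscape its range of application may be fairly narrow, but its ideas are worth borrowing.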

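When configuring Sentinel you only need to specify the master's address. Both the slave information and the other Sentinels are discovered through the master: slaves from the master's INFO output, and other Sentinels from the hello messages they publish via the master. Make sure the master is available when the system starts; otherwise the system cannot start up properly.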
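Sentinel learns about replicas from the master's `INFO replication` output. Each connected replica appears there on a line such as `slave0:ip=127.0.0.1,port=6380,state=online,offset=...,lag=0`.

A minimal parser for those lines might look like the one below. `InfoReplicationParser` is a hypothetical helper and only extracts the `ip` and `port` fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: extracts replica addresses from a master's INFO replication output.
final class InfoReplicationParser {
    private InfoReplicationParser() {}

    static List<String> replicaAddresses(String info) {
        List<String> result = new ArrayList<>();
        for (String line : info.split("\r?\n")) {
            // Replica lines look like: slave0:ip=127.0.0.1,port=6380,state=online,offset=100,lag=0
            if (!line.matches("slave\\d+:.*")) {
                continue;
            }
            String ip = null;
            String port = null;
            for (String kv : line.substring(line.indexOf(':') + 1).split(",")) {
                String[] pair = kv.split("=", 2);
                if (pair.length != 2) {
                    continue;
                }
                if (pair[0].equals("ip")) {
                    ip = pair[1];
                } else if (pair[0].equals("port")) {
                    port = pair[1];
                }
            }
            if (ip != null && port != null) {
                result.add(ip + ":" + port);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String info = "# Replication\r\nrole:master\r\nconnected_slaves:1\r\n"
                + "slave0:ip=127.0.0.1,port=6380,state=online,offset=100,lag=0\r\n";
        System.out.println(replicaAddresses(info));
    }
}
```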

If passwords are set in Redis, they must be identical on all instances, which can be a source of confusion.

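Redis Sentinel itself is a peer-to-peer cluster, so in principle connecting to any Sentinel gives the same answer. Sentinels discover one another through pub/sub, via hello messages on the `__sentinel__:hello` channel of the monitored instances. They also talk to each other directly, for example to exchange votes during a failover.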
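Sentinels announce themselves by periodically publishing a hello message to the `__sentinel__:hello` channel of each monitored master and replica. In the Redis source, the payload is a comma-separated list of eight fields, in this order:

1. Sentinel IP
2. Sentinel port
3. Sentinel run ID
4. current epoch
5. master name
6. master IP
7. master port
8. master config epoch

The parser below is a sketch based on that layout. `SentinelHello` is a hypothetical class, and the field order is an assumption worth checking against your Redis version.

```java
// Hypothetical parser for a Sentinel hello payload (8 comma-separated fields):
// sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch
final class SentinelHello {
    final String sentinelAddr;
    final String runId;
    final long currentEpoch;
    final String masterName;
    final String masterAddr;
    final long masterConfigEpoch;

    private SentinelHello(String sentinelAddr, String runId, long currentEpoch,
                          String masterName, String masterAddr, long masterConfigEpoch) {
        this.sentinelAddr = sentinelAddr;
        this.runId = runId;
        this.currentEpoch = currentEpoch;
        this.masterName = masterName;
        this.masterAddr = masterAddr;
        this.masterConfigEpoch = masterConfigEpoch;
    }

    static SentinelHello parse(String payload) {
        String[] f = payload.split(",");
        if (f.length != 8) {
            throw new IllegalArgumentException("expected 8 fields, got " + f.length);
        }
        // Ports and epochs are numeric; parsing them rejects malformed payloads early.
        return new SentinelHello(
                f[0] + ":" + Integer.parseInt(f[1]), f[2], Long.parseLong(f[3]),
                f[4], f[5] + ":" + Integer.parseInt(f[6]), Long.parseLong(f[7]));
    }

    public static void main(String[] args) {
        SentinelHello h = parse("127.0.0.1,26379,abc123,5,mymaster,127.0.0.1,6379,5");
        System.out.println(h.sentinelAddr + " monitors " + h.masterName + " at " + h.masterAddr);
    }
}
```

A Sentinel can key such messages by run ID to keep track of its peers. This is roughly how the Sentinels listed by `SENTINEL sentinels mymaster` are found without being written into sentinel.conf.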