Dyno-queues 分散式延遲隊列 之 輔助功能

Dyno-queues 分散式延遲隊列 之 輔助功能

0x00 摘要

本系列我們會以設計分散式延遲隊列時重點考慮的模組為主線,穿插灌輸一些消息隊列的特性實現方法,通過分析Dyno-queues 分散式延遲隊列的源碼來具體看看設計實現一個分散式延遲隊列的方方面面。

0x01 前文回顧

前面兩篇文章介紹了設計思路,消息的產生和消費。本文介紹一些輔助功能,有了這些功能可以讓系統更加完善。

0x2 Ack機制

前面提到,從Redis角度來看,Dyno-queues 對於每個隊列,維護三組Redis數據結構:

  • 包含隊列元素和分數的有序集合;
  • 包含消息內容的Hash集合,其中key為消息ID;
  • 包含客戶端已經消費但尚未確認的消息有序集合,Un-ack集合

這裡的第三組數據結構,就是支援我們的 Ack 機制。

2.1 加入Un-ack集合

前面提到,_pop 是消費消息,具體 _pop 的邏輯如下:

  • 計算當前時間為最大分數。
  • 獲取分數在 0 和 最大分數 之間的消息。
  • 將 messageID 添加到 unack 集合中,並從隊列的有序集中刪除這個 messageID
  • 如果上一步成功,則根據messageID從Redis集合中檢索消息。

這就是涉及到 包含客戶端已經消費但尚未確認的消息有序集合,Un-ack集合

程式碼如下:

private List<Message> _pop(String shard, int messageCount,
                           ConcurrentLinkedQueue<String> prefetchedIdQueue) throws Exception {
    String queueShardName = getQueueShardKey(queueName, shard);
    String unackShardName = getUnackKey(queueName, shard);
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // NX option indicates add only if it doesn't exist.
    // //redis.io/commands/zadd#zadd-options-redis-302-or-greater
    ZAddParams zParams = ZAddParams.zAddParams().nx();

    List<Message> popped = new LinkedList<>();
    for (;popped.size() != messageCount;) {
        String msgId = prefetchedIdQueue.poll();

        //將messageID添加到unack集合中
        long added = quorumConn.zadd(unackShardName, unackScore, msgId, zParams);
        if(added == 0){
            monitor.misses.increment();
            continue;
        }

        long removed = quorumConn.zrem(queueShardName, msgId);
        if (removed == 0) {
            monitor.misses.increment();
            continue;
        }

        String json = quorumConn.hget(messageStoreKey, msgId);
        if (json == null) {
            monitor.misses.increment();
            continue;
        }
        Message msg = om.readValue(json, Message.class);
        popped.add(msg);

        if (popped.size() == messageCount) {
            return popped;
        }
    }
    return popped;
}

此時邏輯如下:

                  message list



zset  +----------+----------+----------+-----+----------+  _pop (msg id 9)
      |          |          |          |     |          |
      | msg id 1 | msg id 2 | msg id 3 | ... | msg id 9 | +----+
      |          |          |          |     |          |      |
      +---+------+----+-----+----+-----+-----+----+-----+      |
          |           |          |                |            |
          |           |          |                |            |
          v           v          v                v            |
hash  +---+---+   +---+---+   +--+----+        +--+--+         |
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|         |
      +-------+   +-------+   +-------+        +-----+         |
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                                                               |
                  unack list                                   |
       +------------+-------------+--------------+             |
zset   |            |             |              |             |
       |  msg id 11 |   msg id 12 |   msg id 13  |  <----------+
       |            |             |              |
       +------------+-------------+--------------+

2.2 ACK

用戶當得到消息之後,需要Ack消息,比如:

List pushed_msgs = V1Queue.push(payloads);

Message poppedWithPredicate = V1Queue.popMsgWithPredicate("searchable pay*", false);

V1Queue.ack(poppedWithPredicate.getId());

Ack的邏輯是:

  • 從unack集合中刪除messageID。
  • 因為此時已經是ack了,所以此消息就徹底沒有意義了,所以從Message有效集合中刪除messageID。

程式碼如下:

@Override
public boolean ack(String messageId) {
    try {
        return execute("ack", "(a shard in) " + queueName, () -> {

            for (String shard : allShards) {
                String unackShardKey = getUnackKey(queueName, shard);
                Long removed = quorumConn.zrem(unackShardKey, messageId);
                if (removed > 0) {
                    quorumConn.hdel(messageStoreKey, messageId);
                    return true;
                }
            }
            return false;
        });
    } 
}

private String getUnackKey(String queueName, String shard) {
		return redisKeyPrefix + ".UNACK." + queueName + "." + shard;
}

具體如下:

                  message list



zset  +----------+----------+----------+------
      |          |          |          |     |
      | msg id 1 | msg id 2 | msg id 3 | ... |
      |          |          |          |     |
      +---+------+----+-----+----+-----+-----+
          |           |          |
          |           |          |
          v           v          v                         delete
hash  +---+---+   +---+---+   +--+----+        +-----+
      | msg 1 |   | msg 2 |   | msg 3 |        |msg 9|    <----+  ACK(msg id 9)
      +-------+   +-------+   +-------+        +-----+                  +
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                                                                        |
                  unack list                                            |
       +------------+-------------+--------------+-------------+  delete|
zset   |            |             |              |             |        |
       |  msg id 11 |   msg id 12 |   msg id 13  |   msg id 9  |  <-----+
       |            |             |              |             |
       +------------+-------------+--------------+-------------+

2.3 處理Un-ACK的消息

後台進程會定時做檢測,即 監視 UNACK 集合中的消息,這些消息在給定時間內未被客戶端確認(每個隊列可配置)。這些消息將移回到隊列中。

2.3.1 定時任務

定時任務是如下程式碼來啟動:

schedulerForUnacksProcessing = Executors.newScheduledThreadPool(1);

if (this.singleRingTopology) {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> atomicProcessUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
} else {
    schedulerForUnacksProcessing.scheduleAtFixedRate(() -> processUnacks(), unackScheduleInMS, unackScheduleInMS, TimeUnit.MILLISECONDS);
}

2.3.2 Un-ACK

如下程式碼,就是把未確認消息退回到隊列中。

@Override
public void processUnacks() {
    try {

        long queueDepth = size();
        monitor.queueDepth.record(queueDepth);

        String keyName = getUnackKey(queueName, shardName);
        
        execute("processUnacks", keyName, () -> {

            int batchSize = 1_000;
            String unackShardName = getUnackKey(queueName, shardName);

            double now = Long.valueOf(clock.millis()).doubleValue();
            int num_moved_back = 0;
            int num_stale = 0;

            Set<Tuple> unacks = nonQuorumConn.zrangeByScoreWithScores(unackShardName, 0, now, 0, batchSize);

            for (Tuple unack : unacks) {

                double score = unack.getScore();
                String member = unack.getElement();

                String payload = quorumConn.hget(messageStoreKey, member);
                if (payload == null) {
                    quorumConn.zrem(unackShardName, member);
                    ++num_stale;
                    continue;
                }

                long added_back = quorumConn.zadd(localQueueShard, score, member);
                long removed_from_unack = quorumConn.zrem(unackShardName, member);
                if (added_back > 0 && removed_from_unack > 0) ++num_moved_back;
            }
            return null;
        });

    } 
}

此時邏輯如下:

                             message list



           zset  +----------+----------+----------+-----+
                 |          |          |          |     |
+------------->  | msg id 1 | msg id 2 | msg id 3 | ... |
|                |          |          |          |     |
|                +---+------+----+-----+----+-----+-----+
|                    |           |          |
|                    |           |          |
|                    v           v          v
|          hash  +---+---+   +---+---+   +--+----+
|                | msg 1 |   | msg 2 |   | msg 3 |
|                +-------+   +-------+   +-------+
|
|
|
|                           unack list
|                +------------+-------------+--------------+
|         zset   |            |             |              |
|                |  msg id 11 |   msg id 12 |   msg id 13  |
+-------------+  |            |             |              |
  msg id 11      +-------+----+-------------+--------------+
                         ^
                         |  msg id 11
                         |
                 +-------+---------+
                 |                 |
                 | ScheduledThread |
                 |                 |
                 +-----------------+

0x03 防止重複消費

對於防止重複消費,系統做了如下努力:

  • 每個節點(上圖中的N1…Nn)與可用性區域具有關聯性,並且與該區域中的redis伺服器進行通訊。
  • Dynomite / Redis節點一次只能提供一個請求,Dynomite可以允許數千個並發連接,但是請求是由Redis中的單個執行緒處理,這確保了當發出兩個並發調用從隊列輪詢元素時,是由Redis伺服器順序執行,從而避免任何本地或分散式鎖。
  • 在發生故障轉移的情況下,確保沒有兩個客戶端連接從隊列中獲取相同的消息。

0x04 防止消息丟失

4.1 消息丟失的可能

4.1.1 生產者弄丟了數據

生產者將數據發送到 MQ 的時候,可能數據就在半路給搞丟了,因為網路問題啥的,都有可能。

比如,如下就是簡單的插入,缺少必要的保證。

List pushed_msgs = V1Queue.push(payloads);

4.1.2 MQ 弄丟了數據

這種情況就是 MQ 自己弄丟了數據,這個你必須開啟MQ 的持久化,就是消息寫入之後會持久化到磁碟,哪怕是 MQ 自己掛了,恢復之後會自動讀取之前存儲的數據,一般數據不會丟。

4.2 Dyno-queues 保證

Dyno-queues 使用ensure來確認消息完全寫入到所有分區

簡單來說,就是:

  1. 對於所有分區,逐一進行:”寫數據(就是message id),讀出寫入的數據” 這樣的操作。如果有一個分區寫出錯,就返回失敗。
  2. 如果把 message id 都已經寫入到所有的分區,再寫入消息內容。

Enqueues ‘message’ if it doesn’t exist in any of the shards or unack sets.

@Override
public boolean ensure(Message message) {
    return execute("ensure", "(a shard in) " + queueName, () -> {

        String messageId = message.getId();
        for (String shard : allShards) {

            String queueShard = getQueueShardKey(queueName, shard);
            Double score = quorumConn.zscore(queueShard, messageId);
            if (score != null) {
                return false;
            }
            String unackShardKey = getUnackKey(queueName, shard);
            score = quorumConn.zscore(unackShardKey, messageId);
            if (score != null) {
                return false;
            }
            
        }
        push(Collections.singletonList(message));
        return true;
    });
}

0x05 過期消息

針對過期消息,Dyno-queues 的處理方式是一次性找出過期消息給用戶處理,其中過期時間由用戶在參數中設定。

所以 findStaleMessages 就是利用 lua 腳本找出過期消息。

@Override
public List<Message> findStaleMessages() {
    return execute("findStaleMessages", localQueueShard, () -> {

        List<Message> stale_msgs = new ArrayList<>();

        int batchSize = 10;

        double now = Long.valueOf(clock.millis()).doubleValue();
        long num_stale = 0;

        for (String shard : allShards) {
            String queueShardName = getQueueShardKey(queueName, shard);
            
            Set<String> elems = nonQuorumConn.zrangeByScore(queueShardName, 0, now, 0, batchSize);

            if (elems.size() == 0) {
                continue;
            }

            String findStaleMsgsScript = "local hkey=KEYS[1]\n" +
                    "local queue_shard=ARGV[1]\n" +
                    "local unack_shard=ARGV[2]\n" +
                    "local num_msgs=ARGV[3]\n" +
                    "\n" +
                    "local stale_msgs={}\n" +
                    "local num_stale_idx = 1\n" +
                    "for i=0,num_msgs-1 do\n" +
                    "  local msg_id=ARGV[4+i]\n" +
                    "\n" +
                    "  local exists_hash = redis.call('hget', hkey, msg_id)\n" +
                    "  local exists_queue = redis.call('zscore', queue_shard, msg_id)\n" +
                    "  local exists_unack = redis.call('zscore', unack_shard, msg_id)\n" +
                    "\n" +
                    "  if (exists_hash and exists_queue) then\n" +
                    "  elseif (not (exists_unack)) then\n" +
                    "    stale_msgs[num_stale_idx] = msg_id\n" +
                    "    num_stale_idx = num_stale_idx + 1\n" +
                    "  end\n" +
                    "end\n" +
                    "\n" +
                    "return stale_msgs\n";

            String unackKey = getUnackKey(queueName, shard);
            ImmutableList.Builder builder = ImmutableList.builder();
            builder.add(queueShardName);
            builder.add(unackKey);
            builder.add(Integer.toString(elems.size()));
            for (String msg : elems) {
                builder.add(msg);
            }

            ArrayList<String> stale_msg_ids = (ArrayList) ((DynoJedisClient)quorumConn).eval(findStaleMsgsScript, Collections.singletonList(messageStoreKey), builder.build());
            num_stale = stale_msg_ids.size();

            for (String m : stale_msg_ids) {
                Message msg = new Message();
                msg.setId(m);
                stale_msgs.add(msg);
            }
        }

        return stale_msgs;
    });
}

0x6 消息刪除

Dyno-queues 支援消息刪除:業務使用方可以隨時刪除指定消息。

具體刪除是 從 unack隊列 和 正常隊列中刪除。

@Override
public boolean remove(String messageId) {
		return execute("remove", "(a shard in) " + queueName, () -> {

            for (String shard : allShards) {

                String unackShardKey = getUnackKey(queueName, shard);
                quorumConn.zrem(unackShardKey, messageId);

                String queueShardKey = getQueueShardKey(queueName, shard);
                Long removed = quorumConn.zrem(queueShardKey, messageId);

                if (removed > 0) {
                    // Ignoring return value since we just want to get rid of it.
                    Long msgRemoved = quorumConn.hdel(messageStoreKey, messageId);
                    return true;
                }
            }
            return false;
        });
}

0x07 批量處理以增加吞吐

Dyno-queues 利用lua腳本來進行批量處理,這樣可以增加吞吐。

7.1 Lua腳本

Redis中為什麼引入Lua腳本?

Redis提供了非常豐富的指令集,官網上提供了200多個命令。但是某些特定領域,需要擴充若干指令原子性執行時,僅使用原生命令便無法完成。

Redis 為這樣的用戶場景提供了 lua 腳本支援,用戶可以向伺服器發送 lua 腳本來執行自定義動作,獲取腳本的響應數據。Redis 伺服器會單執行緒原子性執行 lua 腳本,保證 lua 腳本在處理的過程中不會被任意其它請求打斷。

使用腳本的好處如下:

  • 減少網路開銷。可以將多個請求通過腳本的形式一次發送,減少網路時延。
  • 原子操作。Redis會將整個腳本作為一個整體執行,中間不會被其他請求插入。因此在腳本運行過程中無需擔心會出現競態條件,無需使用事務。
  • 復用。客戶端發送的腳本會永久存在redis中,這樣其他客戶端可以復用這一腳本,而不需要使用程式碼完成相同的邏輯。

7.2 實現

具體程式碼如下,可以看到就是採用了lua腳本一次性寫入:

// TODO: Do code cleanup/consolidation
private List<Message> atomicBulkPopHelper(int messageCount,
                      ConcurrentLinkedQueue<String> prefetchedIdQueue, boolean localShardOnly) throws IOException {

    double now = Long.valueOf(clock.millis() + 1).doubleValue();
    double unackScore = Long.valueOf(clock.millis() + unackTime).doubleValue();

    // The script requires the scores as whole numbers
    NumberFormat fmt = NumberFormat.getIntegerInstance();
    fmt.setGroupingUsed(false);
    String nowScoreString = fmt.format(now);
    String unackScoreString = fmt.format(unackScore);

    List<String> messageIds = new ArrayList<>();
    for (int i = 0; i < messageCount; ++i) {
        messageIds.add(prefetchedIdQueue.poll());
    }

    String atomicBulkPopScriptLocalOnly="local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local peek_until=ARGV[2]\n" +
            "local unack_score=ARGV[3]\n" +
            "local queue_shard_name=ARGV[4]\n" +
            "local unack_shard_name=ARGV[5]\n" +
            "local msg_start_idx = 6\n" +
            "local idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "  if (exists) then\n" +
            "    if (exists <=peek_until) then\n" +
            "      local value = redis.call('hget', hkey, message_id)\n" +
            "      if (value) then\n" +
            "        local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "        if (zadd_ret) then\n" +
            "          redis.call('zrem', queue_shard_name, message_id)\n" +
            "          return_vals[idx]=value\n" +
            "          idx=idx+1\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  else\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    String atomicBulkPopScript="local hkey=KEYS[1]\n" +
            "local num_msgs=ARGV[1]\n" +
            "local num_shards=ARGV[2]\n" +
            "local peek_until=ARGV[3]\n" +
            "local unack_score=ARGV[4]\n" +
            "local shard_start_idx = 5\n" +
            "local msg_start_idx = 5 + (num_shards * 2)\n" +
            "local out_idx = 1\n" +
            "local return_vals={}\n" +
            "for i=0,num_msgs-1 do\n" +
            "  local found_msg=false\n" +
            "  local message_id=ARGV[msg_start_idx + i]\n" +
            "  for j=0,num_shards-1 do\n" +
            "    local queue_shard_name=ARGV[shard_start_idx + (j*2)]\n" +
            "    local unack_shard_name=ARGV[shard_start_idx + (j*2) + 1]\n" +
            "    local exists = redis.call('zscore', queue_shard_name, message_id)\n" +
            "    if (exists) then\n" +
            "      found_msg=true\n" +
            "      if (exists <=peek_until) then\n" +
            "        local value = redis.call('hget', hkey, message_id)\n" +
            "        if (value) then\n" +
            "          local zadd_ret = redis.call('zadd', unack_shard_name, 'NX', unack_score, message_id)\n" +
            "          if (zadd_ret) then\n" +
            "            redis.call('zrem', queue_shard_name, message_id)\n" +
            "            return_vals[out_idx]=value\n" +
            "            out_idx=out_idx+1\n" +
            "            break\n" +
            "          end\n" +
            "        end\n" +
            "      end\n" +
            "    end\n" +
            "  end\n" +
            "  if (found_msg == false) then\n" +
            "    return {}\n" +
            "  end\n" +
            "end\n" +
            "return return_vals";

    List<Message> payloads = new ArrayList<>();
    if (localShardOnly) {
        String unackShardName = getUnackKey(queueName, shardName);

        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        builder.add(localQueueShard);
        builder.add(unackShardName);
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScriptLocalOnly,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    } else {
        ImmutableList.Builder builder = ImmutableList.builder();
        builder.add(Integer.toString(messageCount));
        builder.add(Integer.toString(allShards.size()));
        builder.add(nowScoreString);
        builder.add(unackScoreString);
        for (String shard : allShards) {
            String queueShard = getQueueShardKey(queueName, shard);
            String unackShardName = getUnackKey(queueName, shard);
            builder.add(queueShard);
            builder.add(unackShardName);
        }
        for (int i = 0; i < messageCount; ++i) {
            builder.add(messageIds.get(i));
        }

        List<String> jsonPayloads;
        // Cast from 'JedisCommands' to 'DynoJedisClient' here since the former does not expose 'eval()'.
        jsonPayloads = (List) ((DynoJedisClient) quorumConn).eval(atomicBulkPopScript,
                Collections.singletonList(messageStoreKey), builder.build());

        for (String p : jsonPayloads) {
            Message msg = om.readValue(p, Message.class);
            payloads.add(msg);
        }
    }

    return payloads;
}

0x08 V2

最新版本是 V2,有三個類,我們看看具體是什麼作用。

  • QueueBuilder

  • MultiRedisQueue

  • RedisPipelineQueue

8.1 QueueBuilder

就是封裝,對外統一提供API。

public class QueueBuilder {

    private Clock clock;

    private String queueName;

    private String redisKeyPrefix;

    private int unackTime;

    private String currentShard;

    private ShardSupplier shardSupplier;

    private HostSupplier hs;

    private EurekaClient eurekaClient;

    private String applicationName;

    private Collection<Host> hosts;

    private JedisPoolConfig redisPoolConfig;

    private DynoJedisClient dynoQuorumClient;

    private DynoJedisClient dynoNonQuorumClient;
}

8.2 MultiRedisQueue

該類也是為了提高速度,其內部包括多個RedisPipelineQueue,每個queue代表一個分區,利用 round robin 方式寫入。

/**
 * MultiRedisQueue exposes a single queue using multiple redis queues.  Each RedisQueue is a shard.
 * When pushing elements to the queue, does a round robin to push the message to one of the shards.
 * When polling, the message is polled from the current shard (shardName) the instance is associated with.
 */
public class MultiRedisQueue implements DynoQueue {
    private List<String> shards;
    private String name;
    private Map<String, RedisPipelineQueue> queues = new HashMap<>();
    private RedisPipelineQueue me;
}

8.3 RedisPipelineQueue

這個類就是使用pipeline來提升吞吐。

Queue implementation that uses Redis pipelines that improves the throughput under heavy load.。

public class RedisPipelineQueue implements DynoQueue {

    private final Logger logger = LoggerFactory.getLogger(RedisPipelineQueue.class);

    private final Clock clock;

    private final String queueName;

    private final String shardName;

    private final String messageStoreKeyPrefix;

    private final String myQueueShard;

    private final String unackShardKeyPrefix;

    private final int unackTime;

    private final QueueMonitor monitor;

    private final ObjectMapper om;

    private final RedisConnection connPool;

    private volatile RedisConnection nonQuorumPool;

    private final ScheduledExecutorService schedulerForUnacksProcessing;

    private final HashPartitioner partitioner = new Murmur3HashPartitioner();

    private final int maxHashBuckets = 32;

    private final int longPollWaitIntervalInMillis = 10;
}

0xFF 參考

乾貨分享 | 如何從零開始設計一個消息隊列

消息隊列的理解,幾種常見消息隊列對比,新手也能看得懂!—-分散式中間件消息隊列

消息隊列設計精要

有贊延遲隊列設計

基於Dynomite的分散式延遲隊列

//blog.mikebabineau.com/2013/02/09/delay-queues-in-redis/

//stackoverflow.com/questions/17014584/how-to-create-a-delayed-queue-in-rabbitmq

//activemq.apache.org/delay-and-schedule-message-delivery.html

源碼分析] Dynomite 分散式存儲引擎 之 DynoJedisClient(1)

源碼分析] Dynomite 分散式存儲引擎 之 DynoJedisClient(2)

原創 Amazon Dynamo系統架構

Netlix Dynomite性能基準測試,基於AWS和Redis

為什麼分散式一定要有延時任務?

Exit mobile version