Kafka Source Code Study – How the Consumer Gets Its Partition Indices

  • October 27, 2019
  • Notes

Background

On our project, when Flink's parallelism is set to more than 1, the job cannot obtain watermarks correctly, so I spent the weekend studying part of this. The cause has more or less been pinned down:
Although our topic is configured with only 1 partition, the Kafka consumer still starts several subtasks that try to read partitions with indices like 2 and 3. The watermarks of those subtasks never update, so the job's overall watermark stays at Long.MIN_VALUE. Now I need to understand how a subtask obtains its partitions; a debugging pass once I'm back at work should confirm the cause.
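
To make the failure mode concrete, here is a minimal sketch (plain Java, not Flink's actual implementation) of the watermark behaviour described above: a downstream operator only advances to the minimum watermark across its upstream subtasks, so a single source subtask that owns no partition and never emits a watermark pins the combined value at Long.MIN_VALUE.

    import java.util.Arrays;

    public class WatermarkMinSketch {
        public static void main(String[] args) {
            // Watermarks last seen from 4 source subtasks; subtasks 1-3 own no partition and never emit one,
            // so their slots stay at the initial Long.MIN_VALUE.
            long[] perSubtaskWatermark = {1_572_099_200_000L, Long.MIN_VALUE, Long.MIN_VALUE, Long.MIN_VALUE};

            // The combined watermark is the minimum over all inputs.
            long combined = Arrays.stream(perSubtaskWatermark).min().getAsLong();
            System.out.println(combined == Long.MIN_VALUE); // true -> the job-wide watermark never advances
        }
    }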

Walking Through the Source

Locating the partition-assignment code via the logs

[screenshot: find partition]

As the screenshot shows, some key information can be found in the class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.

Tracing the source code

[screenshot: log partition]

Scrolling up to look for more useful information

[screenshot: all partitions]

The key source code, with my comments added

    public void open(Configuration configuration) throws Exception {
        // determine the offset commit mode
        this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        // create the partition discoverer
        this.partitionDiscoverer = createPartitionDiscoverer(
                topicsDescriptor,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
        this.partitionDiscoverer.open();

        subscribedPartitionsToStartOffsets = new HashMap<>();
        // The key call: this is where the subtask obtains all of the partitions assigned to it.
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

        if (restoredState != null) {
            ...
        } else {
            // use the partition discoverer to fetch the initial seed partitions,
            // and set their initial offsets depending on the startup mode.
            // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
            // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
            // when the partition is actually read.
            switch (startupMode) {
                ...
                default:
                    for (KafkaTopicPartition seedPartition : allPartitions) {
                        subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                    }
            }

            if (!subscribedPartitionsToStartOffsets.isEmpty()) {
                switch (startupMode) {
                    ...
                    case GROUP_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                }
            } else {
                LOG.info("Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
            }
        }
    }

    public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
        if (!closed && !wakeup) {
            try {
                List<KafkaTopicPartition> newDiscoveredPartitions;

                // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern
                if (topicsDescriptor.isFixedTopics()) {
                    // For topics subscribed without a wildcard, simply fetch all partitions of those topics.
                    newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
                } else {
                    // For a wildcard subscription, first list all topics, then match them one by one.
                    List<String> matchedTopics = getAllTopics();

                    // retain topics that match the pattern
                    Iterator<String> iter = matchedTopics.iterator();
                    while (iter.hasNext()) {
                        if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                            iter.remove();
                        }
                    }

                    if (matchedTopics.size() != 0) {
                        // get partitions only for matched topics
                        newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                    } else {
                        newDiscoveredPartitions = null;
                    }
                }

                // (2) eliminate partition that are old partitions or should not be subscribed by this subtask
                if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                    throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
                } else {
                    Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                    KafkaTopicPartition nextPartition;
                    while (iter.hasNext()) {
                        nextPartition = iter.next();
                        // Keep only the partitions that belong to this subtask -- this is the function we are after.
                        if (!setAndCheckDiscoveredPartition(nextPartition)) {
                            iter.remove();
                        }
                    }
                }

                return newDiscoveredPartitions;
            }...
        }...
    }

    public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
        if (isUndiscoveredPartition(partition)) {
            discoveredPartitions.add(partition);

            // Here it is.
            return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
        }

        return false;
    }

    public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
        // First hash the topic name (partition.getTopic().hashCode() * 31) -- I don't know why the hashCode
        // is multiplied by 31 instead of being used directly -- then force the result non-negative
        // (& 0x7FFFFFFF) and take it modulo the parallelism to get the start index for this topic.
        int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

        // here, the assumption is that the id of Kafka partitions are always ascending
        // starting from 0, and therefore can be used directly as the offset clockwise from the start index
        // Compute which subtask this partition belongs to. For example, with 20 subtasks, a start index
        // of 5 and partition 5, the result is (5 + 5) % 20 = 10, so the partition goes to subtask 10.
        return (startIndex + partition.getPartition()) % numParallelSubtasks;
    }
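
To see how this plays out for our setup, here is a standalone sketch of the same formula (the topic name "events" and parallelism 4 are made-up values for illustration): with a single-partition topic only one subtask index ever comes out of assign(), so the remaining subtasks have nothing to read, which is exactly the idle-subtask situation from the background section.

    public class AssignSketch {
        // Same arithmetic as KafkaTopicPartitionAssigner.assign(), inlined for experimentation.
        static int assign(String topic, int partition, int numParallelSubtasks) {
            int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
            return (startIndex + partition) % numParallelSubtasks;
        }

        public static void main(String[] args) {
            int parallelism = 4;
            // A 1-partition topic maps to exactly one subtask; the other three subtasks stay idle.
            System.out.println("partition 0 -> subtask " + assign("events", 0, parallelism));
        }
    }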

Thoughts

Which subtask each partition of a topic goes to is actually deterministic

The topic name is fixed -> the topic's hashCode is fixed, and the number of subtasks is fixed -> startIndex is fixed -> which subtask a given partition goes to is fixed.

Why compute startIndex at all?

Presumably to spread different topics evenly. If there were many topics and every topic started from subtask 0, then the front subtasks (0, 1, 2, and so on) would have to read a disproportionately large number of partitions.
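
A quick sketch of that spreading effect (the topic names and parallelism below are made up): each topic gets its own start index, so partition 0 of every topic does not all pile onto subtask 0.

    public class StartIndexSketch {
        public static void main(String[] args) {
            int parallelism = 8;
            for (String topic : new String[] {"orders", "payments", "clicks", "logs"}) {
                // Same startIndex computation as in assign(): topics are rotated to different starting subtasks.
                int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
                System.out.println(topic + " -> partition 0 goes to subtask " + startIndex);
            }
        }
    }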