Kafka源碼研究–Comsumer獲取partition下標
- 2019 年 10 月 27 日
- 筆記
背景
由於項目上Flink在設置parallel多於1的情況下,job沒法正確地獲取watermark,所以周末來研究一下一部分,大概已經鎖定了原因:
雖然我們的topic只設置了1的partition,但是Kafka的Comsumer還是起了好幾個subtask去讀索引是2、3的partition,然後這幾個subtask的watermark一直不更新,導致我們job整體的watermark一直是Long.MIN_VALUE。現在需要去了解一下subtask獲取partition的流程,等上班的時候debug一遍應該就可以知道原因。
翻源碼的過程
通過log找到分配partition的大概位置
從圖中可以看到,在org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
這個類中可以找到一些關鍵資訊。
跟蹤源碼
往上翻翻,看有沒有有用資訊
關鍵源碼,附上注釋
public void open(Configuration configuration) throws Exception { // determine the offset commit mode this.offsetCommitMode = OffsetCommitModes.fromConfiguration( getIsAutoCommitEnabled(), enableCommitOnCheckpoints, ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()); // create the partition discoverer this.partitionDiscoverer = createPartitionDiscoverer( topicsDescriptor, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); this.partitionDiscoverer.open(); subscribedPartitionsToStartOffsets = new HashMap<>(); // 重點函數,這個函數或獲取到subtask的所有partition。 final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions(); if (restoredState != null) { ... } else { // use the partition discoverer to fetch the initial seed partitions, // and set their initial offsets depending on the startup mode. // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now; // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined // when the partition is actually read. switch (startupMode) { ... default: for (KafkaTopicPartition seedPartition : allPartitions) { subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel()); } } if (!subscribedPartitionsToStartOffsets.isEmpty()) { switch (startupMode) { ... case GROUP_OFFSETS: LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}", getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets.keySet()); } } else { LOG.info("Consumer subtask {} initially has no partitions to read from.", getRuntimeContext().getIndexOfThisSubtask()); } } public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException { if (!closed && !wakeup) { try { List<KafkaTopicPartition> newDiscoveredPartitions; // (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern if (topicsDescriptor.isFixedTopics()) { // 對於沒有使用通配符的topic,直接獲取topic的所有partition newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics()); } else { // 對於使用了通配符的topic, 先找到所有topic,再一一match List<String> matchedTopics = getAllTopics(); // retain topics that match the pattern Iterator<String> iter = matchedTopics.iterator(); while (iter.hasNext()) { if (!topicsDescriptor.isMatchingTopic(iter.next())) { iter.remove(); } } if (matchedTopics.size() != 0) { // get partitions only for matched topics newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics); } else { newDiscoveredPartitions = null; } } // (2) eliminate partition that are old partitions or should not be subscribed by this subtask if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) { throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor); } else { Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator(); KafkaTopicPartition nextPartition; while (iter.hasNext()) { nextPartition = iter.next(); // 只保留符合要求的partition,這就是我們要找的函數 if (!setAndCheckDiscoveredPartition(nextPartition)) { iter.remove(); } } } return newDiscoveredPartitions; }... }... } public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) { if (isUndiscoveredPartition(partition)) { discoveredPartitions.add(partition); // 在這 return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask; } return false; } public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) { // 先算出此topic的hash(partition.getTopic().hashCode() * 31),這裡不知道為什麼不直接用hash,還要再*31,然後取正數(& 0x7FFFFFFF),最後獲取到此topic的起始位置。 int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks; // here, the assumption is that the id of Kafka partitions are always ascending // starting from 0, and therefore can be used directly as the offset clockwise from the start index // 計算當前的partition應該屬於哪個subtask。例如:一共有20個subtask,算出來的起始位置是5,partition是5,那麼最後就是 // (5 + 5) % 20 = 10, 這個partition應該分給10號subtask。 return (startIndex + partition.getPartition()) % numParallelSubtasks; }
思考
某topic的每個partition會分給哪個subtask其實是確定的
topic名字是確定的 -> topic的hashCode是確定的 && subtask的數量是確定的 -> startIndex是確定的 -> 某partition會分給哪個subtask其實是確定的
為什麼要算startIndex
大概是為了平均分配不同的topic,如果topic很多,每個topic都只從0開始,那麼subtask 0,1,2之類的靠前subtask就需要讀大量的partition。