
  • 2019 年 10 月 3 日
  • 筆記

当我们使用kafka向指定Topic发送消息时,如果该Topic具有多个partition,无论消费者有多少,最终都会保证一个partition内的消息只会被一个Consumer group中的一个Consumer消费,也就是说同一Consumer group中的多个Consumer会自动起到负载均衡的效果。


下面我们就针对调用kafka API发送消息到Topic时partition的分配策略,分析下其内部具体的源码实现。

首先看下kafka API中消息体ProducerRecord类的构造函数,可以看到构造消息时可指定该消息要发送的Topic、partition、key、value等关键信息。

    /**
     * Creates a record to be sent to a specified topic and partition.
     *
     * <p>Delegates to the six-argument constructor, passing {@code null} as its third argument
     * (presumably the record timestamp; confirm against the full class).
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers The headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }

    /**
     * Creates a record to be sent to a specified topic and partition, without headers.
     *
     * <p>Delegates to the six-argument constructor with {@code null} for both its third argument
     * and the headers.
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }

    /**
     * Creates a record to be sent to Kafka without naming a partition.
     *
     * <p>The partition argument of the delegated constructor is {@code null} here, so the record
     * itself carries no explicit target partition.
     *
     * @param topic The topic the record will be appended to
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }



// Only topic, key and value are supplied here; the record is built without an explicit partition.
producer.send(new ProducerRecord<Object, Object>(topic, key, data));



public class DefaultPartitioner implements Partitioner {        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();        public void configure(Map<String, ?> configs) {}        /**       * Compute the partition for the given record.       *       * @param topic The topic name       * @param key The key to partition on (or null if no key)       * @param keyBytes serialized key to partition on (or null if no key)       * @param value The value to partition on or null       * @param valueBytes serialized value to partition on or null       * @param cluster The current cluster metadata       */      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {          //获取该topic的分区列表          List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);          int numPartitions = partitions.size();          //如果key值为null          if (keyBytes == null) {              //维护一个key为topic的ConcurrentHashMap,并通过CAS操作的方式对value值执行递增+1操作              int nextValue = nextValue(topic);              //获取该topic的可用分区列表              List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);              if (availablePartitions.size() > 0) {//如果可用分区大于0                  //执行求余操作,保证消息落在可用分区上                  int part = Utils.toPositive(nextValue) % availablePartitions.size();                  return availablePartitions.get(part).partition();              } else {                  // 没有可用分区的话,就给出一个不可用分区                  return Utils.toPositive(nextValue) % numPartitions;              }          } else {              // 通过计算key的hash,确定消息分区              return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;          }      }        private int nextValue(String topic) {          //获取一个AtomicInteger对象          AtomicInteger counter = topicCounterMap.get(topic);          if (null == counter) {//如果为空              //生成一个随机数              counter = new 
AtomicInteger(ThreadLocalRandom.current().nextInt());              //维护到topicCounterMap中              AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);              if (currentCounter != null) {                  counter = currentCounter;              }          }          //返回值并执行递增          return counter.getAndIncrement();      }        public void close() {}    }




/**   * 自定义实现Partitioner接口   *   */  public class KeyPartitioner implements Partitioner {        /**       * 实现具体分发策略       */      @Override      public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//拉取可用的partition          if (key == null||key.equals("")) {              int random =  (int) (Math.random() * 10);              int part = random % availablePartitions.size();              return availablePartitions.get(part).partition();          }          return  Math.abs(key.toString().hashCode() % 6);      }        @Override      public void configure(Map<String, ?> configs) {          // TODO Auto-generated method stub        }        @Override      public void close() {          // TODO Auto-generated method stub        }    }


// Register the custom partitioner through the producer configuration.
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyPartitioner.class);
producer = new KafkaProducer<Object, Object>(producerProps);



