Kafka Producer Source Code Analysis, Part 1: Overall Architecture
1. Producer Overall Architecture
The Kafka producer is itself structured as a producer-consumer pattern (a minimal sketch of the shared structure follows this list):
- When the producer thread calls send, it only serializes the record and appends it to a ProducerBatch at the tail of the Deque for the target TopicPartition.
- On each pass, the Sender thread scans the head of every Deque to compute the set of readyNodes that have data to send, and confirms that connections to all readyNodes are established.
- It then iterates over the readyNodes and, for each node, walks the heads of the Deques of all partitions on that node, collecting batches until max.request.size is reached or every queue has been visited, and sends them using NIO.
- The BufferPool manages the producer's memory as a whole and reuses ProducerBatch memory to reduce GC pressure.
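Before diving into the source, here is a minimal sketch of that shared structure. It is illustrative only: it borrows the idea of the real batches map and ProducerBatch, but the types are simplified (the key is a plain String and Batch is a type parameter) and it is not the actual RecordAccumulator.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative only: the hand-off structure shared by the producer thread and the Sender thread.
public class AccumulatorSketch<Batch> {
    // One queue per TopicPartition (keyed by a "topic-partition" string here for brevity)
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    // Producer thread: append to the tail of the partition's queue
    public void append(String topicPartition, Batch batch) {
        Deque<Batch> deque = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (deque) {
            deque.addLast(batch);
        }
    }

    // Sender thread: drain from the head of the partition's queue
    public Batch drainOne(String topicPartition) {
        Deque<Batch> deque = batches.get(topicPartition);
        if (deque == null) {
            return null;
        }
        synchronized (deque) {
            return deque.pollFirst();
        }
    }
}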
2. Standard Producer Code
Producer usage has three core parts: initializing the configuration, constructing a KafkaProducer, and calling send to publish messages. Note that KafkaProducer is thread-safe, so a single instance can be shared by multiple threads.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap servers used for initial cluster discovery
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.40.4:6667,10.0.40.5:6667");
        // "all": return only after the leader and the replicas have written the record;
        // "1": return once the leader has written it; "0": return immediately without waiting for the server
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // ProducerBatch size; here 100 KB
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 102400);
        // BufferPool size. When it is exhausted, further send calls block; the blocking time is bounded
        // by max.block.ms, after which a TimeoutException is thrown
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // KafkaProducer initialization
        Producer<String, String> producer = new KafkaProducer<>(props);
        // Asynchronous send; the result is a RecordMetadata carrying the partition the record was sent to,
        // the assigned offset, and the record timestamp
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<String, String>("topic-name", "this is a message"), (metadata, excp) -> {
                if (excp != null) {
                    // Handling for a failed send
                    excp.printStackTrace();
                }
            });
        }
        producer.flush();
        producer.close();
    }
}
3. Producer Thread Logic
3.1 Core send logic walkthrough
- Serialize the key and value, and compute which partition the record should go to
- Append the record, keyed by TopicPartition, to the tail of the corresponding Deque inside the RecordAccumulator's internal Map
- When the RecordAccumulator creates a new batch it allocates memory from its internal BufferPool; after the Sender thread is done with a batch, the memory is returned to the BufferPool
- The Sender thread walks all queues in the RecordAccumulator and decides whether the batch at each head should be sent
- See Chapter 5 for how the RecordAccumulator works and Chapter 6 for the BufferPool
Points worth learning:
1. The BufferPool reuses ByteBuffer instances to reduce GC: the same underlying byte[] is overwritten instead of allocating a new byte[] each time and leaving the old one to the garbage collector.
2. Data preparation is separated from network I/O. KafkaProducer being thread-safe suggests the bottleneck usually lies in data preparation: multiple threads can build records while a single Sender thread sends them, which is again a producer-consumer model (a short sketch of this usage follows).
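A minimal sketch of that usage pattern (the broker addresses, topic name, and thread count are arbitrary): several record-building threads share one KafkaProducer instance while its single internal Sender thread performs all network I/O.
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.40.4:6667,10.0.40.5:6667");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // One thread-safe producer shared by all record-building threads
        Producer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                // Each task only prepares and enqueues records; the producer's Sender thread sends them
                for (int i = 0; i < 1000; i++) {
                    producer.send(new ProducerRecord<>("topic-name", "message-" + i));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        producer.close();
    }
}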
3.2 new KafkaProducer() initialization logic
- Initialize metrics, interceptors, the partitioner, the key/value serializers, and the configuration
- Initialize the accumulator
- Create and start the Sender thread
······
// Partitioner: decides which partition each record is sent to
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
······
// Initialize the accumulator; its internal batches field is a Map holding one queue per target TopicPartition
this.accumulator = new RecordAccumulator(···,batch_size,totalMemory,compressionType,···);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
// Obtain the cluster metadata
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
}
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
// Wrapper around NIO; the Sender thread uses this class for network I/O
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(···
new Selector(···),
this.metadata,···);
// The Sender: encapsulates the logic of taking ProducerBatches from the queue heads and sending them
this.sender = new Sender(···,
client,
this.metadata,
this.accumulator,···);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// KafkaThread is just a thin wrapper used to run the Sender
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// Start the Sender (I/O) thread
this.ioThread.start();
······
3.3 doSend logic
- Serialize the key and value
- Call partition() to determine the target partition id
- Call ensureValidRecordSize() to check that the record does not exceed max.request.size or buffer.memory
- Call accumulator.append to add the record to the in-memory queue
- Based on the returned result, wake up the Sender thread if the current batch is full or a new batch was created
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
······
// Serialize the key
byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
// Serialize the value
byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
// Compute the partition the record is sent to
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
// Check that the serialized size does not exceed max.request.size or buffer.memory; note this is the size before compression
ensureValidRecordSize(serializedSize);
······
// Append the record to the queue
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// Wake up the Sender thread whenever a new batch was created or the current batch is full
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
// Return the asynchronous future result
return result.future;
······
}
3.4 partition logic
- If a partition is specified explicitly, that value is used directly as the partition.
- If no partition is specified but a key is present, the partition is the key's hash modulo the topic's partition count.
- If neither a partition nor a key is given, a random integer is generated on the first call (and incremented on every subsequent call); this value modulo the number of available partitions of the topic gives the partition, i.e. the well-known round-robin algorithm.
····· how doSend calls partition() ·····
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
// 1. If the record itself specifies a partition, use that as the target partition
Integer partition = record.partition();
// 2. Otherwise, call the partitioner to obtain the target partition id
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
······ DefaultPartitioner logic ······
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 1. Get all partitions of the topic from the metadata
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 2. If no key is set, nextValue keeps a per-topic counter seeded with a random number; it is the round-robin starting point
int nextValue = nextValue(topic);
// Note: when no key is set, only the available partitions are rotated over
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
// Map by modulo
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
// If no partition is available, still return a value based on the total partition count
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
// If a key is set, hash the key strictly to map it to a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
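As a usage note on the PARTITIONER_CLASS_CONFIG seen in the initialization code, you can plug in your own Partitioner. Below is a minimal sketch with a made-up routing rule (keyless records always go to partition 0, keyed records keep the DefaultPartitioner hashing); the class name KeyOrZeroPartitioner is hypothetical. It would be registered with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyOrZeroPartitioner.class.getName()).
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class KeyOrZeroPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // Made-up rule for illustration: keyless records all land on partition 0
            return 0;
        }
        // Same keyed behaviour as DefaultPartitioner: murmur2 hash modulo partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}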
4. Sender Thread
(Reference: //blog.csdn.net/bohu83/article/details/88853553)
public void run() {
// Main loop: repeatedly poll the accumulator to check whether there are records to send
while (running) {
try {
// Core per-iteration logic
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
// Graceful shutdown: finish sending whatever is still in the in-memory queues first
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// Forced close: do not drain what remains; the incomplete batches are failed below
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on the futures.
this.accumulator.abortIncompleteBatches();
}
// Close the network-layer client
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
void run(long now) {
······
// Take data from the accumulator's Deques and send it asynchronously through the client
long pollTimeout = sendProducerData(now);
// NIO reactor event loop: handle network events as they occur
client.poll(pollTimeout, now);
}
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
// Walk the head of every queue and return the set of nodes that have sendable batches (and whose leaders are known)
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
// For the TopicPartitions whose leader is unknown, request another metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
// ready() returns true only if a connection to this node is already established and usable; otherwise it initiates a connection and returns false
if (!this.client.ready(node, now)) {
// Remove this node from readyNodes
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
// For each broker confirmed ready to receive data, walk the heads of the Deques of all its TopicPartitions and collect batches until max.request.size is reached.
// Returns a map from node id to the list of ProducerBatches destined for that node
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
this.maxRequestSize, now);
// guaranteeMessageOrder is true when max.in.flight.requests.per.connection == 1, i.e. strict ordering is required
if (guaranteeMessageOrder) {
// Mute all the partitions drained
// Before sending, mute the TopicPartitions being drained so no further batches for them are sent while these are in flight, avoiding reordering on the network
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
// Add the TopicPartition to the accumulator's muted set
this.accumulator.mutePartition(batch.topicPartition);
}
}
// Collect the set of expired batches across all queues
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
// Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
// for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
// we need to reset the producer id here.
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
// Expired batches go straight through the batch-failure path; the producer thread will receive the exception
for (ProducerBatch expiredBatch : expiredBatches) {
failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
}
}
sensors.updateProduceRequestMetrics(batches);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
pollTimeout = 0;
}
// Assemble the produce requests and send them via the NetworkClient
sendProduceRequests(batches, now);
return pollTimeout;
}
5. RecordAccumulator Structure
Key fields of the accumulator:
// Memory pool; the memory actually used by ProducerBatches comes from here
private final BufferPool free;
// A CopyOnWriteMap; one Deque is maintained per TopicPartition
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// When maxInflightRequests = 1, TopicPartitions with data currently in flight are placed in this set and skipped while draining
private final Set<TopicPartition> muted;
Key methods:
- append(): appends a record to the Deque of its TopicPartition, allocating a new ProducerBatch from the BufferPool when the tail batch cannot hold it (see the sketch below)
- ready(): checks the head of every Deque and returns the set of nodes that have batches ready to send
- drain(): for each ready node, collects batches from the queue heads up to max.request.size
- expiredBatches(): returns the batches that have waited in the queues longer than the request timeout
- mutePartition() / unmutePartition(): add to / remove from the muted set used when strict ordering is required
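A minimal sketch of the append() flow described above, with simplified, made-up types (Batch, Pool); it only illustrates the "try the tail batch first, otherwise allocate a new buffer from the BufferPool" decision, not the real implementation.
import java.nio.ByteBuffer;
import java.util.Deque;

// Illustrative only: how append() either reuses the tail batch or allocates a new one.
public class AppendFlowSketch {
    // Stand-in for ProducerBatch
    static class Batch {
        final ByteBuffer buffer;
        Batch(ByteBuffer buffer) { this.buffer = buffer; }
        boolean tryAppend(byte[] payload) {
            if (buffer.remaining() < payload.length) {
                return false; // batch is full
            }
            buffer.put(payload);
            return true;
        }
    }

    // Stand-in for BufferPool.allocate (may block until memory is available)
    interface Pool {
        ByteBuffer allocate(int size);
    }

    static void append(Deque<Batch> deque, Pool pool, byte[] payload, int batchSize) {
        synchronized (deque) {
            Batch last = deque.peekLast();
            if (last != null && last.tryAppend(payload)) {
                return; // appended to the existing tail batch
            }
        }
        // Tail batch missing or full: allocate a new buffer from the pool and start a new batch
        ByteBuffer buffer = pool.allocate(Math.max(batchSize, payload.length));
        Batch batch = new Batch(buffer);
        batch.tryAppend(payload);
        synchronized (deque) {
            deque.addLast(batch);
        }
    }
}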
6. BufferPool: Memory Reuse
// Total memory size = buffer.memory
private final long totalMemory;
// Size of the ByteBuffers the BufferPool can reuse = batch.size
private final int poolableSize;
// Reusable ByteBuffers
private final Deque<ByteBuffer> free;
// When memory runs out, producer threads wait here until memory is released and they are signalled
private final Deque<Condition> waiters;
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
// poolableSize is batch.size; only buffers of exactly this size are pooled
this.poolableSize = poolableSize;
this.lock = new ReentrantLock();
// Caches previously used HeapByteBuffers; note that only buffers of exactly poolableSize are cached
this.free = new ArrayDeque<>();
// Wait queue used for condition-waits when memory is insufficient
this.waiters = new ArrayDeque<>();
// Total memory, i.e. buffer.memory; this setting largely determines the producer's heap footprint
this.totalMemory = memory;
// Remaining available memory
this.availableMemory = memory;
this.metrics = metrics;
this.time = time;
this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
MetricName metricName = metrics.metricName("bufferpool-wait-ratio",
metricGrpName,
"The fraction of time an appender waits for space allocation.");
this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
this.lock.lock();
try {
// check if we have a free buffer of the right size pooled
// If the requested size equals batch.size and the free list is non-empty, there is a reusable HeapByteBuffer; reuse it directly
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// now check if the request is immediately satisfiable with the
// memory on hand or if we need to block
int freeListSize = freeSize() * this.poolableSize;
// If the available memory is sufficient (available = nonPooledAvailableMemory + freeListSize, i.e. total memory minus what has been handed out and not yet returned)
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// we have enough unallocated or pooled memory to immediately
// satisfy the request, but need to allocate the buffer
// Free pooled buffers into nonPooledAvailableMemory as needed (typically because the requested size is non-standard, i.e. larger than batch.size)
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// we are out of memory and will have to block
// Not enough available memory: block and wait for batches to be sent and freed (the size was validated earlier, so given enough time the total memory is guaranteed to suffice)
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop over and over until we have a buffer or have reserved
// enough memory to allocate one
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
// Block; signalled when memory is returned
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
this.waitTime.record(timeNs, time.milliseconds());
}
// If the wait ended because of a timeout, fail the allocation with a TimeoutException
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
// Deduct the elapsed wait time from the remaining blocking budget
remainingTimeToBlockNs -= timeNs;
// check if we can satisfy this request from the free list,
// otherwise allocate memory
// If a batch.size-sized buffer was requested and a batch has meanwhile been returned to the free list, reuse it directly
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
// just grab a buffer from the free list
buffer = this.free.pollFirst();
accumulated = size;
} else {
// we'll need to allocate memory, but we may only get
// part of what we need on this iteration
freeUp(size - accumulated);
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
accumulated += got;
}
}
// Don't reclaim memory on throwable since nothing was thrown
accumulated = 0;
} finally {
// When this loop was not able to successfully terminate don't loose available memory
// The try block ends with accumulated = 0; if it completed normally, the statement below has no effect.
// If an exception occurred, the partially accumulated memory is returned to the BufferPool here
this.nonPooledAvailableMemory += accumulated;
this.waiters.remove(moreMemory);
}
}
} finally {
// signal any additional waiters if there is more memory left
// over for them
try {
// If there is still memory left over, signal the next thread waiting for memory
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// Another finally... otherwise find bugs complains
lock.unlock();
}
}
if (buffer == null)
return safeAllocateByteBuffer(size);
else
return buffer;
}
public void deallocate(ByteBuffer buffer, int size) {
// The producer is designed for multi-threaded use, so this may be called concurrently and must take the lock
lock.lock();
try {
// Only ByteBuffers of exactly poolableSize are cached
if (size == this.poolableSize && size == buffer.capacity()) {
buffer.clear();
this.free.add(buffer);
} else {
// Otherwise just hand the memory back by increasing the non-pooled available memory
this.nonPooledAvailableMemory += size;
}
// Available memory has grown, so signal a waiter that is blocked because memory was insufficient
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
lock.unlock();
}
}
private void freeUp(int size) {
// Called when a pooled HeapByteBuffer from free cannot be reused directly:
// release buffers from free to grow nonPooledAvailableMemory so the request can be served.
// Invariant when nothing is in use: totalMemory = buffer.memory = free.size() * batch.size + available memory
while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size)
this.nonPooledAvailableMemory += this.free.pollLast().capacity();
}
7. NIO Network Layer
8. Miscellaneous
Compression
- Compression happens on the producer thread: when tryAppend writes the record into the batch's HeapByteBuffer, the data is compressed in a streaming fashion (for example, LZ4 compresses in 64 KB blocks).
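A minimal configuration sketch showing how producer-side compression is enabled (the broker addresses reuse the example from Chapter 2 and the class name is made up); the compression itself then happens inside the batch buffer as records are appended.
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.40.4:6667,10.0.40.5:6667");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Records are compressed in a streaming fashion inside the ProducerBatch buffer as they are appended,
        // so the broker receives already-compressed batches. Valid values include "none", "gzip", "snappy", "lz4".
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}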