Kafka ProducerRecord如何写入到RecordAccumulator
创始人
2024-03-02 21:47:42
0

文章目录

  • 1.从CopyOnWriteMap中获取Deque队列
  • 2.尝试写入消息
    • 2.1基于NIO ByteBuffer分配内存
    • 2.2尝试写入消息
  • 3.底层写入
    • 3.1 如果RecordBatch剩余可用内存空间充足,可以写入
    • 3.2 如果RecordBatch剩余可用内存空间不足

Kafka ProducerRecord通过Partitioner组件得到分区号之后,就要将消息写入到RecordAccumulator中

// 将消息追加到内存缓冲中去
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Callback callback,long maxTimeToBlock) throws InterruptedException {// 因为KafkaProducer是线程安全的,有可能在同一时刻会有多个线程调用该方法将消息写入Kafka// 这一步就是通过AtomicInteger,看看当前有多少个线程正常尝试将消息写入内存缓冲区appendsInProgress.incrementAndGet();try {// 根据TopicPartition从ConcurrentHashMap中取出对应的Deque队列,没有就new(第一次肯定new)// 有可能是多个线程并发调用该方法!!!Deque dq = getOrCreateDeque(tp);// 对刚刚得到的Deque加锁synchronized (dq) {if (closed)throw new IllegalStateException("Cannot send after the producer is closed.");// 第1次尝试:尝试将消息放到Deque中// 第一次进来会失败,因为虽然有了Deque,但是Deque队列中并没有RecordBatchRecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);// appendResult不为null,说明已经将消息放到Deque中了,该方法到此结束if (appendResult != null)// 返回消息追加操作的结果return appendResult;}// 能走到这,说明是第一次进入,虽然有了Deque但是队列中还没有对应的Batch,// 所以将消息放到Deque的操作才没有结果。因此,下面才要申请ByteBuffer用来创建RecordBatch// 确定即将要创建的RecordBatch size:默认的batchSize 16k和消息的size之间取MAXint size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());// 让BufferPool给Batch分配一块内存空间,内存空间的来源来自于2部分:availableMemory和Deque// Deque内缓存了一块一块的、可复用的、固定大小的ByteBuffer,对于不足16k的消息可以直接复用// availableMemory是自由发挥的内存,可以针对“超大”消息定制ByteBuffer。如果availableMemory不够用,还可以“拆东墙补西墙”ByteBuffer buffer = free.allocate(size, maxTimeToBlock);// 再次锁住Deque(这回Batch不缺了,再次尝试写入)synchronized (dq) {if (closed)throw new IllegalStateException("Cannot send after the producer is closed.");// 第2次尝试:这回有了Batch,再次尝试将消息放入Deque中RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);// 如果第2次尝试写入的操作成功,就完成了// 这里利用了double-check思想if (appendResult != null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...// 因为并发,会申请到多余的ByteBuffer。只有一块ByteBuffer用来构建RecordBatch并add到Deque。// 其余多申请的ByteBuffer,会根据ByteBuffer大小,决定将其交给Deque or availableMemoryfree.deallocate(buffer);// 返回消息追加操作的结果return appendResult;}// 如果是第一次写入消息,上面经历了double-check后,创建了空的Deque和ByteBuffer,// 现在要利用申请到的ByteBuffer,构建出RecordBatch,并将其添加到空的Deque中// 先将“停留在内存中的消息”包装成MemoryRecordsMemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);// 搞一个新的RecordBatch出来RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());// 将包装好的MemoryRecords放到RecordBatch中FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));// 消息已经放到Batch中了,就把这个Batch添加到Deque中dq.addLast(batch);// 将Batch也添加到IncompleteRecordBatches中,IncompleteRecordBatches表示当前还没有将Batch发送出去的列表incomplete.add(batch);// 返回消息追加操作的结果return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);}} finally {// 将当前正在执行append操作的线程数量,递减appendsInProgress.decrementAndGet();}
}

1.从CopyOnWriteMap中获取Deque队列

多线程并发的调用以下方法从CopyOnWriteMap中获取Deque,如果get不到就new新的Deque出来,这里可能会并发的new出好几个Deque出来。

/*** 根据TopicPartition从ConcurrentHashMap中取出对应的Deque队列* ConcurrentMap是基于CopyOnWriteMap实现的,适合读多写少的场景。update时会先copy出来一个副本,更新副本。* 好处:读写之间不会长时间的锁互斥,写的时候不会阻塞读。  坏处:copy副本会占用大量内存空间* Deque正是利用了读多写少的特点,因为一个Partition对应一个Deque,写操作本就是很少的。* 主要操作(大量的)还是从ConcurrentMap中将Deque读出来,后面频繁更新的就只是Deque了,跟ConcurrentMap没关系了*/
private Deque getOrCreateDeque(TopicPartition tp) {// 最核心的数据结构:ConcurrentMap>// 一个TopicPartition对应一个Deque,Deque中放的就是RecordBatch// 这里就是直接去volatile修饰的map中将Deque取出来Deque d = this.batches.get(tp);// 如果能拿到Deque,就直接返回if (d != null)return d;// 多个线程并发的创建出多个ArrayDeque,但是CopyOnWriteMap#putIfAbsent()方法是线程安全的,且没有key的时候才会put// 因此虽然这里new了很多Deque但是最终put到CopyOnWriteMap中的只有1个d = new ArrayDeque<>();// CopyOnWriteMap#putIfAbsent()方法被synchronized修饰,是线程安全的Deque previous = this.batches.putIfAbsent(tp, d);if (previous == null)return d;elsereturn previous;
}

各个Thread各自new了Deque后,就会将其put到CopyOnWriteMap中。由于CopyOnWriteMap的大部分方法都是由关键字synchronized修饰的线程安全的,因此同一时间只会有一个Thread争抢到锁,从而执行put操作。巧的是在put时还会判断是否containsKey,因此虽然有多个(因为并发而new出来的)Deque等着put到CopyOnWriteMap中,但是最终只会有1个Deque被put到CopyOnWriteMap中。

// 用关键字volatile修饰的原因:如果有人put了新元素,put完毕后,立马就能被读到
private volatile Map map;@Override
public V get(Object k) {// 直接从volatile修饰的map中读,如果有人put了新元素进来,volatile就能立马感知到return map.get(k);
}/*** 上一步在ConcurrentMap中没找到Deque,于是new了一个新的ArrayDeque* 让TopicPartition作为Key,让新ArrayDeque作为Value,put到ConcurrentMap中* 注意:该方法被synchronized修饰,是线程安全的!!!*/
@Override
public synchronized V putIfAbsent(K k, V v) {// 由于本方法是线程安全的,同一时间只会有一个线程争抢到锁、执行本方法。// 如果ConcurrentMap中没有才会put,而put也是被synchronized修饰的线程安全的方法。// 如果Thread-1先手抢到锁执行了put操作,随后Thread-1释放锁、Thread-2抢到锁,// 经过判断ConcurrentMap中已经有了对应的键值对,直接get就行if (!containsKey(k))return put(k, v);elsereturn get(k);
}@Override
public synchronized V put(K k, V v) {// 将内部的(用volatile修饰的)map,copy出来一份副本Map copy = new HashMap(this.map);// 基于副本执行put操作V prev = copy.put(k, v);// 将副本内的数据,写回到(volatile修饰的)map中,保证写完之后立马就能读this.map = Collections.unmodifiableMap(copy);return prev;
}

put操作是基于copy出来的副本进行的,这充分体现了COW思想。因为COW适合读多写少的场景,创建Deque并将其put到CopyOnWriteMap中就是写,获取Deque就是读。写的时候先copy一个副本,更新到副本中。好处:读写之间不会相互阻塞;坏处:内存占用大;

最终,copy副本会被写入到由关键字volatile修饰的map变量中。volatile可以保证内存可见性,一旦有人基于copy副本机制更新了这个引用变量对应的实际的map对象的地址,它立马就会被别人看到。所以在get()的时候完全不需要加锁。即使在同一时刻多线程并发读,也没有锁的阻塞。

2.尝试写入消息

由于消息是第一次写入,因此到目前为止,刚刚创建的Deque< Record>队列还是空的。因此锁住Deque后的tryAppend操作一定会失败,也就是返回null

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque deque) {// Deque中最后的RecordBatch,之所以取最后的RecordBatch是因为前面的RecordBatch都装满了,// 而最后的这个是新add的,还是新的RecordBatch last = deque.peekLast();if (last != null) {// 直接调用RecordBatch#tryAppend()方法,将消息写入到这个RecordBatch对应的ByteBuffer中FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());// 如果某个RecordBatch的剩余可用内存空间,不足以将手里的这条消息写入,这个future一定为nullif (future == null)// 将MemoryRecords关闭掉,这辆大巴车就得关门了last.records.close();else// 如果写入成功,返回结果信息return new RecordAppendResult(future, deque.size() > 1 || last.records.isFull(), false);}// 第一次写入消息,只有Deque,RecordBatch肯定没有。因此写入失败,返回nullreturn null;
}

外层append()方法在收到RecordAppendResult为null后,就得开始着手准备申请ByteBuffer以便能创建RecordBatch了。

// 确定即将要创建的RecordBatch size:默认的batchSize 16k和消息的size之间取MAX
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 让BufferPool给Batch分配一块内存空间,内存空间的来源来自于2部分:availableMemory和Deque
// Deque内缓存了一块一块的、可复用的、固定大小的ByteBuffer,对于不足16k的消息可以直接复用
// availableMemory是自由发挥的内存,可以针对“超大”消息定制ByteBuffer。如果availableMemory不够用,还可以“拆东墙补西墙”
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

先计算出需要用到的RecordBatch的size,然后再去申请ByteBuffer。

2.1基于NIO ByteBuffer分配内存

BufferPool中负责管理ByteBuffer内存分配工作的,是由两部分组成:Deque< ByteBuffer >和availableMemory 。

/*** 为即将要被创建出来的RecordBatch分配内存空间*/
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {if (size > this.totalMemory)throw new IllegalArgumentException("Attempt to allocate " + size+ " bytes, but there is a hard limit of "+ this.totalMemory+ " on memory allocations.");// 使用ReentrantLock锁住以下代码块,选它是因为它更灵活的加锁、释放锁this.lock.lock();try {// 如果我需要的内存大小正好等于BufferPool内“中规中矩”的RecordBatch的大小(poolableSize也就是默认的batch size,16k),并且BufferPool内的Deque不为空// 这一步是为了复用BufferPool内的内存块,第一次进来肯定全是空的,没有可复用的ByteBufferif (size == poolableSize && !this.free.isEmpty())// 将Deque队列中的first取出来复用即可return this.free.pollFirst();// 能往下走,说明Deque提供的的可复用的内存满足不了需求,因此必须找availableMemory申请// 计算出Deque的大小 = Deque内ByteBuffer的个数 * “中规中矩”的16kint freeListSize = this.free.size() * this.poolableSize;// 如果创建这个RecordBatch所需的内存 < “自由创建”的内存 + “中规中矩”复用的内存if (this.availableMemory + freeListSize >= size) {// “拆东墙补西墙”,Deque就是东墙,availableMemory就是西墙// 这里是说availableMemory不够用,就从Deque中一块一块的拆,直到availableMemory满足使用需求freeUp(size);// 从“剩余可用的内存”中扣除本次创建RecordBatch要用到的内存大小this.availableMemory -= size;// 解除锁lock.unlock();// 本次内存申请已成功,内存来源于availableMemoryreturn ByteBuffer.allocate(size);} else {// 可用内存严重不足,不能再允许申请ByteBuffer了(需要16k,但此时就剩15.9k可用了)// 其他RecordBatch因为发送成功而空闲出来的ByteBuffer的大小int accumulated = 0;ByteBuffer buffer = null;// 获取到正在使用的ReentrantLock的ConditionCondition moreMemory = this.lock.newCondition();// 剩余(因等待可用内存而)阻塞的时间long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);// 将这个Condition添加到Deque中,排队等待“可用内存恢复可用容量”this.waiters.addLast(moreMemory);// 等待RecordBatch因为发送成功而空闲出来的ByteBuffer恢复可用内存空间while (accumulated < size) {long startWaitNs = time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try {// 调用Condition.await()方法,让当前线程进入休眠等待状态waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);} catch (InterruptedException e) {// 出现异常,就将本次等待的Condition从等待队列中移除this.waiters.remove(moreMemory);throw e;} finally {long endWaitNs = time.nanoseconds();timeNs = Math.max(0L, endWaitNs - startWaitNs);this.waitTime.record(timeNs, time.milliseconds());}// 如果某个Condition在“有限的规定时间”内没等来可用内存,那就只能放弃了if (waitingTimeElapsed) {// 将这个Condition从Deque等待队列中移除this.waiters.remove(moreMemory);// 抛出超时异常throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");}// 刷新“剩余阻塞等待时间”remainingTimeToBlockNs -= timeNs;if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {// 把Deque中的第一个ByteBuffer弹出来buffer = this.free.pollFirst();// 可用内存容量“恢复”了size大小accumulated = size;} else {freeUp(size - accumulated);int got = (int) Math.min(size - accumulated, this.availableMemory);this.availableMemory -= got;accumulated += got;}}// 一旦RecordBatch因为发送成功而空闲出来的ByteBuffer,大于要申请的内存大小,// 比如等了一会ByteBuffer空闲了32k出来,此时我只需要申请16k,就会跳出while循环// 将Deque中的第一个Condition移除掉,唤醒第一个等待的人,开始为他分配ByteBufferCondition removed = this.waiters.removeFirst();if (removed != moreMemory)throw new IllegalStateException("Wrong condition: this shouldn't happen.");if (this.availableMemory > 0 || !this.free.isEmpty()) {if (!this.waiters.isEmpty())// 唤醒因调用Condition.await()方法而进入休眠等待状态的线程this.waiters.peekFirst().signal();}// 释放锁lock.unlock();// 直接返回ByteBufferif (buffer == null)// 分配ByteBufferreturn ByteBuffer.allocate(size);elsereturn buffer;}} finally {if (lock.isHeldByCurrentThread())lock.unlock();}

**Deque< ByteBuffer >可以理解为“中规中矩”,它缓存了一堆大小固定(等于batch size的默认 16k)、可以复用的ByteBuffer **。如果接下来需要创建的这个RecordBatch要求的内存大小不超过默认的16k,那么完全可以复用Deque< ByteBuffer >中的First ByteBuffer。

availableMemory 可以理解为“自由发挥”,当Deque< ByteBuffer >中没有可复用的ByteBuffer,就得需要让availableMemory 来“自由、灵活的”分配内存。availableMemory的初始值在RecordAccumulator构造时就已经指定,默认大小为:32M

当然,第一次进来,这个Deque< ByteBuffer >是空的。那就得找availableMemory(初始值为默认的32M)要内存了!

availableMemory负责分配内存时,也得结合Deque。因为availableMemory毕竟空间有限,万一不够用呢?所以,但凡不是简单的内存复用就能解决的,都得两者结合。

一旦availableMemory自己手里的内存也满足不了使用需求,那就得找Deque帮忙,“拆东墙补西墙”,(while循环中)将Deque中复用的ByteBuffer,从后往前一块一块的补充给availableMemory,直到availableMemory的内存空间恢复到满足使用需求为止!

/*** 将Deque中的内存块,从后往前一块一块的补充给availableMemory,* 这样一来这条“特殊大小”的消息就能利用availableMemory创建RecordBatch了。*/
private void freeUp(int size) {// 如果“剩余可以用来自由发挥”的内存不足以创建这个RecordBatch,// 同时"缓存好用来复用的中规中矩"的Deque还有足够的内存空间(n * 16k),// 那就从Deque中,从最后一个ByteBuffer开始,“拆东墙补西墙”的方式一块一块的将ByteBuffer补充给“剩余可以用来自由发挥”的内存while (!this.free.isEmpty() && this.availableMemory < size)this.availableMemory += this.free.pollLast().capacity();
}

当然了,ByteBuffer的分配方法不是线程安全的,多线程并发执行时,会“额外”的申请多余而不用的ByteBuffer。例如3个Thread同一时间各自分别申请了一个16k大小的ByteBuffer。没关系,在外层消息写入经历double-check的tryAppend时,会通过关键字synchronized锁住Deque。同一时间只会有一个Thread,允许它将自己申请到的ByteBuffer用于构建RecordBatch并将其添加到Deque< RecordBatch >中。

其他线程虽然手里也攥着各自申请到的ByteBuffer,但是在tryAppend时,由于Deque< RecordBatch >中的最后一个RecordBatch就是新创建的。因此这些“多余”的ByteBuffer就会被释放掉。根据ByteBuffer的大小,决定将其交给Deque or availableMemory。

这一套处理逻辑,正式基于double-check模式保证的!

2.2尝试写入消息

经历了double-check以后,Deque< RecordBatch >有了,用来构建ByteBuffer也有了,这回可以往RecordBatch中写入消息了。

// 如果是第一次写入消息,上面经历了double-check后,创建了空的Deque和ByteBuffer,
// 现在要利用申请到的ByteBuffer,构建出RecordBatch,并将其添加到空的Deque中
// 先将“停留在内存中的消息”包装成MemoryRecords
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
// 搞一个新的RecordBatch出来
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
// 将包装好的MemoryRecords放到RecordBatch中
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));// 消息已经放到Batch中了,就把这个Batch添加到Deque中
dq.addLast(batch);
// 将Batch也添加到IncompleteRecordBatches中,IncompleteRecordBatches表示当前还没有将Batch发送出去的列表
incomplete.add(batch);

如果是第一次写入的消息,首先会将消息包装成MemoryRecords,用MemoryRecords创建RecordBatch。用RecordBatch提供的tryAppend()方法将消息“放到”RecordBatch中。

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {// 如果这个RecordBatch已经没有足够的空间来容纳这个MemoryRecords了,那就返回null。// 外层判空方法收到null,会抛出NullPointerExceptionif (!this.records.hasRoomFor(key, value)) {return null;} else {// 调用MemoryRecords#append()方法通过Compressor将消息放到RecordBatch中,得到crc值long checksum = this.records.append(offsetCounter++, timestamp, key, value);// 当前RecordBatch中最大的那条消息的sizethis.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));// 最后一次写入时间设为:nowthis.lastAppendTime = now;FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length);if (callback != null)thunks.add(new Thunk(callback, future));this.recordCount++;return future;}
}public boolean hasRoomFor(byte[] key, byte[] value) {if (!this.writable)return false;// Compressor组件已经写了多少条消息了return this.compressor.numRecordsWritten() == 0 ?this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :// ByteBuffer的大小 >= Compressor组件已经写入进去的消息字节数量(估算值) + 消息的大小(消息头 + 消息本身大小)// 假设ByteBuffer 16k,Compressor组件已经写了15.9k,目前还有一条消息 1k,此时一定为false,// 说明当前RecordBatch的剩余可用内存空间已经不足以写入这一条消息了,就得申请新的ByteBuffer构建新的RecordBatch了this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}

3.底层写入

不管是第一次写入消息,还是第N次写入消息,还是在double-check的哪个阶段写入消息,最终都得通过RecordBatch#tryAppend()方法将消息按照Kafka约定的二进制协议,通过Compressor组件将消息以流的形式写入到RecordBatch中。

如果是第一次写入消息,经过double-check后,会先后获取到Deque< RecordBatch >和ByteBuffer,因为double-check中的两次tryAppend是RecordAccumulator执行的操作,它首先会将Deque< RecordBatch >中的最后的那个RecordBatch拿出来,通过RecordBatch#tryAppend()方法将消息“放到”RecordBatch中。

但是对于第一次写入,只是申请到了ByteBuffer,并没有构建相应的RecordBatch,因此double-check中的2次尝试写入都会失败。这样一来,就会将“停留在内存的消息”包装成MemoryRecords,并利用MemoryRecords实例化RecordBatch对象。最后通过RecordBatch#tryAppend()方法完成消息写入的任务。对了,这个RecordBatch最后还得添加到Deque< RecordBatch >中,因为后续的消息得往里写。

如果不是第1次写入,那就再double-check模式中RecordAccumulator提供的2次尝试写入操作中,取Deque的最后一个RecordBatch,将消息按照二进制协议写入。

3.1 如果RecordBatch剩余可用内存空间充足,可以写入

不管是经历了九九八十一难中的哪一难,只要消息顺利抵达RecordBatch,就得先判断一下当前RecordBatch剩余的可用内存是否还能将眼前的这条消息写入其中。

/*** 不管是第一次写入,还是第N次写入;不管是经历double-check的哪个tryAppend,* 最终都得调用RecordBatch#tryAppend()方法,将消息“放到”RecordBatch中。* 写入之前要经过判断,看这个RecordBatch是否还能足够的剩余空间写下这一条消息。如果有,就通过Compressor组件写入;否则,就return null*/
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, long now) {// 如果这个RecordBatch已经没有足够的空间来容纳这个MemoryRecords了,那就返回null。// 外层判空方法收到null,会抛出NullPointerExceptionif (!this.records.hasRoomFor(key, value)) {return null;} else {// 核心写入操作:调用MemoryRecords#append()方法通过Compressor将消息放到RecordBatch中,得到crc值long checksum = this.records.append(offsetCounter++, timestamp, key, value);// 当前RecordBatch中最大的那条消息的sizethis.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));// 最后一次写入时间设为:nowthis.lastAppendTime = now;FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length);if (callback != null)thunks.add(new Thunk(callback, future));this.recordCount++;return future;}
}

如果是第一次写入的话,那么这个RecordBatch是刚刚才添加到Deque< RecordBatch >中的,RecordBatch的剩余可用空间绝对能写下这条消息,那就让MemoryRecords将消息按照约定好的二进制协议进行转换,完事通过Compressor组件将消息写入到RecordBatch中

/*** MemoryRecords先将消息按照Kafka约定好的二进制协议,将消息进行转换。* 最后,通过Compressor组件将消息写入到RecordBatch中*/
public long append(long offset, long timestamp, byte[] key, byte[] value) {if (!writable)throw new IllegalStateException("Memory records is not writable");// 计算出这个MemoryRecords的大小int size = Record.recordSize(key, value);// 设置消息对应的offset(利用Java的DataOutputStream实现)compressor.putLong(offset);// 设置消息大小compressor.putInt(size);// 计算出crc的值long crc = compressor.putRecord(timestamp, key, value);// 通过Compressor完成写入操作compressor.recordWritten(size + Records.LOG_OVERHEAD);return crc;
}

通过Compressor组件的初始化可以看出,Compressor.putxxx方法实际上是借助Java BIO实现的。这个appendStream就是Java里的DataOutputStream

public Compressor(ByteBuffer buffer, CompressionType type) {// 省略部分代码...// 由ByteBufferOutputStream包裹着ByteBuffer,也就是持有了一个对ByteBuffer的输出流bufferStream = new ByteBufferOutputStream(buffer);// (视压缩情况,决定如何包裹)再让DataOutputStream包裹着ByteBufferOutputStreamappendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
}

数据由Java的DataOutputStream接入,当执行DataOutputStream#writeLong()等方法时会将数据转换成二进制的字节数组。(如果开启了压缩,字节数组先进压缩流的缓冲区,按照相应的压缩算法压缩后)然后再将其写入到ByteBufferOutputStream中

3.2 如果RecordBatch剩余可用内存空间不足

如果RecordBatch判断当前剩余可用的内存空间不足以写下这一条消息,比如经过MemoryRecords的判断,发现当前ByteBuffer的大小为16k,估算了一下已经Compressor组件已经写了15.9k了,现在还有一条1K大小的消息待写入,写不进了。那么就会返回null

public boolean hasRoomFor(byte[] key, byte[] value) {if (!this.writable)return false;// Compressor组件已经写了多少条消息了return this.compressor.numRecordsWritten() == 0 ?this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) :// ByteBuffer的大小 >= Compressor组件已经写入进去的消息字节数量(估算值) + 消息的大小(消息头 + 消息本身大小)// 假设ByteBuffer 16k,Compressor组件已经写了15.9k,目前还有一条消息 1k,此时一定为false,// 说明当前RecordBatch的剩余可用内存空间已经不足以写入这一条消息了,就得申请新的ByteBuffer构建新的RecordBatch了this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value);
}

RecordAccumulator收到null后,就得调用这个已经濒临写满状态的RecordBatch对应的MemoryRecords#close()方法,将当前RecordBatch对应的ByteBuffer的Compressor组件关掉,将针对ByteBuffer的I/O流也关掉,将当前的MemoryRecords里的“写标志位”设为false:不能再写了。然后第1次尝试的结果返就是null

// 直接调用RecordBatch#tryAppend()方法,将消息写入到这个RecordBatch对应的ByteBuffer中
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
// 如果某个RecordBatch的剩余可用内存空间,不足以将手里的这条消息写入,这个future一定为null
if (future == null)// 剩余内存空间不够写这条消息了,就将MemoryRecords关闭掉,这辆大巴车就得关门了last.records.close();

由于第1次尝试的写入结果得到的就是个null,所以继续往下走,申请新的ByteBuffer,使用synchronized锁住Deque后再一次的尝试写入,重复double-check的写入流程…

相关内容

热门资讯

美国2年期国债收益率上涨15个... 原标题:美国2年期国债收益率上涨15个基点 美国2年期国债收益率上涨15个基...
汽车油箱结构是什么(汽车油箱结... 本篇文章极速百科给大家谈谈汽车油箱结构是什么,以及汽车油箱结构原理图解对应的知识点,希望对各位有所帮...
嵌入式 ADC使用手册完整版 ... 嵌入式 ADC使用手册完整版 (188977万字)💜&#...
重大消息战皇大厅开挂是真的吗... 您好:战皇大厅这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...
盘点十款牵手跑胡子为什么一直... 您好:牵手跑胡子这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游...
senator香烟多少一盒(s... 今天给各位分享senator香烟多少一盒的知识,其中也会对sevebstars香烟进行解释,如果能碰...
终于懂了新荣耀斗牛真的有挂吗... 您好:新荣耀斗牛这款游戏可以开挂,确实是有挂的,需要了解加客服微信8435338】很多玩家在这款游戏...
盘点十款明星麻将到底有没有挂... 您好:明星麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【5848499】很多玩家在这款游戏...
总结文章“新道游棋牌有透视挂吗... 您好:新道游棋牌这款游戏可以开挂,确实是有挂的,需要了解加客服微信【7682267】很多玩家在这款游...
终于懂了手机麻将到底有没有挂... 您好:手机麻将这款游戏可以开挂,确实是有挂的,需要了解加客服微信【8435338】很多玩家在这款游戏...