org.apache.kafka.clients.producer.KafkaProducer#doSend
try{
// Part 1:
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
// Part 2:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
}catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
}
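The catch block above means an ApiException raised inside doSend does not propagate out of send(): it is handed to the callback and wrapped in a failed future. The following is a minimal sketch of that pattern using only the JDK. The class SendErrorPathSketch, the exception SketchApiException, and the simplified doSend signature are hypothetical stand-ins for illustration, not Kafka classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Simplified stand-in for the doSend error path; none of these types are Kafka classes.
class SendErrorPathSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.ApiException.
    static class SketchApiException extends RuntimeException {
        SketchApiException(String message) {
            super(message);
        }
    }

    // Mimics the shape of doSend: a failure is reported through the callback
    // and a failed future instead of being thrown to the caller.
    static Future<String> doSend(boolean bufferFull, BiConsumer<String, Exception> callback) {
        try {
            if (bufferFull) {
                throw new SketchApiException(
                        "Failed to allocate memory within the configured max blocking time");
            }
            // Success path, heavily simplified: return an already completed future.
            return CompletableFuture.completedFuture("metadata");
        } catch (SketchApiException e) {
            if (callback != null)
                callback.accept(null, e);
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Future<String> future = doSend(true,
                (metadata, error) -> System.out.println("callback error: " + error));
        try {
            future.get();
        } catch (ExecutionException e) {
            // The original exception is available as the cause.
            System.out.println("future failed with: " + e.getCause().getMessage());
        }
    }
}
```

A caller that ignores both the callback and the returned future will therefore never see this kind of failure.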
In Part 2:
org.apache.kafka.clients.producer.internals.RecordAccumulator#append{
buffer = free.allocate(size, maxTimeToBlock);
}
The key line here is buffer = free.allocate(size, maxTimeToBlock); which (in BufferPool#allocate) throws an exception if the wait times out:
if (waitingTimeElapsed) {
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
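To make the timeout behavior concrete, here is a minimal sketch of a memory pool whose allocate call blocks for a bounded time and then gives up. It is an illustration under assumptions, not Kafka's BufferPool: the class TimedPoolSketch and its fields are hypothetical, it only tracks a byte count, and it uses the JDK's checked java.util.concurrent.TimeoutException in place of Kafka's own TimeoutException.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative pool only; it is not Kafka's BufferPool.
class TimedPoolSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition memoryFreed = lock.newCondition();
    private long availableBytes;

    TimedPoolSketch(long totalBytes) {
        this.availableBytes = totalBytes;
    }

    // Reserve `size` bytes, waiting up to maxTimeToBlockMs for memory to be released.
    void allocate(long size, long maxTimeToBlockMs) throws InterruptedException, TimeoutException {
        lock.lock();
        try {
            long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
            while (availableBytes < size) {
                if (remainingNs <= 0) {
                    throw new TimeoutException(
                            "Failed to allocate memory within the configured max blocking time "
                                    + maxTimeToBlockMs + " ms.");
                }
                // awaitNanos returns an estimate of the time still left to wait.
                remainingNs = memoryFreed.awaitNanos(remainingNs);
            }
            availableBytes -= size;
        } finally {
            lock.unlock();
        }
    }

    // Return `size` bytes to the pool and wake up any waiting allocators.
    void deallocate(long size) {
        lock.lock();
        try {
            availableBytes += size;
            memoryFreed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedPoolSketch pool = new TimedPoolSketch(16);
        try {
            pool.allocate(16, 50); // succeeds immediately
            pool.allocate(1, 50);  // pool is exhausted, so this waits and then times out
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the second allocate fails because nothing calls deallocate while it waits, which is analogous to a producer whose buffer stays full for the whole blocking window.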
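TimeoutException is a kind of KafkaException (it extends RetriableException, which extends ApiException, which in turn extends KafkaException):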