org.apache.kafka.clients.producer.KafkaProducer#doSend
try{
//第一段:
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
//第二段:
                RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);
}catch (ApiException e) {
            log.debug(“Exception occurred during message send:”, e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
}
其中,第二段:
org.apache.kafka.clients.producer.internals.RecordAccumulator#append{
    buffer = free.allocate(size, maxTimeToBlock);
}
其中关键行buffer = free.allocate(size, maxTimeToBlock);会抛出异常:
                        if (waitingTimeElapsed) {
                            throw new TimeoutException(“Failed to allocate memory within the configured max blocking time ” + maxTimeToBlockMs + ” ms.”);
                        }
TimeoutException属于KafkaException中的一种: