生产者处理响应
前面我们分析了producer发送请求的过程,现在分析发送请求后,怎么处理响应的,producer的响应处理也是在Sender线程中处理的,再看Selector的pollSelectionKeys方法
class Selector{
/**
 * Processes the ready selection keys: for each connection, reads any
 * response bytes available on the socket and stages the resulting
 * NetworkReceives per channel.
 * NOTE(excerpt): parts of the original method body are elided ("....").
 */
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
/**
 * Iterate over all the ready keys.
 */
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
/**
 * Look up the KafkaChannel associated with this key.
 */
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
boolean sendFailed = false;
try {
.............
/**
 * Read the response data from this channel.
 */
attemptRead(key, channel);
....................
}
/* cancel any defunct sockets */
if (!key.isValid())
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
...........
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
/**
 * Reads complete NetworkReceives off the channel, but only if the channel
 * is ready, has bytes available (socket or buffer), has no receive already
 * staged, and is not explicitly muted.
 */
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
//if channel is ready and has bytes to read from socket or buffer, and has no
//previous receive(s) already staged or otherwise in progress then read from it
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) {
madeReadProgressLastPoll = true;
/**
 * A broker returned a response: stage it for this channel.
 */
addToStagedReceives(channel, networkReceive);
}
if (channel.isMute()) {
outOfMemory = true; //channel has muted itself due to memory pressure.
} else {
madeReadProgressLastPoll = true;
}
}
}
/**
 * Appends a received response to this channel's staging queue, creating
 * the queue lazily on first use (one deque per connection).
 */
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
Deque<NetworkReceive> deque = stagedReceives.get(channel);
//enqueue the response for later processing
deque.add(receive);
}
}
可以看到,响应被添加到stagedReceives对象中,stagedReceives的类型是Map<KafkaChannel, Deque<NetworkReceive>>,一个连接对应一个队列。在前面分析的Selector类的poll方法中有个addToCompletedReceives方法,这个方法就是读取stagedReceives中的数据进行处理
class Selector{
/**
 * Moves staged receives into the completedReceives list.
 * Client side: these are responses read from brokers.
 * Server side: these are requests read from clients.
 */
private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
//iterate over the staged-receives map (one entry per connection)
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!explicitlyMutedChannels.contains(channel)) {
Deque<NetworkReceive> deque = entry.getValue();
//one deque per connection
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
iter.remove();
}
}
}
}
/**
 * Takes ONE receive from the head of the channel's staged deque and
 * appends it to completedReceives (at most one per channel per poll).
 */
private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
//take the first element of the queue
NetworkReceive networkReceive = stagedDeque.poll();
/**
 * Store the response in the list of completed receives.
 */
this.completedReceives.add(networkReceive);
this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
}
从上面可以看到,每次poll时在每个连接的队列中取出一条数据添加到completedReceives列表中,那么completedReceives列表中的响应是什么时候被处理的呢?再看NetworkClient的poll方法
class NetworkClient{
/**
 * Performs one round of network I/O: may send a metadata request, polls
 * the selector, then turns completed receives into ClientResponses and
 * invokes their callbacks.
 * NOTE(excerpt): parts of the original method body are elided ("....").
 */
public List<ClientResponse> poll(long timeout, long now) {
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
/**
 * 1. Build a metadata request if an update is due.
 */
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
/**
 * 2. Perform the actual network I/O (send requests / read responses).
 */
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
/**
 * Handle responses returned by brokers, including metadata responses.
 */
handleCompletedReceives(responses, updatedNow);
..........
/**
 * For each completed response, invoke its registered callback.
 */
completeResponses(responses);
return responses;
}
/**
 * Converts each completed NetworkReceive into a typed response, removes
 * the matching in-flight request, and dispatches internal responses
 * (metadata / api-versions) or collects the rest for callback handling.
 */
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
//iterate over the completed receives
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
/**
 * inFlightRequests holds requests that were sent but have not yet
 * received a response; remove the one this response answers.
 */
InFlightRequest req = inFlightRequests.completeNext(source);
Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
throttleTimeSensor, now);
if (log.isTraceEnabled()) {
log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
req.header.apiKey(), req.header.correlationId(), responseStruct);
}
// If the received response includes a throttle delay, throttle the connection.
/**
 * Broker responses come in many types; parse by the request's api key.
 */
AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
maybeThrottle(body, req.header.apiVersion(), req.destination, now);
if (req.isInternalRequest && body instanceof MetadataResponse)
/**
 * Handle a metadata response internally.
 */
metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
else
/**
 * Store the response in the output list for callback dispatch.
 */
responses.add(req.completed(body, now));
}
}
}
从上面可以看到,在completedReceives列表中读取数据后,就移除了inFlightRequests中对应的请求,然后把响应存储到了List&lt;ClientResponse&gt; responses中。现在看下Response是怎么处理的:在上面poll方法中我们看到执行了 completeResponses(responses) 方法,它就是处理Response的
/**
 * Invokes the completion callback of every response; a failing callback
 * is logged and does not prevent the remaining callbacks from running.
 */
private void completeResponses(List<ClientResponse> responses) {
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
}
/**
 * ClientResponse.onComplete: runs the RequestCompletionHandler that was
 * attached when the request was created, if any.
 */
public void onComplete() {
if (callback != null)
callback.onComplete(this);
}
从上面看出,针对每个响应调用了我们之前请求设置的回调函数,整个流程如下图所示
生产者对消息的返回结果处理
前面我们看到,producer对响应的处理是调用了封装请求时设置的回调函数,继续看执行回调函数做了什么操作
public class Sender implements Runnable {
/**
 * Builds a produce request for the given batches and registers the
 * callback that will process the broker's response.
 * NOTE(excerpt): parts of the original method body are elided ("....").
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
.................
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
for (ProducerBatch batch : batches) {
.........................
/**
 * Set the completion callback for the request.
 */
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
...............
}
/**
 * Callback that handles a produce response: walks every partition's
 * result and completes the corresponding batch.
 */
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
..........
else {
if (response.hasResponse()) {
//process the per-partition results of the response
ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
TopicPartition tp = entry.getKey();
ProduceResponse.PartitionResponse partResp = entry.getValue();
ProducerBatch batch = batches.get(tp);
completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
}
this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
} else {
...........
}
}
}
/**
 * Marks the batch done and, on success, returns its buffer to the
 * RecordAccumulator's pool.
 */
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
..........
/**
 * Release the batch's memory back to the RecordAccumulator.
 */
if (batch.done(response.baseOffset, response.logAppendTime, null))
this.accumulator.deallocate(batch);
}
}
看ProducerBatch的done方法
public final class ProducerBatch {
/**
 * Finalizes the batch with the broker's result (or an exception) and
 * fires the per-record user callbacks.
 * NOTE(excerpt): parts of the original method body are elided ("....").
 */
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
........
/**
 * Complete the future and invoke the callbacks.
 */
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
return true;
}
/**
 * Sets the produce future, then invokes each record's user callback;
 * one Thunk was queued per record when the message was appended.
 */
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
produceFuture.set(baseOffset, logAppendTime, exception);
/**
 * When sending, each record corresponds to one thunk.
 */
// execute callbacks
for (Thunk thunk : thunks) {
try {
if (exception == null) {
RecordMetadata metadata = thunk.future.value();
/**
 * Invoke the user callback for the successfully sent record.
 */
if (thunk.callback != null)
thunk.callback.onCompletion(metadata, null);
} else {
if (thunk.callback != null)
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
}
}
produceFuture.done();
}
}
可以看到,它对每条消息都执行了回调函数,并且会释放对应batch的内存