淘先锋技术网

首页 1 2 3 4 5 6 7

生产者处理响应

前面我们分析了producer发送请求的过程,现在分析请求发送之后,producer是怎么处理响应的。producer的响应处理同样是在Sender线程中完成的,先来看Selector的pollSelectionKeys方法

class Selector{
/**
 * Walks over the given selection keys (in the order returned by
 * {@code determineHandlingOrder}) and services the channel behind each key.
 *
 * For every key this excerpt shows: resolving the channel, registering the
 * per-connection metrics, refreshing the idle-expiry bookkeeping when an
 * {@code idleExpiryManager} is configured, reading data through {@code attemptRead},
 * closing the channel when the key is no longer valid, and finally recording the
 * per-connection time. Lines made of dots mark code that the article left out.
 *
 * @param selectionKeys           keys to process
 * @param isImmediatelyConnected  not used in the visible part of this excerpt
 * @param currentTimeNanos        timestamp handed to the idle-expiry manager
 */
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos) {
        /**
         * Iterate over all keys
         */
        for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
            /**
             * Look up the channel that belongs to this key
             */
            KafkaChannel channel = channel(key);
            // only take a start timestamp when per-connection timing is enabled
            long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

            // register all per-connection metrics at once
            sensors.maybeRegisterConnectionMetrics(channel.id());
            if (idleExpiryManager != null)
                idleExpiryManager.update(channel.id(), currentTimeNanos);

            boolean sendFailed = false;
            try {

               .............
                /**
                 * Read the response data
                 */
                attemptRead(key, channel);
                   ....................
                }

                /* cancel any defunct sockets */
                if (!key.isValid())
                    close(channel, CloseMode.GRACEFUL);

            } catch (Exception e) {
               ...........
            } finally {
                // always runs, whether or not the key was handled successfully
                maybeRecordTimePerConnection(channel, channelStartTimeNanos);
            }
    }
    
    /**
     * Reads every receive currently available on {@code channel} and stages it.
     *
     * Nothing is read unless the channel is ready, has data to offer (the key is
     * readable or the channel has buffered bytes), has no receive already staged,
     * and is not explicitly muted.
     *
     * @param key     selection key of the channel
     * @param channel channel to read from
     * @throws IOException propagated from {@code channel.read()}
     */
    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
        // The guards are checked in the same order as a single short-circuit condition would be.
        if (!channel.ready())
            return;
        if (!key.isReadable() && !channel.hasBytesBuffered())
            return;
        if (hasStagedReceive(channel))
            return;
        if (explicitlyMutedChannels.contains(channel))
            return;

        // Each receive handed back by the channel is queued for later processing.
        for (NetworkReceive received = channel.read(); received != null; received = channel.read()) {
            madeReadProgressLastPoll = true;
            addToStagedReceives(channel, received);
        }

        if (!channel.isMute()) {
            madeReadProgressLastPoll = true;
        } else {
            // channel has muted itself due to memory pressure.
            outOfMemory = true;
        }
    }
    
    /**
     * Appends {@code receive} to the staged-receive queue of {@code channel},
     * creating that queue on first use.
     *
     * @param channel channel the receive was read from
     * @param receive receive to stage
     */
    private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        Deque<NetworkReceive> pending = stagedReceives.get(channel);
        if (pending == null) {
            // first receive staged for this channel
            pending = new ArrayDeque<NetworkReceive>();
            stagedReceives.put(channel, pending);
        }
        // queue the receive so it can be processed later
        pending.add(receive);
    }
 }    
    

可以看到,响应被添加到stagedReceives对象中,stagedReceives的类型是Map<KafkaChannel, Deque<NetworkReceive>>,一个连接对应一个队列。Selector类的poll方法中会调用addToCompletedReceives方法,这个方法就是读取stagedReceives中的数据进行处理

class Selector{
/**
     * Moves one staged receive per channel into the completed receives,
     * skipping channels that are explicitly muted.
     *
     * On the client the staged receives are responses; on the server they are requests.
     */
    private void addToCompletedReceives() {
        if (this.stagedReceives.isEmpty())
            return;

        // walk the map; each connection owns exactly one deque
        for (Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> it = this.stagedReceives.entrySet().iterator();
             it.hasNext(); ) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> staged = it.next();
            KafkaChannel channel = staged.getKey();
            if (explicitlyMutedChannels.contains(channel))
                continue;

            Deque<NetworkReceive> pending = staged.getValue();
            addToCompletedReceives(channel, pending);
            // drop the map entry once the channel has nothing left staged
            if (pending.isEmpty())
                it.remove();
        }
    }
    
 /**
  * Takes the head of {@code stagedDeque}, stores it in the completed receives and
  * records the number of bytes received for the channel.
  *
  * @param channel     channel the receive belongs to
  * @param stagedDeque staged receives of that channel; the head element is removed
  */
 private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
        // only the first staged receive is moved per call
        NetworkReceive head = stagedDeque.poll();
        // keep it in the list of receives that have been fully received
        completedReceives.add(head);
        String channelId = channel.id();
        sensors.recordBytesReceived(channelId, head.payload().limit());
    }
}    

从上面可以看到,每次会从每个连接的队列中取出一条数据添加到completedReceives列表中。那么completedReceives列表中的响应是什么时候被处理的呢?再看NetworkClient的poll方法

class NetworkClient{
/**
 * Runs one round of network I/O and returns the responses collected during it.
 *
 * If there are aborted sends they are handled and returned right away, without
 * polling the selector. Otherwise the selector is polled, completed sends and
 * completed receives are turned into responses, and {@code completeResponses}
 * is invoked on them before they are returned. An {@code IOException} from the
 * selector is logged and not rethrown. The line of dots marks code that the
 * article left out.
 *
 * @param timeout upper bound passed (together with the metadata timeout and the
 *                default request timeout) to the selector poll
 * @param now     current time handed to the metadata updater
 * @return the responses gathered in this call
 */
public List<ClientResponse> poll(long timeout, long now) {
        if (!abortedSends.isEmpty()) {
            // If there are aborted sends because of unsupported version exceptions or disconnects,
            // handle them immediately without waiting for Selector#poll.
            List<ClientResponse> responses = new ArrayList<>();
            handleAbortedSends(responses);
            completeResponses(responses);
            return responses;
        }
        /**
         * 1. Build the metadata request
         */
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            /**
             * 2. Send the network request
             */
            this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        /**
         * Handle the responses returned by the broker, including metadata responses
         */
        handleCompletedReceives(responses, updatedNow);
        ..........
        /**
         * Invoke the callback of every response that has been received
         */
        completeResponses(responses);

        return responses;
    }
    
 /**
  * Turns every receive completed by the selector into a response.
  *
  * For each receive the next in-flight request of the sending node is completed,
  * the payload is parsed, and the result is either handled internally (metadata
  * and API-versions responses of internal requests) or appended to {@code responses}.
  *
  * @param responses list that collects the responses to hand to callers
  * @param now       current time passed on to the handlers
  */
 private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        // walk over the receives that have been completed
        for (NetworkReceive receive : this.selector.completedReceives()) {
            // inFlightRequests holds requests that were sent but not yet answered;
            // the request answered by this receive is removed from it here.
            InFlightRequest request = inFlightRequests.completeNext(receive.source());
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), request.header,
                throttleTimeSensor, now);
            if (log.isTraceEnabled()) {
                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", request.destination,
                    request.header.apiKey(), request.header.correlationId(), responseStruct);
            }

            // The broker can return many different kinds of responses.
            AbstractResponse body = AbstractResponse.parseResponse(request.header.apiKey(), responseStruct);
            // If the received response includes a throttle delay, throttle the connection.
            maybeThrottle(body, request.header.apiVersion(), request.destination, now);

            if (request.isInternalRequest && body instanceof MetadataResponse) {
                // metadata response
                metadataUpdater.handleCompletedMetadataResponse(request.header, now, (MetadataResponse) body);
            } else if (request.isInternalRequest && body instanceof ApiVersionsResponse) {
                handleApiVersionsResponse(responses, request, now, (ApiVersionsResponse) body);
            } else {
                // everything else is stored in the list
                responses.add(request.completed(body, now));
            }
        }
    }
    
}

从上面可以看到,在completedReceives列表中读取数据后,就移除了inFlightRequests中对应的请求,然后把响应存储到了List<ClientResponse> responses中。现在看下Response是怎么处理的,在上面poll方法中我们看到执行了completeResponses(responses)方法,就是处理Response的

 /**
  * Calls {@code onComplete()} on every response. An exception thrown while
  * completing one response is logged and does not stop the remaining ones.
  *
  * @param responses responses to complete
  */
 private void completeResponses(List<ClientResponse> responses) {
        Iterator<ClientResponse> pending = responses.iterator();
        while (pending.hasNext()) {
            ClientResponse current = pending.next();
            try {
                current.onComplete();
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }
    
    /**
     * Passes this response to the registered callback; does nothing when no
     * callback is set.
     */
    public void onComplete() {
        if (callback == null) {
            return;
        }
        callback.onComplete(this);
    }

从上面看出,针对每个响应调用了我们之前请求设置的回调函数,整个流程如下图所示
在这里插入图片描述

生产者对消息的返回结果处理

前面我们看到,producer对响应的处理是调用了封装请求时设置的回调函数,继续看执行回调函数做了什么操作

public class Sender implements Runnable {
/**
 * Builds the produce request for the given batches (excerpt).
 *
 * The visible part lowers {@code minUsedMagic} to the smallest magic value among
 * the batches and creates the completion callback, which forwards the response to
 * {@code handleProduceResponse}. Lines made of dots mark code that the article
 * left out.
 */
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        .................
        // find the smallest magic value used by any batch
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        for (ProducerBatch batch : batches) {
           .........................
        /**
         * Set the callback for the request
         */
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
        ...............
    }
    
    /**
     * Callback that handles the response to a produce request (excerpt).
     *
     * When the response carries a body, each partition entry in it is matched
     * with its batch from {@code batches} and handed to {@code completeBatch};
     * afterwards the request latency is recorded. Lines made of dots mark code
     * that the article left out.
     */
    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
        ..........
        else {
            if (response.hasResponse()) {
                // complete the batch of every partition contained in the response
                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    ProducerBatch batch = batches.get(tp);
                    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
                }
                this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
            } else {
                ...........
            }
        }
    }
    
    /**
     * Marks {@code batch} as done using the offset and append time from
     * {@code response} and, when {@code done} returns true, hands the batch back
     * to the accumulator. The line of dots marks code that the article left out.
     */
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
       ..........
        /**
         * Let the RecordAccumulator release the memory
          */
        if (batch.done(response.baseOffset, response.logAppendTime, null))
            this.accumulator.deallocate(batch);
    }
 }    

看ProducerBatch的done方法

public final class ProducerBatch {
 /**
  * Completes this batch (excerpt): the visible part fires the callbacks through
  * {@code completeFutureAndFireCallbacks} and returns true. The line of dots
  * marks code that the article left out.
  */
 public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
       ........
        /**
         * Invoke the callbacks
          */
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }
/**
 * Stores the outcome in the produce future, runs the callback of every thunk and
 * finally marks the produce future as done.
 *
 * On success ({@code exception == null}) each callback receives the record
 * metadata read from its thunk's future; on failure it receives the exception
 * and no metadata. An exception thrown while handling one thunk is logged and
 * does not prevent the remaining thunks from being processed.
 */
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);

        final boolean succeeded = exception == null;
        // when sending data, one message is represented by one thunk
        for (Thunk thunk : thunks) {
            try {
                // the metadata is fetched on success even if no callback is registered
                RecordMetadata metadata = succeeded ? thunk.future.value() : null;
                // invoke the callback of the message that was sent
                if (thunk.callback != null)
                    thunk.callback.onCompletion(metadata, exception);
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        produceFuture.done();
    }
}    

可以看到,它对每条消息都执行了回调函数,并且会释放对应batch的内存