Kafka Source Code Analysis: Sender
2019-04-24
Tags: Kafka, source code, Sender analysis

Let's first walk through the overall flow of how Sender sends messages: based on the state of the RecordAccumulator cache, ready is used to pick out which nodes can be sent to; the Node list is then filtered according to the state of the connection between the producer and each node; after that, the corresponding requests are generated, and note in particular that only one request is generated per Node; finally, NetworkClient is called to send the requests out. Step by step (a condensed code sketch follows the list):

1. Fetch the Kafka cluster metadata from Metadata.
2. Call RecordAccumulator.ready, which inspects the cache and returns a ReadyCheckResult describing which Node(s) can be sent to.
3. If the ReadyCheckResult indicates unknownLeadersExist, call Metadata.requestUpdate to mark the cluster metadata as needing a refresh.
4. For the readyNodes set in the ReadyCheckResult, call NetworkClient.ready on each node to check whether it is ready for sending on the network I/O side; nodes that are not are removed from readyNodes.
5. For the readyNodes set left after step 4, call drain to collect the batches of messages to send.
6. Call abortExpiredBatches to handle timed-out messages: it walks every RecordBatch held in the RecordAccumulator and calls maybeExpire on each; an expired batch has done called on it (which triggers its callbacks), is removed from its queue, and has its ByteBuffer space released.
7. Call Sender.createProduceRequests to wrap the outgoing batches into ClientRequests.
8. Call NetworkClient.send to write each ClientRequest into the send field of its KafkaChannel.
9. Call NetworkClient.poll to send out the ClientRequests held in KafkaChannel.send; poll also handles responses from the server, times out stale requests, invokes user-defined callbacks, and so on.
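Taken together, these steps map almost one-to-one onto Sender.run(long now). The following condensed sketch is based on the 0.9/0.10-era client, with metrics, logging, and the exact poll-timeout calculation omitted; treat it as an outline rather than the verbatim source:

void run(long now) {
        Cluster cluster = metadata.fetch();                                 // step 1
        RecordAccumulator.ReadyCheckResult result =
                this.accumulator.ready(cluster, now);                       // step 2
        if (result.unknownLeadersExist)                                     // step 3
            this.metadata.requestUpdate();
        Iterator<Node> iter = result.readyNodes.iterator();                 // step 4
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now))
                iter.remove();
        }
        Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(
                cluster, result.readyNodes, this.maxRequestSize, now);      // step 5
        this.accumulator.abortExpiredBatches(this.requestTimeout, now);     // step 6
        List<ClientRequest> requests = createProduceRequests(batches, now); // step 7
        for (ClientRequest request : requests)
            this.client.send(request, now);                                 // step 8
        // simplified: the real code also folds connection backoff into this timeout
        this.client.poll(result.nextReadyCheckDelayMs, now);                // step 9
    }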
Below we analyze these steps one by one.
Creating the requests

private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
		// holds the ClientRequests we create
        List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
        // wrap each entry of collated into a ClientRequest
        for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
        return requests;
    }
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
		// per-partition record data to send
        Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
        final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
        // iterate over all batches destined for this node
        for (RecordBatch batch : batches) {
        	// the batch's partition
            TopicPartition tp = batch.topicPartition;
            // add it to both maps
            produceRecordsByPartition.put(tp, batch.records.buffer());
            recordsByPartition.put(tp, batch);
        }
        // wrap everything destined for this node into a single ProduceRequest
        ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
        // create the RequestSend
        RequestSend send = new RequestSend(Integer.toString(destination),
                                           this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                           request.toStruct());
        // create the callback object
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };
		// create the ClientRequest
        return new ClientRequest(now, acks != 0, send, callback);
    }

To briefly summarize the flow of creating a ClientRequest:
1. Reorganize the RecordBatch collection for one NodeId into two maps, produceRecordsByPartition and recordsByPartition.
2. Create the ProduceRequest and the RequestSend.
3. Create the callback object.
4. Create and return the ClientRequest.
In the flow that follows, what is actually transmitted is the RequestSend; the ClientRequest is kept in a cache, and when the request receives its response (or an exception is detected), the cached ClientRequest's RequestCompletionHandler is invoked.
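For context, the handleProduceResponse method that the callback above delegates to walks the cached recordsByPartition map and completes each batch. A simplified sketch, based on the 0.9/0.10 Sender with correlation-id checks, retry handling, and metrics omitted:

private void handleProduceResponse(ClientResponse response,
                                   Map<TopicPartition, RecordBatch> batches, long now) {
        if (response.wasDisconnected()) {
            // the connection dropped: fail every batch in this request with a network error
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, now);
        } else if (response.hasResponse()) {
            // normal case: match each partition's response to its RecordBatch
            ProduceResponse produceResponse = new ProduceResponse(response.responseBody());
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry :
                    produceResponse.responses().entrySet()) {
                RecordBatch batch = batches.get(entry.getKey());
                ProduceResponse.PartitionResponse part = entry.getValue();
                completeBatch(batch, Errors.forCode(part.errorCode), part.baseOffset, now);
            }
        } else {
            // acks == 0: there is no response body, so complete every batch immediately
            for (RecordBatch batch : batches.values())
                completeBatch(batch, Errors.NONE, -1L, now);
        }
    }

completeBatch is where done is invoked on the RecordBatch (firing the user callbacks) or where a retriable batch is re-enqueued, depending on the error.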
KSelector
KSelector implements network I/O in NIO's asynchronous, non-blocking style: a single thread can manage the connect, read, and write operations of many network connections. Let's look at its core methods:

public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
		// check whether we already have a connection to this node
        if (this.channels.containsKey(id))
            throw new IllegalStateException("There is already a connection for id " + id);
		// create a SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // configure non-blocking mode
        socketChannel.configureBlocking(false);
        // keep the connection alive (long-lived connection)
        Socket socket = socketChannel.socket();
        socket.setKeepAlive(true);
        // set the send and receive buffer sizes
        if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setSendBufferSize(sendBufferSize);
        if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
            socket.setReceiveBufferSize(receiveBufferSize);
        socket.setTcpNoDelay(true);
        boolean connected;
        try {
        	// initiate the connection
            connected = socketChannel.connect(address);
        } catch (UnresolvedAddressException e) {
            socketChannel.close();
            throw new IOException("Can't resolve address: " + address, e);
        } catch (IOException e) {
            socketChannel.close();
            throw e;
        }
        // register this socketChannel with nioSelector, watching for OP_CONNECT
        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
        // create the KafkaChannel
        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
        // attach the KafkaChannel to the SelectionKey
        key.attach(channel);
        // store it in the channels map
        this.channels.put(id, channel);

        if (connected) {
            // OP_CONNECT won't trigger for immediately connected channels
            log.debug("Immediately connected to node {}", channel.id());
            immediatelyConnectedKeys.add(key);
            key.interestOps(0);
        }
    }

This method is mainly responsible for creating the KafkaChannel and adding it to the channels collection. KSelector's send method caches the previously created RequestSend object in the KafkaChannel's send field and starts watching for OP_WRITE on that connection; no network I/O happens at this point. Only on the next call to poll is the RequestSend actually sent out, and each KafkaChannel can send only one Send per poll.
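A rough sketch of send, assuming the 0.9/0.10-era Selector (error handling condensed):

public void send(Send send) {
        // stash the Send on the target channel; no bytes hit the wire until the next poll()
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            // setSend also registers interest in OP_WRITE for this channel
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            // the connection was closed underneath us; report a failed send
            this.failedSends.add(send.destination());
            close(channel);
        }
    }

The poll method, which performs the actual I/O, looks like this: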

public void poll(long timeout) throws IOException {
        if (timeout < 0)
            throw new IllegalArgumentException("timeout should be >= 0");
		// clear all results from the previous poll
        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        // wait for I/O events
        long startSelect = time.nanoseconds();
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
		// handle the I/O events
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }
		// move stagedReceives into the completedReceives collection
        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        // close connections that have been idle too long
        maybeCloseOldestConnection();
    }

This method calls select to wait for I/O events. When a channel becomes writable, the KafkaChannel.send field is sent out; when it becomes readable, data is read into KafkaChannel.receive, and once a complete NetworkReceive has been read it is cached in stagedReceives. After one round of pollSelectionKeys completes, the data in stagedReceives is transferred to completedReceives. Finally, the method may close the connection that has been idle the longest. The staged-to-completed transfer is sketched below.
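A rough sketch of addToCompletedReceives, assuming the 0.9/0.10-era Selector (the isMute check covers channels whose reads are temporarily paused):

private void addToCompletedReceives() {
        // move at most one staged receive per channel into completedReceives per poll
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter =
                this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            if (!channel.isMute()) {
                Deque<NetworkReceive> deque = entry.getValue();
                this.completedReceives.add(deque.poll());
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }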
Now let's look at the core method that handles those I/O events:

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
		// iterator over the selected keys
        Iterator<SelectionKey> iterator = selectionKeys.iterator();
        // handle each key's I/O events in turn
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            // retrieve the KafkaChannel that was attached during connect
            KafkaChannel channel = channel(key);
            sensors.maybeRegisterConnectionMetrics(channel.id());
            // update LRU bookkeeping
            lruConnections.put(channel.id(), currentTimeNanos);

            try {

                // handle channels whose connect returned true immediately, and OP_CONNECT events
                if (isImmediatelyConnected || key.isConnectable()) {
                	// check whether the connection has completed; once established, interest in OP_CONNECT is cancelled and OP_READ is registered
                    if (channel.finishConnect()) {
                    	// add to the set of connected channels
                        this.connected.add(channel.id());
                        this.sensors.connectionCreated.record();
                    } else
                        continue;
                }

                // perform authentication
                if (channel.isConnected() && !channel.ready())
                    channel.prepare();
				
                // handle OP_READ
                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                    NetworkReceive networkReceive;
                    while ((networkReceive = channel.read()) != null)
                    	// if read assembles a complete NetworkReceive, stash it in stagedReceives; if a complete NetworkReceive cannot be assembled yet, read returns null and reading resumes on the next OP_READ event, until one is complete
                        addToStagedReceives(channel, networkReceive);
                }

                // handle OP_WRITE
                if (channel.ready() && key.isWritable()) {
                	// write out KafkaChannel.send; returns null while the send is incomplete, or the finished Send, which is then recorded in completedSends
                    Send send = channel.write();
                    if (send != null) {
                        this.completedSends.add(send);
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

                /* cancel any defunct sockets */
                if (!key.isValid()) {
                    close(channel);
                    this.disconnected.add(channel.id());
                }

            } catch (Exception e) {
                String desc = channel.socketDescription();
                if (e instanceof IOException)
                    log.debug("Connection with {} disconnected", desc, e);
                else
                    log.warn("Unexpected error from {}; closing connection", desc, e);
                close(channel);
                this.disconnected.add(channel.id());
            }
        }
    }

Ultimately, the reads and writes are delegated to KafkaChannel's methods:

private boolean send(Send send) throws IOException {
		// if the send is not fully written by one write call, interest in OP_WRITE is kept and this channel's OP_WRITE events keep firing until the whole send has been written, at which point interest is removed
        send.writeTo(transportLayer);
        // completion is judged by whether any bytes remain to be written
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }
public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        // register interest in OP_WRITE
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }
public NetworkReceive read() throws IOException {
        NetworkReceive result = null;

        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id);
        }
		// read data from the transportLayer into the NetworkReceive; if a complete NetworkReceive has not been assembled yet, the next OP_READ event continues filling this same object; once a complete NetworkReceive has been read, the receive field is reset to null
        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        }
        return result;
    }

When NetworkReceive reads data off a connection, it first reads the message's size header, then allocates a ByteBuffer of the right size according to that length, and then reads the message body.
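In code, that two-phase read looks roughly like the following sketch of NetworkReceive.readFrom (field names follow the 0.9/0.10 client: size is a 4-byte header buffer, buffer holds the body):

public long readFrom(ScatteringByteChannel channel) throws IOException {
        long read = 0;
        if (size.hasRemaining()) {              // phase 1: read the length prefix
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {         // header complete: allocate the body buffer
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0 || (maxSize != UNLIMITED && receiveSize > maxSize))
                    throw new InvalidReceiveException("Invalid receive size: " + receiveSize);
                this.buffer = ByteBuffer.allocate(receiveSize);
            }
        }
        if (buffer != null) {                   // phase 2: read the message body
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }
        return read;                            // complete() turns true once buffer fills up
    }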
The main job of the InFlightRequests queue is to cache ClientRequests that have already been sent out but have not yet received a response.

public boolean canSendMore(String node) {
		// decide whether more requests may be sent to this node
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }

queue.peekFirst().request().completed() being true means the request at the head of the queue has finished being sent. If the head request never finishes sending, the network connection is probably in trouble, so no further requests should be sent to that Node. Note that the head-of-queue request and the message that KafkaChannel.send points to are one and the same.
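Internally, InFlightRequests is little more than a map of per-node deques: a request is pushed at the head when it is sent, and its response completes it from the tail, which preserves per-connection FIFO order. A minimal sketch, assuming the 0.9/0.10 layout:

final class InFlightRequests {
        private final int maxInFlightRequestsPerConnection;
        // node id -> requests in flight on that connection, newest first
        private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

        InFlightRequests(int maxInFlightRequestsPerConnection) {
            this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
        }

        public void add(ClientRequest request) {
            String node = request.request().destination();
            Deque<ClientRequest> reqs = this.requests.get(node);
            if (reqs == null) {
                reqs = new ArrayDeque<>();
                this.requests.put(node, reqs);
            }
            reqs.addFirst(request);   // newest request goes at the head
        }

        // responses arrive in send order, so the oldest request completes from the tail
        public ClientRequest completeNext(String node) {
            return this.requests.get(node).pollLast();
        }
    }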
The MetadataUpdater interface assists NetworkClient in keeping the Metadata up to date. Three of its fields are worth noting:
metadata: points to the Metadata object that records the cluster metadata
metadataFetchInProgress: indicates whether a MetadataRequest to refresh the Metadata has already been sent
lastNoNodeAvailableMs: when no node is available, this field records the timestamp of that observation

public long maybeUpdate(long now) {
            // time until the next scheduled metadata update
            long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
            // time until the next reconnect attempt is allowed
            long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            // if a metadata fetch is already in flight, wait indefinitely for it
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // time remaining before the next MetadataRequest may be sent
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);
			// if we may send one right now
            if (metadataTimeout == 0) {
                // pick the node with the lightest load
                Node node = leastLoadedNode(now);
                // create and cache a MetadataRequest; it is actually sent on the next poll
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }
private void maybeUpdate(long now, Node node) {
			// check whether a node is available
            if (node == null) {
                log.debug("Give up sending metadata request since no node is available");
                // mark the timestamp for no node available to connect
                this.lastNoNodeAvailableMs = now;
                return;
            }
            // the connection id for this node
            String nodeConnectionId = node.idString();
			// check whether we can send a request to it
            if (canSendRequest(nodeConnectionId)) {
                this.metadataFetchInProgress = true;
                MetadataRequest metadataRequest;
                // choose which topics to request metadata for
                if (metadata.needMetadataForAllTopics())
                    metadataRequest = MetadataRequest.allTopics();
                else
                    metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
                // wrap it into a ClientRequest
                ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
                log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
                // cache the request; the next poll sends it out
                doSend(clientRequest, now);
            } else if (connectionStates.canConnect(nodeConnectionId, now)) {
                // initiate a connection
                initiateConnect(node, now);

            } else { // connected, but can't send more OR connecting
                // connected but unable to send, or still connecting: record the timestamp and wait
                this.lastNoNodeAvailableMs = now;
            }
        }

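maybeUpdate picks its target via leastLoadedNode, which is not shown above. A rough sketch, based on the 0.9 NetworkClient: prefer a connected node with no requests in flight, otherwise fall back to the non-blacked-out node with the fewest in-flight requests:

public Node leastLoadedNode(long now) {
        List<Node> nodes = this.metadataUpdater.fetchNodes();
        int inflight = Integer.MAX_VALUE;
        Node found = null;
        // start from a random offset so the load spreads across the cluster
        int offset = this.randOffset.nextInt(nodes.size());
        for (int i = 0; i < nodes.size(); i++) {
            int idx = (offset + i) % nodes.size();
            Node node = nodes.get(idx);
            int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
            if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
                // a connected, idle node is the best possible choice
                return node;
            } else if (!this.connectionStates.isBlackedOut(node.idString(), now)
                    && currInflight < inflight) {
                // otherwise remember the least-loaded candidate we may still talk to
                inflight = currInflight;
                found = node;
            }
        }
        return found;
    }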

Before a MetadataRequest is sent, metadataFetchInProgress is set to true, and the least-loaded node is chosen from all nodes as the target of the update request. When the reply arrives, the client first checks whether it is a MetadataResponse; if so, it parses the response and builds a Cluster object to update the Metadata.cluster field:

public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
            short apiKey = req.request().header().apiKey();
            // is this a metadata request that the NetworkClient issued internally?
            if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
                handleResponse(req.request().header(), body, now);
                return true;
            }
            return false;
        }
private void handleResponse(RequestHeader header, Struct body, long now) {
			// clear the in-progress flag
            this.metadataFetchInProgress = false;
            // parse the response
            MetadataResponse response = new MetadataResponse(body);
            // build the Cluster object
            Cluster cluster = response.cluster();
            // check if any topics metadata failed to get updated
            Map<String, Errors> errors = response.errors();
            ...
            if (cluster.nodes().size() > 0) {
            	// this notifies the listeners registered on Metadata, then updates cluster, and finally wakes up threads waiting on the Metadata update
                this.metadata.update(cluster, now);
            } else {
                log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
                this.metadata.failedUpdate(now);
            }
        }

After this step completes, the next update request can be sent.
NetworkClient is a general-purpose network client implementation: it is used not only by producers to send messages, but also by consumers to fetch messages and for communication between brokers on the server side.

public void send(ClientRequest request, long now) {
		// check whether we may send a request to the given node
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        // set KafkaChannel.send and cache the request while awaiting its response
        doSend(request, now);
    }
private boolean canSendRequest(String node) {
		// the connection is established
        return connectionStates.isConnected(node) 
        // the channel has passed authentication
        && selector.isChannelReady(node) 
        // the node's in-flight request load allows one more
        && inFlightRequests.canSendMore(node);
    }
private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        // cache the request as in-flight
        this.inFlightRequests.add(request);
        // stash it in the channel's send field
        selector.send(request.request());
    }

Finally, the network I/O itself is carried out through the selector's poll:

public List<ClientResponse> poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
        	// perform the network I/O
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process the data and queues produced by KSelector.poll
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke the callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }
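Among those handlers, handleCompletedReceives is a good illustration of how InFlightRequests and the MetadataUpdater cooperate. A rough sketch, based on the 0.9/0.10 NetworkClient:

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            // per-connection FIFO: the response answers the oldest in-flight request
            ClientRequest req = inFlightRequests.completeNext(source);
            Struct body = parseResponse(receive.payload(), req.request().header());
            // metadata responses are consumed internally and never surfaced to callers
            if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
                responses.add(new ClientResponse(req, now, false, body));
        }
    }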