Let's first walk through the overall flow of how the Sender sends messages: first, based on the RecordAccumulator's cache state, ready selects the nodes that messages can be sent to; then the Node set is filtered according to the connection state between the producer and each node; after that, the corresponding requests are generated, noting in particular that only one request is generated per Node; finally, NetworkClient is called to send the requests out.
1. Fetch the Kafka cluster metadata from Metadata.
2. Call RecordAccumulator.ready, which uses the cache state to determine which Node nodes can be sent to, returning a ReadyCheckResult object.
3. If ReadyCheckResult indicates unknownLeadersExist, call Metadata's requestUpdate method to mark that the Kafka cluster metadata needs updating.
4. For the readyNodes set in ReadyCheckResult, call NetworkClient's ready method on each node to check whether the network I/O conditions for sending are met; nodes that fail the check are removed from readyNodes.
5. For the readyNodes set produced by step 4, call drain to obtain the collection of messages to send.
6. Call abortExpiredBatches to handle timed-out messages. It iterates over all RecordBatches held in the RecordAccumulator, calling maybeExpire on each; if a batch has expired, done is called, which triggers the callback, removes the RecordBatch from its queue, and releases its ByteBuffer space.
7. Call Sender.createProduceRequests to wrap the messages to be sent into ClientRequests.
8. Call NetworkClient.send to write each ClientRequest into its KafkaChannel's send field.
9. Call NetworkClient.poll to send out the ClientRequest held in KafkaChannel.send; this also handles responses sent back by the broker, times out expired requests, invokes user-defined Callbacks, and so on.
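Steps 4 to 6 above can be sketched with a small, self-contained model. Note that Batch, drainReady, and the node sets below are simplified stand-ins invented for illustration, not Kafka's actual classes: the sketch keeps only the batches for nodes that are both accumulator-ready and connectable, dropping expired batches along the way.

```java
import java.util.*;

// Minimal sketch of the ready/drain/expire steps; all names here are
// illustrative stand-ins, not Kafka's real API.
public class SenderSketch {
    record Batch(String topicPartition, long createdMs) {}

    static Map<Integer, List<Batch>> drainReady(
            Map<Integer, List<Batch>> batchesByNode,
            Set<Integer> readyNodes,
            Set<Integer> connectableNodes,
            long nowMs, long timeoutMs) {
        // step 4: drop nodes that are "ready" by accumulator state but whose
        // connection cannot currently accept a request
        readyNodes.retainAll(connectableNodes);
        Map<Integer, List<Batch>> drained = new HashMap<>();
        for (Integer node : readyNodes) {
            List<Batch> fresh = new ArrayList<>();
            for (Batch b : batchesByNode.getOrDefault(node, List.of())) {
                // step 6: expired batches are completed exceptionally and dropped
                if (nowMs - b.createdMs() < timeoutMs)
                    fresh.add(b);
            }
            if (!fresh.isEmpty())
                drained.put(node, fresh); // step 5/7: becomes one request per node
        }
        return drained;
    }

    public static void main(String[] args) {
        Map<Integer, List<Batch>> acc = Map.of(
                0, List.of(new Batch("t-0", 0L)),
                1, List.of(new Batch("t-1", 90L)));
        Set<Integer> ready = new HashSet<>(Set.of(0, 1));
        // node 0 is not connectable, so only node 1 survives the filter
        System.out.println(drainReady(acc, ready, Set.of(1), 100L, 30_000L).keySet());
    }
}
```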
Below we analyze these steps one by one.
Creating the requests
private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) {
    // holds the ClientRequests being created
    List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size());
    // wrap each entry of collated into a ClientRequest
    for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
        requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
    return requests;
}
private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
    // holds the data of each partition
    Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size());
    final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size());
    // iterate over all the data to be sent to this node
    for (RecordBatch batch : batches) {
        // the partition of this batch
        TopicPartition tp = batch.topicPartition;
        // add it to both maps
        produceRecordsByPartition.put(tp, batch.records.buffer());
        recordsByPartition.put(tp, batch);
    }
    // wrap all the data destined for this node into one request
    ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition);
    // create the RequestSend
    RequestSend send = new RequestSend(Integer.toString(destination),
                                       this.client.nextRequestHeader(ApiKeys.PRODUCE),
                                       request.toStruct());
    // create the callback object
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // create the ClientRequest
    return new ClientRequest(now, acks != 0, send, callback);
}
Let's briefly summarize the flow of creating a ClientRequest:
1. Reorganize the RecordBatch collection for a NodeId into the two maps produceRecordsByPartition and recordsByPartition.
2. Create the ProduceRequest and the RequestSend.
3. Create the callback object.
4. Create and return the ClientRequest.
In the subsequent flow, what actually gets sent is the RequestSend; the ClientRequest is placed in a cache, and when the request receives its response or an exception is detected, the cached ClientRequest is used to invoke its RequestCompletionHandler.
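The cache-and-callback pattern just described can be sketched independently of Kafka (RequestCache and its method names below are invented for illustration): the handler is queued when the request goes out, and because responses on a single connection come back in request order, the head of the queue always matches the next response.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Minimal sketch of the request cache described above; names are
// illustrative, not Kafka's actual API.
public class RequestCache {
    private final Deque<Consumer<String>> inFlight = new ArrayDeque<>();

    // called when the RequestSend is handed to the channel: cache the handler
    public void send(Consumer<String> completionHandler) {
        inFlight.addLast(completionHandler);
    }

    // called when a complete response arrives on this connection: responses
    // arrive in request order, so the head handler is the matching one
    public void onResponse(String body) {
        inFlight.pollFirst().accept(body);
    }

    public static void main(String[] args) {
        RequestCache cache = new RequestCache();
        StringBuilder log = new StringBuilder();
        cache.send(body -> log.append("first:").append(body));
        cache.send(body -> log.append(" second:").append(body));
        cache.onResponse("ok");
        cache.onResponse("ok");
        System.out.println(log); // first:ok second:ok
    }
}
```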
KSelector
KSelector implements network I/O in NIO's asynchronous, non-blocking mode; with a single thread it can manage the connect, read, and write operations of multiple network connections. Let's look at its core methods:
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    // check whether a connection to this node already exists
    if (this.channels.containsKey(id))
        throw new IllegalStateException("There is already a connection for id " + id);
    // create a SocketChannel
    SocketChannel socketChannel = SocketChannel.open();
    // switch it to non-blocking mode
    socketChannel.configureBlocking(false);
    // make it a long-lived connection
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    // set the send and receive buffer sizes
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
    boolean connected;
    try {
        // initiate the connection
        connected = socketChannel.connect(address);
    } catch (UnresolvedAddressException e) {
        socketChannel.close();
        throw new IOException("Can't resolve address: " + address, e);
    } catch (IOException e) {
        socketChannel.close();
        throw e;
    }
    // register the SocketChannel with nioSelector, interested in OP_CONNECT
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
    // create the KafkaChannel
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    // attach the KafkaChannel to the key
    key.attach(channel);
    // store it
    this.channels.put(id, channel);
    if (connected) {
        // OP_CONNECT won't trigger for immediately connected channels
        log.debug("Immediately connected to node {}", channel.id());
        immediatelyConnectedKeys.add(key);
        key.interestOps(0);
    }
}
This method is mainly responsible for creating a KafkaChannel and adding it to the channels collection. KSelector's send method caches the previously created RequestSend object into the KafkaChannel's send field and registers interest in OP_WRITE on the connection; no network I/O happens yet. The RequestSend is only sent out on the next call to poll, and each KafkaChannel can send at most one Send per poll. Let's look at that method:
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");
    // clear all the results of the previous poll
    clear();
    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;
    // wait for I/O events
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    currentTimeNanos = endSelect;
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
    // handle the I/O events
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false);
        pollSelectionKeys(immediatelyConnectedKeys, true);
    }
    // move stagedReceives into the completedReceives collection
    addToCompletedReceives();
    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
    // close connections that have been idle for a long time
    maybeCloseOldestConnection();
}
This method calls select to wait for I/O events to occur. When a channel becomes writable, the KafkaChannel.send field is sent out; when a channel becomes readable, data is read into KafkaChannel.receive. Once a complete NetworkReceive has been read, it is cached in stagedReceives; after a pass of pollSelectionKeys finishes, the data in stagedReceives is moved to completedReceives. Finally, the method tries to close connections that have been idle for a long time.
Now let's look at the core method that handles the I/O events:
private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, boolean isImmediatelyConnected) {
    // iterator over all the selected keys
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    // handle the I/O events in a loop
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        // fetch the KafkaChannel registered earlier
        KafkaChannel channel = channel(key);
        sensors.maybeRegisterConnectionMetrics(channel.id());
        // update the LRU information
        lruConnections.put(channel.id(), currentTimeNanos);
        try {
            // handle channels for which connect returned true, or an OP_CONNECT event fired
            if (isImmediatelyConnected || key.isConnectable()) {
                // check whether the connection is established; once it is, interest in
                // OP_CONNECT is dropped and OP_READ is registered instead
                if (channel.finishConnect()) {
                    // add it to the connected set
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                } else
                    continue;
            }
            // perform authentication
            if (channel.isConnected() && !channel.ready())
                channel.prepare();
            // handle OP_READ events
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    // when read returns a complete NetworkReceive, stash it in stagedReceives;
                    // if a complete NetworkReceive cannot be read yet, read returns null and
                    // reading resumes on the next OP_READ event until one is complete
                    addToStagedReceives(channel, networkReceive);
            }
            // handle write events
            if (channel.ready() && key.isWritable()) {
                // send the KafkaChannel.send field; write returns null if the send is not
                // yet finished, or the Send itself once it completes, in which case it
                // is added to the completedSends collection
                Send send = channel.write();
                if (send != null) {
                    this.completedSends.add(send);
                    this.sensors.recordBytesSent(channel.id(), send.size());
                }
            }
            /* cancel any defunct sockets */
            if (!key.isValid()) {
                close(channel);
                this.disconnected.add(channel.id());
            }
        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel);
            this.disconnected.add(channel.id());
        }
    }
}
The actual reads and writes are ultimately delegated to KafkaChannel's methods:
private boolean send(Send send) throws IOException {
    // if the send is not finished in a single write call, interest in OP_WRITE is kept,
    // so this channel's OP_WRITE events keep firing until the whole send completes
    send.writeTo(transportLayer);
    // completion is determined by whether any bytes remain to be written
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    // register interest in OP_WRITE
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public NetworkReceive read() throws IOException {
    NetworkReceive result = null;
    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id);
    }
    // read data from the transportLayer into the NetworkReceive object; if a complete
    // NetworkReceive has not been read yet, it keeps being filled on subsequent OP_READ
    // events; once a complete NetworkReceive has been read, the receive field is reset to null
    receive(receive);
    if (receive.complete()) {
        receive.payload().rewind();
        result = receive;
        receive = null;
    }
    return result;
}
When NetworkReceive reads data from the connection, it first reads the message header, which carries the message length; it then allocates a ByteBuffer of the appropriate size based on that length, and finally reads the message body.
The main role of the InFlightRequests queue is to cache ClientRequests that have been sent out but have not yet received a response.
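The size-prefixed framing just described can be sketched as follows. Note that writeFrame/readFrame are illustrative helpers operating on an in-memory buffer; the real NetworkReceive reads incrementally from a socket channel. A 4-byte length header is read first, and a body buffer of exactly that size is then allocated and filled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of size-prefixed framing: a 4-byte length header, then the body.
public class Framing {
    public static ByteBuffer writeFrame(byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length);   // write the length header first
        buf.put(body);             // then the message body
        buf.flip();
        return buf;
    }

    public static byte[] readFrame(ByteBuffer wire) {
        int size = wire.getInt();     // read the length header
        byte[] body = new byte[size]; // allocate a buffer of exactly that size
        wire.get(body);               // then read the body
        return body;
    }

    public static void main(String[] args) {
        ByteBuffer wire = writeFrame("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(readFrame(wire), StandardCharsets.UTF_8)); // hello
    }
}
```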
public boolean canSendMore(String node) {
    // decide whether another request may be sent to this node
    Deque<ClientRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
When queue.peekFirst().request().completed() is true, the request at the head of the queue has finished sending. If the head request cannot be sent out for a long time, there may be a network problem, and no further requests should be sent to this Node. The head request and the message referenced by KafkaChannel.send are the same object.
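The head-completed check can be modeled with a simplified queue (InFlightSketch and its Boolean "completed" flags are stand-ins invented for illustration, not the real ClientRequest deque): sending is allowed only when the most recently queued request has finished writing and the per-connection limit has not been reached.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of canSendMore; a Boolean per in-flight request
// stands in for request().completed().
public class InFlightSketch {
    private final Deque<Boolean> headCompleted = new ArrayDeque<>();
    private final int maxInFlight;

    public InFlightSketch(int maxInFlight) { this.maxInFlight = maxInFlight; }

    // new requests go to the head, mirroring the real InFlightRequests deque
    public void add(boolean sendCompleted) { headCompleted.addFirst(sendCompleted); }

    public boolean canSendMore() {
        // empty queue: always ok; otherwise the head request's Send must be
        // fully written and the per-connection limit not yet reached
        return headCompleted.isEmpty()
            || (headCompleted.peekFirst() && headCompleted.size() < maxInFlight);
    }

    public static void main(String[] args) {
        InFlightSketch q = new InFlightSketch(5);
        q.add(false);                        // head Send still partially written
        System.out.println(q.canSendMore()); // false
    }
}
```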
The MetadataUpdater interface assists NetworkClient in updating the Metadata. Let's look at three of its fields:
metadata: points to the Metadata object that records the cluster metadata.
metadataFetchInProgress: indicates whether a MetadataRequest to update the Metadata has already been sent.
lastNoNodeAvailableMs: when no available node is detected, this field records the timestamp.
public long maybeUpdate(long now) {
    // the timestamp of the next scheduled metadata update
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // the timestamp of the next attempt to reconnect to the server
    long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
    // check whether a metadata update request has already been sent
    long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
    // the time remaining until the next MetadataRequest may be sent
    long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                                    waitForMetadataFetch);
    // if that time is already up
    if (metadataTimeout == 0) {
        // find the least-loaded node
        Node node = leastLoadedNode(now);
        // create and cache the MetadataRequest; it is only actually sent on the next poll
        maybeUpdate(now, node);
    }
    return metadataTimeout;
}
private void maybeUpdate(long now, Node node) {
    // check whether the node is usable
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        // mark the timestamp for no node available to connect
        this.lastNoNodeAvailableMs = now;
        return;
    }
    // the connection id for this node
    String nodeConnectionId = node.idString();
    // check whether a request can be sent
    if (canSendRequest(nodeConnectionId)) {
        this.metadataFetchInProgress = true;
        MetadataRequest metadataRequest;
        // specify the topics whose metadata needs updating
        if (metadata.needMetadataForAllTopics())
            metadataRequest = MetadataRequest.allTopics();
        else
            metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics()));
        // wrap it into a ClientRequest
        ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest);
        log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
        // cache the request; the next poll will send it out
        doSend(clientRequest, now);
    } else if (connectionStates.canConnect(nodeConnectionId, now)) {
        // initiate a connection
        initiateConnect(node, now);
    } else { // connected, but can't send more OR connecting
        // connected but unable to send a request: update lastNoNodeAvailableMs and wait
        this.lastNoNodeAvailableMs = now;
    }
}
Before a MetadataRequest is sent, metadataFetchInProgress is set to true; then the least-loaded Node is chosen among all Nodes and the update request is sent to it. When a response is received, it is first checked whether it is a MetadataResponse; if so, the response is parsed and a Cluster object is built to update the Metadata.cluster field:
public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) {
    short apiKey = req.request().header().apiKey();
    // check whether this is a metadata request initiated by the NetworkClient itself
    if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) {
        handleResponse(req.request().header(), body, now);
        return true;
    }
    return false;
}
private void handleResponse(RequestHeader header, Struct body, long now) {
    // reset the update-in-progress flag
    this.metadataFetchInProgress = false;
    // parse the response
    MetadataResponse response = new MetadataResponse(body);
    // build the Cluster object
    Cluster cluster = response.cluster();
    // check if any topics metadata failed to get updated
    Map<String, Errors> errors = response.errors();
    ...
    if (cluster.nodes().size() > 0) {
        // this method first notifies the listeners registered on Metadata, then updates
        // cluster, and finally wakes up the threads waiting for the metadata update
        this.metadata.update(cluster, now);
    } else {
        log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
        this.metadata.failedUpdate(now);
    }
}
After this step, the next update request can be sent.
NetworkClient is a general-purpose network client implementation: it is used not only by the producer to send messages, but also by the consumer to fetch messages and for communication between brokers on the server side.
public void send(ClientRequest request, long now) {
    // check whether a request can be sent to the target node
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    // set KafkaChannel.send and place the request in the cache to await its response
    doSend(request, now);
}
private boolean canSendRequest(String node) {
    // check the connection state
    return connectionStates.isConnected(node)
        // check that authentication has completed
        && selector.isChannelReady(node)
        // check the load on this node
        && inFlightRequests.canSendMore(node);
}
private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    // add it to the in-flight cache
    this.inFlightRequests.add(request);
    // set the send field
    selector.send(request.request());
}
Finally, the selector's poll performs the actual network I/O:
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // perform the I/O operations
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }
    // process the data and queues produced by KSelector.poll
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleTimedOutRequests(responses, updatedNow);
    // invoke the callbacks
    for (ClientResponse response : responses) {
        if (response.request().hasCallback()) {
            try {
                response.request().callback().onComplete(response);
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }
    return responses;
}