TOP

Kafka消费者Heartbeat分析
2019-01-06 02:32:01 】 浏览:603
Tags:Kafka 消费者 Heartbeat分析

消费者会定期向GroupCoordinator发送HeartbeatRequest来确认彼此在线:一方面告诉GroupCoordinator自己还活着,另一方面也借此判断GroupCoordinator是否还活着。

HeartbeatRequest的组成:它由groupId、generationId、memberId三个字段组成。

HeartbeatResponse组成:它只有一个errorCode


HeartbeatThread是专门处理Heartbeat的一个线程类

源码分析:

一 Heartbeat

// session到期时间
private final longsessionTimeout;
//发送heartbeat的间隔
private final longheartbeatInterval;
//最大的poll间隔
private final longmaxPollInterval;
//重试时间
private final longretryBackoffMs;

//上一次发送heartbeat时间
private volatile longlastHeartbeatSend;// volatile since it is read by metrics
//
上一次接收heartbeat响应时间
private longlastHeartbeatReceive;
private longlastSessionReset;
private longlastPoll;
// heartbeat是否成功
private booleanheartbeatFailed;

/**
 * Records the time of the most recent poll.
 *
 * @param now timestamp to store in {@code lastPoll}
 */
public void poll(long now) {
    lastPoll = now;
}

/**
 * Records that a heartbeat has just been sent: stores the send time and clears
 * the failure flag.
 *
 * @param now timestamp to store in {@code lastHeartbeatSend}
 */
public void sentHeartbeat(long now) {
    // Keep this order: the send time is the volatile write, the flag reset follows it.
    lastHeartbeatSend = now;
    heartbeatFailed = false;
}

/**
 * Marks the heartbeat as failed by setting {@code heartbeatFailed}.
 */
public void failHeartbeat() {
    heartbeatFailed = true;
}
/**
 * Records the time at which a heartbeat response was received.
 *
 * @param now timestamp to store in {@code lastHeartbeatReceive}
 */
public void receiveHeartbeat(long now) {
    lastHeartbeatReceive = now;
}

/**
 * Tells whether a heartbeat is due, i.e. whether {@code timeToNextHeartbeat(now)}
 * reports zero time remaining.
 *
 * @param now the current timestamp
 * @return {@code true} when the time to the next heartbeat is exactly 0
 */
public boolean shouldHeartbeat(long now) {
    final long remaining = timeToNextHeartbeat(now);
    return remaining == 0;
}

/**
 * Checks whether the session has timed out.
 *
 * The baseline is whichever is later: the last session reset or the last
 * received heartbeat response.
 *
 * @param now the current timestamp
 * @return {@code true} when strictly more than {@code sessionTimeout} has elapsed
 *         since that baseline
 */
public boolean sessionTimeoutExpired(long now) {
    final long baseline = Math.max(lastSessionReset, lastHeartbeatReceive);
    final long elapsed = now - baseline;
    return elapsed > sessionTimeout;
}

/**
 * Checks whether too much time has passed since the last recorded poll.
 *
 * @param now the current timestamp
 * @return {@code true} when strictly more than {@code maxPollInterval} has elapsed
 *         since {@code lastPoll}
 */
public boolean pollTimeoutExpired(long now) {
    final long sinceLastPoll = now - lastPoll;
    return sinceLastPoll > maxPollInterval;
}

二 HeartbeatThread

/**
 * Main loop of the heartbeat thread.
 *
 * Each iteration runs while holding the monitor of the enclosing AbstractCoordinator.
 * The loop returns once {@code closed} is set, waits while heartbeating is not enabled,
 * and disables itself when the member state is not STABLE. Otherwise it polls the
 * client and then does exactly one of: look up / wait for the coordinator, declare the
 * coordinator dead on session timeout, leave the group on poll timeout, wait for the
 * retry backoff, or send a heartbeat request.
 *
 * An InterruptedException or RuntimeException escaping the loop is logged, stored in
 * {@code failed}, and ends the thread's loop.
 */
public void run() {
 try {
 while (true) {
 synchronized (AbstractCoordinator.this) {
 if (closed)
 return;
 // Heartbeating is not enabled: block on the coordinator's monitor until notified, then re-check
 if (!enabled) {
 AbstractCoordinator.this.wait();
 continue;
 }
 // Only heartbeat while the member state is STABLE
 // (per the original note: the consumer has joined the group and started heartbeating)
 if (state != MemberState.STABLE) {
 // the group is not stable (perhaps because we left the group or because the coordinator
 // kicked us out), so disable heartbeats and wait for the main thread to rejoin
 disable();
 continue;
 }

 client.pollNoWakeup();
 long now = time.milliseconds();
 // Check whether the GroupCoordinator is currently known
 if (coordinatorUnknown()) {
 // Start a coordinator lookup unless one is already pending (findCoordinatorFuture != null);
 // in that case wait for the retry backoff before checking again
 if (findCoordinatorFuture == null)
 lookupCoordinator();
 else
 AbstractCoordinator.this.wait(retryBackoffMs);
 } else if (heartbeat.sessionTimeoutExpired(now)) {// session timeout elapsed without a heartbeat response
 // Treat the GroupCoordinator as down.
 // NOTE(review): the original note says coordinatorDead() clears the pending (unsent) requests
 // and nulls the coordinator so it gets looked up again; not visible in this excerpt, confirm
 coordinatorDead();
 } else if (heartbeat.pollTimeoutExpired(now)) {
 // the poll timeout has expired, which means that the foreground thread has stalled
 // in between calls to poll(), so we explicitly leave the group.
 maybeLeaveGroup();
 } else if (!heartbeat.shouldHeartbeat(now)) {// not yet time to send the next heartbeat, so wait
 // poll again after waiting for the retry backoff in case the heartbeat failed or the
 // coordinator disconnected
 AbstractCoordinator.this.wait(retryBackoffMs);
 } else {
 // Record `now` as the last heartbeat send time and clear the failed flag
 heartbeat.sentHeartbeat(now);
 // sendHeartbeatRequest() builds a HeartbeatRequest, hands it to the client and returns a
 // RequestFuture whose response is processed by HeartbeatResponseHandler.
 // The listener registered here reacts to the outcome:
 // - success: update the last heartbeat receive time
 // - failure with RebalanceInProgressException: still update the receive time
 // - any other failure: mark the heartbeat as failed and wake this thread
 sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {
 @Override
 public void onSuccess(Void value) {
 synchronized (AbstractCoordinator.this) {
 heartbeat.receiveHeartbeat(time.milliseconds());
 }
 }

 @Override
 public void onFailure(RuntimeException e) {
 synchronized (AbstractCoordinator.this) {
 if (e instanceof RebalanceInProgressException) {
 // it is valid to continue heartbeating while the group is rebalancing. This
 // ensures that the coordinator keeps the member in the group for as long
 // as the duration of the rebalance timeout. If we stop sending heartbeats,
 // however, then the session timeout may expire before we can rejoin.
 heartbeat.receiveHeartbeat(time.milliseconds());
 } else {
 heartbeat.failHeartbeat();

 // wake up the thread if it's sleeping to reschedule the heartbeat
 AbstractCoordinator.this.notify();
 }
 }
 }
 });
 }
 }
 }
 } catch (InterruptedException e) {
 // An interrupt ends the loop; it is wrapped in a RuntimeException and recorded in `failed`
 log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
 this.failed.set(new RuntimeException(e));
 } catch (RuntimeException e) {
 // Any other unexpected error also ends the loop and is recorded in `failed`
 log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
 this.failed.set(e);
 }
 }

}

三 sendHeartbeatRequest
/**
 * Builds a heartbeat request for the current group, generation and member, and
 * sends it to the coordinator through the client.
 *
 * @return the future obtained by composing the client's send future with a new
 *         {@code HeartbeatResponseHandler}, which processes the response
 */
synchronized RequestFuture<Void> sendHeartbeatRequest() {
    final HeartbeatRequest heartbeatRequest =
            new HeartbeatRequest(groupId, generation.generationId, generation.memberId);
    return client
            .send(coordinator, ApiKeys.HEARTBEAT, heartbeatRequest)
            .compose(new HeartbeatResponseHandler());
}

四 HeartbeatResponse的处理

/**
 * Handles the response to a heartbeat request: parses it, records the request
 * latency, and completes or fails the associated future according to the error
 * code carried by the response.
 */
private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {

    /**
     * Converts the raw client response into a {@code HeartbeatResponse}.
     *
     * @param response the client response whose body is wrapped
     * @return a new {@code HeartbeatResponse} built from the response body
     */
    @Override
    public HeartbeatResponse parse(ClientResponse response) {
        return new HeartbeatResponse(response.responseBody());
    }

    /**
     * Maps the heartbeat error code to an outcome on {@code future}.
     *
     * @param heartbeatResponse the parsed heartbeat response
     * @param future            the future to complete on success or raise on error
     */
    @Override
    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        // NOTE(review): `response` is not a parameter here; presumably a field inherited
        // from CoordinatorResponseHandler holding the raw client response; confirm.
        sensors.heartbeatLatency.record(response.requestLatencyMs());
        final Errors outcome = Errors.forCode(heartbeatResponse.errorCode());

        // Heartbeat accepted, no error.
        if (outcome == Errors.NONE) {
            log.debug("Received successful heartbeat response for group {}", groupId);
            future.complete(null);
            return;
        }

        // The coordinator is unavailable or is no longer the coordinator for this group.
        final boolean coordinatorGone = outcome == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || outcome == Errors.NOT_COORDINATOR_FOR_GROUP;
        if (coordinatorGone) {
            log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
                    groupId, coordinator());
            coordinatorDead();
            future.raise(outcome);
            return;
        }

        // The group is rebalancing: flag that a rejoin is needed.
        if (outcome == Errors.REBALANCE_IN_PROGRESS) {
            log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
            requestRejoin();
            future.raise(Errors.REBALANCE_IN_PROGRESS);
            return;
        }

        // The generation id was rejected: reset the generation.
        if (outcome == Errors.ILLEGAL_GENERATION) {
            log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
            resetGeneration();
            future.raise(Errors.ILLEGAL_GENERATION);
            return;
        }

        // The member id was rejected: reset the generation.
        if (outcome == Errors.UNKNOWN_MEMBER_ID) {
            log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
            resetGeneration();
            future.raise(Errors.UNKNOWN_MEMBER_ID);
            return;
        }

        if (outcome == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
            return;
        }

        // Anything else is unexpected and surfaces as a generic KafkaException.
        future.raise(new KafkaException("Unexpected error in heartbeat response: " + outcome.message()));
    }
}


Kafka消费者Heartbeat分析 https://www.cppentry.com/bencandy.php?fid=120&id=202745

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka学习五:开发consumer 下一篇Kafka单机模式搭建