Eureka 系列(08)心跳续约与自动过期

Spring Cloud 系列目录 - Eureka 篇

在上一篇 Eureka 系列(07)服务注册与主动下线 中对服务的注册与下线进行了分析,本文继续分析 Eureka 是如何进行心跳续约的。

1. 心跳续约

心跳续约有两种情况:一是客户端发起的心跳续约(isReplication=false);二是服务器消息广播时发起的心跳续约(isReplication=true)。这两种心跳续约的处理稍有不同。

1.1 心跳续约机制

当服务器收到客户端的心跳续约后,首先在当着服务器上更新租约时间,如果成功,则将心跳广播给其它服务器。

图1:Eureka 心跳续约机制

sequenceDiagram
participant InstanceResource
participant PeerAwareInstanceRegistryImpl
participant AbstractInstanceRegistry
participant PeerEurekaNode
note over InstanceResource: PUT:/euraka/apps/{appName}/{id}<br/>renewLease
InstanceResource ->> PeerAwareInstanceRegistryImpl : 心跳请求:renew(appName,id,isReplication)
PeerAwareInstanceRegistryImpl ->> AbstractInstanceRegistry : 1. 本地数据更新:renew(appName,id,isReplication)
loop 同步到其它 Eureka Server 节点
PeerAwareInstanceRegistryImpl ->> PeerAwareInstanceRegistryImpl : 2.1 数据同步:replicateToPeers
PeerAwareInstanceRegistryImpl ->> PeerEurekaNode : 2.2 heartbeat -> PUT:/euraka/apps/{appName}/{id}
alt 3.1 failure: 404则更新对方节点
PeerEurekaNode -->> PeerEurekaNode : register(info)
else 3.2 failure: 否则更新自己节点
PeerEurekaNode -->> PeerEurekaNode : syncInstancesIfTimestampDiffers
PeerEurekaNode -->> PeerEurekaNode : register(infoFromPeer, true)
end
end

总结:

  1. renewLease 心跳续约请求是 InstanceResource#renewLease 方法进行处理。isReplication=false 则是客户端请求,true 则是消息广播请求。
  2. renew 本地服务器心跳处理。处理成功则进行心跳消息广播。
  3. heartbeat 心跳消息广播给其它服务器。需要注意心跳广播失败的处理机制:
    • 如果对方服务器不存在该实例或 PK 失败,需要重新注册更新对方服务的实例信息。
    • 如果对方服务器 PK 成功,则需要反过来更新本地服务的注册信息。

1.2 接收心跳续约 - renewLease

InstanceResource#renewLease 处理心跳续约请求,路径是 PUT /apps/{appName}/{id}

  1. 如果本地服务端处理失败(包括实例不存在或实例的状态是UNKNOWN),就返回 NOT_FOUND,也是需要重新注册,更新实例信息。
  2. 服务端和客户端实例的 lastDirtyTimestamp 进行 PK。结果两种情况:一是服务端实例 PK 失败,返回 NOT_FOUND,客户端重新注册,从而更新服务端实例信息;二是服务端实例 PK 成功,返回实例信息给客户端,从而更新客户端实例信息。
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 1. 心跳处理,本地心跳处理成功后进行消息广播。
// 由于消息广播是异步的,实际返回的结果是本地心跳处理的结果。
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // 2. 心跳处理失败分两种情况:一是本地服务器不存在该服务实例;
// 二是本地服务实例和lastDirtyTimestamp进行PK失败,则说明本地服务实例信息不是最新的
if (!isSuccess) {
return Response.status(Status.NOT_FOUND).build();
} // 3. 本地服务实例和请求的lastDirtyTimestamp进行PK失败,则说明本地服务实例信息不是最新的
// 后面有时间专门介绍一下 OverriddenStatus
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
return response;
}

总结: 下面分三部分说明心跳续约的整个流程:

  1. 本地服务器是如何处理续约的?主要是 AbstractInstanceRegistry#renew 方法。
  2. 本地服务器和客户端实例的 lastDirtyTimestamp 如何进行 PK ?主要是 InstanceResource#validateDirtyTimestamp 方法。
  3. Eureka Client 是如何发起心跳续约请求,并处理请求结果?主要是 DiscoveryClient。
  4. 心跳续约消息广播如何处理?主要是 PeerEurekaNode#heartbeat 方法。

1.3 本地续约处理 - renew

本地服务端续约,如果实例不存在或实例状态是 UNKNOWN 时返回 false,表示需要客户端重新注册,更新服务端实例信息。当然返回 true 时,也不意味着数据是最新的,需要在下一步继续校验脏数据。

public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// 1. 获取服务端注册的实例
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// 2.1 服务实例不存在,返回404
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
// 2.2 服务实例存在,
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// 实例的状态是 UNKNOWN 时返回 false,否则返回 true
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
// 3. 更新最后一次的心跳时间(核心)
leaseToRenew.renew();
return true;
}
}

总结: 如果实例存在且状态不是 UNKNOWN 时就需要在下一步继续校验脏数据。其中最核心的一名代码就是 leaseToRenew.renew() 更新最后一次的心跳时间,Eureka 的租约管理都是在 Lease 完成的。

1.4 脏数据校验 - validateDirtyTimestamp

validateDirtyTimestamp 方法主要是将客户端实例和服务端本地实例进行 PK。PK 的原则就是:服务实例 lastDirtyTimestamp 大的代表是最新的注册信息。 其实原因也很简单,每次服务实例更新时都会更新时间戳,这样时间戳大的就代表最后更新的实例,其它服务节点的实例信息都要这个服务实例进行同步。

private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
boolean isReplication) {
// 1. 获取本地注册的实例,和客户端的实例进行 PK
InstanceInfo appInfo = registry.getInstanceByAppAndId(app.getName(), id, false);
if (appInfo != null) {
// 2. 客户端和服务端的实例更新的时间戳发生了变化,说明实例信息不同步了,进行PK
if ((lastDirtyTimestamp != null) && (!lastDirtyTimestamp.equals(appInfo.getLastDirtyTimestamp()))) {
// 3.1 客户端 PK 成功,客户端需要重新将实例注册一次,更新服务端的实例信息
if (lastDirtyTimestamp > appInfo.getLastDirtyTimestamp()) {
return Response.status(Status.NOT_FOUND).build();
// 3.2 服务端 PK 成功,将实例信息返回给客户端,更新客户端的实例信息
} else if (appInfo.getLastDirtyTimestamp() > lastDirtyTimestamp) {
// ture表示Eureka内部之间同步数据,需要更新实例信息
// 集群内部数据要一致,肯定要同步数据
if (isReplication) {
return Response.status(Status.CONFLICT).entity(appInfo).build();
// false表示EurekaClient的心跳,不需要同步实例信息给EurekaClient?
} else {
return Response.ok().build();
}
}
}
}
return Response.ok().build();
}

总结: 就一句话,lastDirtyTimestamp 大代表是最新的注册信息。

注意: 集群内部消息广播和 EurekaClient 心跳续约的处理不一样(3.2):

  • 集群内部消息广播:如果数据不一致,肯定要进行数据同步处理,达到最终一致性。
  • EurekaClient 心跳续约,如果服务端是最新的数据,不需要同步给客户端。

1.5 客户端处理 - renew

EurekaClient 心跳续约时,如果客户端的实例信息是最新的,需要发起重新注册,更新服务端的实例信息,但服务端的实例信息是最新的,不会更新客户端的实例信息。

// DiscoveryClient
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
// 404时重新发起注册,更新服务端的实例信息
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
return false;
}
}

1.6 心跳广播 - heartbeat

心跳广播重点需要关注失败时的处理逻辑:一是返回 404,也就是客户端的实例信息是最新的,重新发起注册,更新服务端的实例信息;二是其它异常,则需要根据服务端返回的实例更新客户端的注册信息。其中第二点是和 EurekaClient 心跳续约不同的地方。

public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
// 1. primeConnection时不关心心跳续约的结果,发送请求后直接返回
if (primeConnection) {
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
// 2. 关注请求结果,A -> B 发送心跳,成功就不说了
// 3. 心跳续约失败有两种情况:一是 B 节点不存在该实例或 PK 失败,A -> B 重新发起注册请求;
// 二是 B 节点存在该实例且 PK 成功,则反过来需要更新 A 节点该实例的注册信息。
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
} @Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
// 一是 B 节点不存在该实例,A -> B 重新发起注册请求
if (statusCode == 404) {
if (info != null) {
register(info);
}
// 二是 B 节点存在该实例且 PK 赢了,则反过来需要更新 A 节点该实例的注册信息
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}

总结: 心跳广播是保证 Eureka 数据最终一致性的重要一环,只要集群内部一直发送心跳广播,如果数据出现不一致的情况就会进行数据同步,从而保证数据的最终一致性。

// 更新本地实例注册信息
private void syncInstancesIfTimestampDiffers(
String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
try {
if (infoFromPeer != null) {
// 1. 更新overriddenStatus状态
if (infoFromPeer.getOverriddenStatus() != null && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus())) {
registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
}
// 2. 更新本地实例注册信息
registry.register(infoFromPeer, true);
}
} catch (Throwable e) {
}
}

2. 自动过期

还记得在 Eureka 系列(03)Spring Cloud 自动装配原理 中分析EurekaServerBootstrap 启动时会调用 registry.openForTraffic() 方法启动自动过期的定时任务 EvictionTask 吗?本文就从 EvictionTask 开始分析起。

2.1 启动 EvictionTask 定时任务

图2:启动自动过期定时任务

graph LR
EurekaServerBootstrap -- openForTraffic --> PeerAwareInstanceRegistryImpl
PeerAwareInstanceRegistryImpl -- postInit --> AbstractInstanceRegistry
AbstractInstanceRegistry -- start --> EvictionTask
// 启动自动过期定时任务 EvictionTask,默认每 60s 执行一次
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}

总结: 默认 EvictionTask 每 60s 执行一次,客户端每 30s 进行一次心跳续约,如果心跳续约超过 90s 则下线。

2.2 EvictionTask执行原理

图2:EvictionTask执行原理

sequenceDiagram
participant EvictionTask
participant AbstractInstanceRegistry
participant Lease
note left of EvictionTask : 60s定时任务
EvictionTask ->> AbstractInstanceRegistry : evict
loop 自动过期
AbstractInstanceRegistry ->> Lease : isExpired
AbstractInstanceRegistry ->> AbstractInstanceRegistry : internalCancel
end

2.2.1 如何判断是否过期

首先对 Lease 几个重要属性进行说明:

private long evictionTimestamp;		// 服务下线时间
private long registrationTimestamp; // 服务注册时间
private long serviceUpTimestamp; // 服务UP时间
private volatile long lastUpdateTimestamp; // 最后一次心跳续约时间
private long duration; // 心跳过期时间,默认 90s

Lease 每次心跳续约时都会更新最后一次续约时间 lastUpdateTimestamp。如果服务下线则会更新下线时间 evictionTimestamp,这样 evictionTimestamp > 0 就表示服务已经下线了。默认心跳续约时间超过 90s 服务就自动过期。

public boolean isExpired(long additionalLeaseMs) {
return (evictionTimestamp > 0 || System.currentTimeMillis() >
(lastUpdateTimestamp + duration + additionalLeaseMs));
}

总结: additionalLeaseMs 是一种补偿机制,可以当成默认值 0ms。

2.2.2 服务下线

服务下线时首先判断是否开启了自我保护机制,再计算出一次最多下线的实例个数,最后调用 internalCancel 将实例下线。

public void evict(long additionalLeaseMs) {
// 1. 是否开启自我保护机制
if (!isLeaseExpirationEnabled()) {
return;
} // 2. 调用 lease.isExpired 筛选出所有过期的实例
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
} // 3. 计算一次最多下线的实例个数 toEvict
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
// 4. 和自动下线一样,调用internalCancel进行下线
internalCancel(appName, id, false);
}
}
}

总结: 自动过期和主动下线的区别是自动过期会考虑服务的自我保护,计算一次最多下线的实例个数,其余的都一样。


每天用心记录一点点。内容也许不重要,但习惯很重要!

最新文章

  1. compass color 颜色 对比色[Sass和compass学习笔记]
  2. 【ASP.NET 插件】zyUpload的HTML5上传插件
  3. HDU 4782 Beautiful Soup --模拟
  4. P1010 幂次方
  5. java事务的类型——面试被问到
  6. void指针(void*)用法
  7. STL笔记(6)标准库:标准库中的排序算法
  8. Extjs事件继承注意事项
  9. Sql Server trace flags
  10. 通过JavaScript判断当前浏览器版本
  11. Simple Automated Backups for MongoDB Replica Sets
  12. 可以部署在广域网执行QQ高仿版 GG2014 (源代码)
  13. 关于FragmentManager动态管理Fragment时Fragment生命周期的探究
  14. linux命令行传递参数定期执行PHP文件
  15. 如何在Anoconda Prompt 安装pytorch
  16. 新萌渗透测试入门DVWA 教程1:环境搭建
  17. 百度网盘提交提取密码:根据cookies获取loginId 的js
  18. PHP 函数获取文件名
  19. spark-mllib 密集向量和稀疏向量
  20. ASP.NET MVC5 支持PUT 和DELETE

热门文章

  1. 如何判断索引是否生效--explain
  2. luoguP1313 计算系数 题解(NOIP2011)
  3. opencv图像的基本操作3
  4. Ptyhon 合并列表
  5. 【BZOJ2138】stone
  6. 42.Flatten Binary Tree to Linked List
  7. form 表单的name
  8. SQL数据库&mdash;&lt;1&gt;SQL语言
  9. NOIP2016D1T3 换教室 (概率DP)
  10. cglib代理与jdk动态代理示例