Eureka Core Source Code Analysis (1): Application Instance Registration, Renewal, and Deregistration

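This article walks through the core source code for application instance registration, renewal, and deregistration in Eureka, based on version 1.9.8.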

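I. Application Instance Registration

1. Eureka Client Initiates Registration

For the Eureka Client to register an application instance with the Eureka Server, the following conditions must be met: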

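  • The configuration eureka.registration.enabled=true, the switch that allows the Eureka Client to register application instances with the Eureka Server
  • The InstanceInfo held by the Eureka Client is inconsistent with the copy on the Eureka Server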

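Whenever a property of InstanceInfo changes, its isInstanceInfoDirty flag is set to true, indicating that the InstanceInfo on the Eureka Client and on the Eureka Server are out of sync and that a registration is needed. In addition, a newly created InstanceInfo does not yet exist on the Eureka Server, so it is registered as well.

When these conditions hold, the InstanceInfo is not registered with the Eureka Server immediately; a background thread registers it on a schedule.

The exception is a change to the InstanceInfo status property: if eureka.shouldOnDemandUpdateStatusChange=true (the default), the client registers with the Eureka Server immediately.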
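The dirty-flag bookkeeping described above can be sketched as follows. This is a minimal illustration, not the real InstanceInfo: the class DirtyFlagSketch and the explicit `now` parameter are hypothetical, and only the method names setIsDirty, isDirtyWithTime, and unsetIsDirty mirror calls that appear in this article's excerpts. The timestamp comparison in unsetIsDirty is an assumption about why that method takes a timestamp.

```java
/** Hypothetical sketch of dirty-flag bookkeeping; not the real InstanceInfo. */
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the local copy as changed at time `now` and returns `now`. */
    synchronized long setIsDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
        return now;
    }

    /** Returns the last dirty timestamp if a registration is pending, otherwise null. */
    synchronized Long isDirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if no newer change arrived after `seenTimestamp`. */
    synchronized void unsetIsDirty(long seenTimestamp) {
        if (lastDirtyTimestamp <= seenTimestamp) {
            dirty = false;
        }
    }
}
```

Under this assumption, a change that arrives while a registration is in flight keeps the flag set, so the next scheduled round registers again.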

1) Replicating application instance info

public class DiscoveryClient implements EurekaClient {

    /** Listener for application instance status changes */
    private ApplicationInfoManager.StatusChangeListener statusChangeListener;

    /** Replicator for application instance info */
    private InstanceInfoReplicator instanceInfoReplicator;

    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        // Heartbeat executor towards the Eureka Server
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            // Create the application instance status change listener
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            // Register the application instance status change listener
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            // Start the application instance info replicator
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}

DiscoveryClient calls InstanceInfoReplicator's start(int initialDelayMs) method to start the application instance info replicator. The implementation is as follows:

class InstanceInfoReplicator implements Runnable {

    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            // Mark the instance info as out of sync
            instanceInfo.setIsDirty();  // for initial register
            // Submit the task (despite the parameter name, the delay is passed in seconds)
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    public void run() {
        try {
            // Refresh the application instance info
            discoveryClient.refreshInstanceInfo();

            // Check whether the instance info is out of sync
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // Register
                discoveryClient.register();
                // Mark the instance info as in sync
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // Resubmit the task so that it keeps running on a schedule
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}

run() calls DiscoveryClient's register() method, through which the Eureka Client registers the application instance with the Eureka Server.
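The replicator does not use a fixed-rate schedule; each round resubmits itself in a finally block. A minimal sketch of that pattern with the JDK's ScheduledExecutorService is shown below. The class SelfReschedulingTask, its millisecond interval, and the run counter are hypothetical and exist only so the demo terminates; the real replicator reschedules without a limit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a task that resubmits itself after every round. */
class SelfReschedulingTask implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final long intervalMillis;
    private final CountDownLatch remainingRuns; // demo-only stop condition

    SelfReschedulingTask(ScheduledExecutorService scheduler, long intervalMillis, int runs) {
        this.scheduler = scheduler;
        this.intervalMillis = intervalMillis;
        this.remainingRuns = new CountDownLatch(runs);
    }

    void start(long initialDelayMillis) {
        scheduler.schedule(this, initialDelayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        try {
            remainingRuns.countDown(); // stand-in for "refresh, then register if dirty"
        } finally {
            // Resubmitting in finally keeps the loop alive even if a round fails.
            if (remainingRuns.getCount() > 0) {
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            }
        }
    }

    /** Runs the task `runs` times and reports whether all rounds completed in time. */
    static boolean demo(int runs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            SelfReschedulingTask task = new SelfReschedulingTask(scheduler, 10, runs);
            task.start(0);
            return task.remainingRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

One consequence of this design is that the next round is always scheduled relative to the end of the previous one, so a slow registration never causes overlapping rounds.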

2) Sending the registration request

public class DiscoveryClient implements EurekaClient {

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            // Calls the register method of AbstractJerseyEurekaHttpClient
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
}

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
}

AbstractJerseyEurekaHttpClient's register() method sends a POST request to the Eureka Server endpoint apps/${APP_NAME} with the InstanceInfo as the request body, which registers the instance.
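To make the shape of that request concrete, here is a sketch that builds (but does not send) an equivalent POST with the JDK's java.net.http API, which requires Java 11 or later. Eureka itself uses Jersey, as shown above; the class RegisterRequestSketch, the example service URL, and the JSON payload passed by the caller are all hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch of the registration request; nothing is sent over the network. */
class RegisterRequestSketch {
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        // Same path shape as in the article: apps/${APP_NAME}
        URI uri = URI.create(serviceUrl + "apps/" + appName);
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }
}
```

For example, build("http://localhost:8761/eureka/", "ORDER-SERVICE", "{}") yields a POST to http://localhost:8761/eureka/apps/ORDER-SERVICE; both the host and the application name here are placeholders.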

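2. Eureka Server Receives the Registration

The core flow of how the Eureka Server handles a registration is shown in the figure below:

[Figure: core flow of registration handling on the Eureka Server]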

1) Receiving the registration request

@Produces({"application/xml", "application/json"})
public class ApplicationResource {

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // Parameter validation
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // Register the application instance info
        registry.register(info, "true".equals(isReplication));
        // Return status code 204
        return Response.status(204).build();  // 204 to be backwards compatible
    }
}

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        // Lease duration before expiry
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // Register the application instance info
        super.register(info, leaseDuration, isReplication);
        // Replicate to the other Eureka Server nodes
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
}

PeerAwareInstanceRegistryImpl calls the register(...) method of its parent class AbstractInstanceRegistry to register the instance info.

2) Lease

public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };

    public static final int DEFAULT_DURATION_IN_SECS = 90;

    /** The InstanceInfo entity */
    private T holder;
    /** Deregistration (eviction) timestamp */
    private long evictionTimestamp;
    /** Registration timestamp */
    private long registrationTimestamp;
    /** Timestamp at which the instance started serving */
    private long serviceUpTimestamp;
    /** Timestamp of the last lease update */
    // Make it volatile so that the expiration task would see this quicker
    private volatile long lastUpdateTimestamp;
    /** Lease duration in milliseconds */
    private long duration;

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);
    }
}

3) Registering the application instance info

AbstractInstanceRegistry's register(...) method registers the instance info. The code is as follows:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Lease map
     * key1: application name
     * key2: application instance id
     * value: lease
     */
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // Acquire the read lock
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // Count the registration for monitoring
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // Add the application
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // Look up the existing lease for this instance
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // Create the lease
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // Store the lease
            gMap.put(registrant.getId(), lease);
            // Add to the recently-registered debug queue
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // Add to the overridden instance status map
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                        + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Compute the final instance status and set it on the instance
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // Set the lease's service-up timestamp (only effective the first time)
            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // Mark the instance info's action type as ADDED
            registrant.setActionType(ActionType.ADDED);
            // Add to the recently-changed lease queue
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // Set the instance info's last-updated timestamp
            registrant.setLastUpdatedTimestamp();
            // Invalidate the response cache
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // Release the lock
            read.unlock();
        }
    }
}
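Two details of register(...) are easy to miss in the long listing: the putIfAbsent idiom for creating the per-application map, and the strict greater-than comparison of dirty timestamps. The sketch below isolates both. MiniRegistrySketch is hypothetical, and the stored value is reduced to a single dirty timestamp rather than a full Lease<InstanceInfo>.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the two-level registry map; values are reduced to a dirty timestamp. */
class MiniRegistrySketch {
    // appName -> (instanceId -> lastDirtyTimestamp of the stored registrant)
    private final ConcurrentHashMap<String, Map<String, Long>> registry = new ConcurrentHashMap<>();

    /** Registers an instance and returns the dirty timestamp that ends up stored. */
    long register(String appName, String instanceId, long lastDirtyTimestamp) {
        Map<String, Long> instances = registry.get(appName);
        if (instances == null) {
            Map<String, Long> fresh = new ConcurrentHashMap<>();
            // putIfAbsent returns the previous value, or null if ours was installed.
            instances = registry.putIfAbsent(appName, fresh);
            if (instances == null) {
                instances = fresh;
            }
        }
        Long existing = instances.get(instanceId);
        // Strictly greater: on a tie the incoming copy wins, as in the listing above.
        long winner = (existing != null && existing > lastDirtyTimestamp) ? existing : lastDirtyTimestamp;
        instances.put(instanceId, winner);
        return winner;
    }

    int instanceCount(String appName) {
        Map<String, Long> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }
}
```

In this sketch, registering "i-1" with timestamp 100 and then again with timestamp 50 keeps 100, because the server's copy is newer than the incoming one.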

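II. Application Instance Renewal

1. Eureka Client Initiates Renewal

After the Eureka Client successfully registers an application instance with the Eureka Server, it obtains a lease. The client then renews the lease with the server at a fixed interval so that the lease does not expire.

By default the lease is valid for 90 seconds and the renewal interval is 30 seconds. The ratio of renewal interval to lease duration is 1:3, so under network failures and similar problems the client has three attempts to renew before the lease runs out.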
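The 1:3 ratio is simple integer arithmetic, sketched below. LeaseMathSketch and its method name are hypothetical helpers, not part of Eureka.

```java
/** Hypothetical helper: how many renewal attempts fit into one lease period. */
class LeaseMathSketch {
    static int heartbeatsPerLease(int leaseDurationSecs, int renewalIntervalSecs) {
        if (renewalIntervalSecs <= 0) {
            throw new IllegalArgumentException("renewal interval must be positive");
        }
        // Integer division: partial intervals do not count as an attempt.
        return leaseDurationSecs / renewalIntervalSecs;
    }
}
```

With the defaults quoted above, heartbeatsPerLease(90, 30) evaluates to 3.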

1) Initializing the scheduled task

During initialization the Eureka Client creates a heartbeat thread that renews the lease with the Eureka Server at a fixed interval. This happens in the same initScheduledTasks() method shown in section I; the heartbeat-related part is:

public class DiscoveryClient implements EurekaClient {

    private void initScheduledTasks() {
        // ... registry cache refresh timer omitted

        // Heartbeat executor towards the Eureka Server
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // ... status change listener and InstanceInfoReplicator setup omitted
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
}

2) Sending the renewal

public class DiscoveryClient implements EurekaClient {

    /** Timestamp of the last successful heartbeat to the Eureka Server */
    private volatile long lastSuccessfulHeartbeatTimestamp = -1;

    private class HeartbeatThread implements Runnable {
        public void run() {
            // Call the renew method
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            // Calls the sendHeartBeat method of AbstractJerseyEurekaHttpClient
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
}

AbstractJerseyEurekaHttpClient's sendHeartBeat() method sends a PUT request to the Eureka Server endpoint apps/${APP_NAME}/${INSTANCE_INFO_ID}, with the query parameters status, lastDirtyTimestamp, and overriddenstatus, to renew the lease.
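The decision logic in the client's renew() (a 404 triggers a fresh registration, a 200 counts as success, anything else fails) can be sketched without any HTTP code by injecting the two calls. RenewFlowSketch and its functional parameters are hypothetical stand-ins for the transport calls in the listing above.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

/** Hypothetical sketch of the client-side renew decision; HTTP calls are injected. */
class RenewFlowSketch {
    static final int OK = 200;
    static final int NOT_FOUND = 404;

    /**
     * @param sendHeartbeat returns the HTTP status code of the heartbeat PUT
     * @param register      performs a full registration and reports success
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == NOT_FOUND) {
                // The server has no lease for this instance: register again.
                return register.getAsBoolean();
            }
            return status == OK;
        } catch (RuntimeException e) {
            // Any transport failure counts as a failed heartbeat.
            return false;
        }
    }
}
```

For instance, renew(() -> 404, () -> true) returns true because the re-registration succeeded, while renew(() -> 500, () -> true) returns false without attempting to register.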

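2. Eureka Server Receives the Renewal

The core flow of how the Eureka Server handles a renewal is shown in the figure below:

[Figure: core flow of renewal handling on the Eureka Server]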

1) Receiving the renewal request

@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @PUT
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        // Renew the lease
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // Renewal failed
        // Not found in the registry, immediately ask for a register
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }
        // Compare the lastDirtyTimestamp property of InstanceInfo
        // Check if we need to sync based on dirty time stamp, the client
        // instance might have changed some value
        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            // Store the overridden status since the validation found out the node that replicates wins
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
}

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // Renew the lease
        if (super.renew(appName, id, isReplication)) {
            // Replicate to the other Eureka Server nodes
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
}

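PeerAwareInstanceRegistryImpl calls the renew(...) method of its parent class AbstractInstanceRegistry to renew the lease.

2) Renewing the application instance lease

AbstractInstanceRegistry's renew(...) method renews the lease. The code is as follows: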

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    public boolean renew(String appName, String id, boolean isReplication) {
        // Count the renewal for monitoring
        RENEW.increment(isReplication);
        // Look up the leases for the application name
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        // The lease does not exist
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // Increment the renewals-per-minute counter
            renewsLastMin.increment();
            // Update the lease's last-update time
            leaseToRenew.renew();
            return true;
        }
    }
}

public class Lease<T> {

    public void renew() {
        // Set the lease's last-update timestamp (note that the source adds duration here)
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
}

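The whole renewal process only updates the lease's expiration time. Even under concurrent requests this does not affect data consistency, so unlike registration it does not need to take a lock.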
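For context on the lock that registration does take: the listings call read.lock(), which suggests the read half of a read/write lock. The sketch below assumes a java.util.concurrent.locks.ReentrantReadWriteLock (the field declaration is not part of this article's excerpts) and shows what holding a read lock permits. ReadLockSketch is a hypothetical demo class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical demo: read locks are shared, the write lock is exclusive. */
class ReadLockSketch {
    /**
     * Holds the read lock on the calling thread and lets a second thread try both locks.
     * Returns {readAcquired, writeAcquired} as observed by the second thread.
     */
    static boolean[] probeWhileReadHeld() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        ExecutorService other = Executors.newSingleThreadExecutor();
        lock.readLock().lock();
        try {
            return other.submit(() -> {
                boolean read = lock.readLock().tryLock();
                if (read) {
                    lock.readLock().unlock();
                }
                boolean write = lock.writeLock().tryLock();
                if (write) {
                    lock.writeLock().unlock();
                }
                return new boolean[] {read, write};
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            lock.readLock().unlock();
            other.shutdown();
        }
    }
}
```

Under that assumption, concurrent registrations and cancellations do not block each other; only an operation that takes the write lock would exclude them.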

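III. Application Instance Deregistration

1. Eureka Client Initiates Deregistration

When an application instance shuts down, the Eureka Client asks the Eureka Server to deregister it. The request is sent only if the following conditions are met: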

  • The configuration eureka.registration.enabled=true, which enables registration for the application instance. In Netflix's DefaultEurekaClientConfig this defaults to true
  • The configuration eureka.shouldUnregisterOnShutdown=true, which enables deregistration when the application instance shuts down. Defaults to true

public class DiscoveryClient implements EurekaClient {

    @PreDestroy
    @Override
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks();

            // If APPINFO was registered
            if (applicationInfoManager != null
                    && clientConfig.shouldRegisterWithEureka() // eureka.registration.enabled=true
                    && clientConfig.shouldUnregisterOnShutdown()) { // eureka.shouldUnregisterOnShutdown=true
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
                unregister();
            }

            if (eurekaTransport != null) {
                eurekaTransport.shutdown();
            }

            heartbeatStalenessMonitor.shutdown();
            registryStalenessMonitor.shutdown();

            logger.info("Completed shut down of DiscoveryClient");
        }
    }

    void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                // Calls the cancel method of AbstractJerseyEurekaHttpClient
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }
}

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {

    @Override
    public EurekaHttpResponse<Void> cancel(String appName, String id) {
        String urlPath = "apps/" + appName + '/' + id;
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder.delete(ClientResponse.class);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
}

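AbstractJerseyEurekaHttpClient's cancel() method sends a DELETE request to the Eureka Server endpoint apps/${APP_NAME}/${INSTANCE_INFO_ID}, which deregisters the application instance.

2. Eureka Server Receives the Deregistration

The core flow of how the Eureka Server handles a deregistration request is shown in the figure below:

[Figure: core flow of deregistration handling on the Eureka Server]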

1) Receiving the deregistration request

@Produces({"application/xml", "application/json"})
public class InstanceResource {

    @DELETE
    public Response cancelLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        try {
            // Deregister
            boolean isSuccess = registry.cancel(app.getName(), id,
                    "true".equals(isReplication));

            if (isSuccess) {
                logger.debug("Found (Cancel): {} - {}", app.getName(), id);
                return Response.ok().build();
            } else {
                logger.info("Not Found (Cancel): {} - {}", app.getName(), id);
                return Response.status(Status.NOT_FOUND).build();
            }
        } catch (Throwable e) {
            logger.error("Error (cancel): {} - {}", app.getName(), id, e);
            return Response.serverError().build();
        }
    }
}

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        // Deregister
        if (super.cancel(appName, id, isReplication)) {
            // Replicate to the other Eureka Server nodes
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to cancel it, reduce the number of clients to send renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                    updateRenewsPerMinThreshold();
                }
            }
            return true;
        }
        return false;
    }
}

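PeerAwareInstanceRegistryImpl calls the cancel(...) method of its parent class AbstractInstanceRegistry to deregister the application instance.

2) Deregistering the application instance

AbstractInstanceRegistry's cancel(...) method deregisters the application instance. The code is as follows: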

public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            // Acquire the read lock
            read.lock();
            // Count the cancellation for monitoring
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            // Remove the lease from the map
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            // Add to the recently-cancelled debug queue
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            // Remove the entry from the overridden instance status map
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            // The lease does not exist
            if (leaseToCancel == null) {
                // Count the "cancel not found" case for monitoring
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                // Set the lease's eviction timestamp
                leaseToCancel.cancel();
                // Add to the recently-changed lease queue
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                // Invalidate the response cache
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            // Release the lock
            read.unlock();
        }
    }
}

public class Lease<T> {

    public void cancel() {
        if (evictionTimestamp <= 0) {
            // Set the eviction timestamp
            evictionTimestamp = System.currentTimeMillis();
        }
    }
}
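The essential behaviour of the cancel path (remove the lease, report "not found" if there was none, and record the eviction time only once) can be sketched in a few lines. CancelFlowSketch and its nested Lease are hypothetical simplifications; the explicit `now` parameter replaces System.currentTimeMillis() so the behaviour is easy to check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the cancel path; not the real registry or Lease. */
class CancelFlowSketch {
    /** Minimal lease: only the eviction timestamp is modelled. */
    static final class Lease {
        private long evictionTimestamp; // 0 means "not cancelled yet"

        void cancel(long now) {
            if (evictionTimestamp <= 0) { // the first cancel wins
                evictionTimestamp = now;
            }
        }

        long getEvictionTimestamp() {
            return evictionTimestamp;
        }
    }

    private final Map<String, Lease> leases = new ConcurrentHashMap<>();

    Lease register(String instanceId) {
        Lease lease = new Lease();
        leases.put(instanceId, lease);
        return lease;
    }

    /** Removes the lease; returns false when no lease is registered for the id. */
    boolean cancel(String instanceId, long now) {
        Lease lease = leases.remove(instanceId);
        if (lease == null) {
            return false;
        }
        lease.cancel(now);
        return true;
    }
}
```

A second cancel for the same instance id returns false in this sketch, which corresponds to the 404 response path of cancelLease shown earlier.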


Published: 2024-01-31 01:15:50

Article link: https://www.4u4v.net/it/170663495324291.html
