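The previous part covered the client's service-registration task. After a microservice has registered with the registry, it must keep sending heartbeats to the registry at a fixed interval; that is the heartbeat task covered here. The heartbeat task is started in DiscoveryClient's initScheduledTasks() method. renewalIntervalInSecs is the interval between heartbeats and is set by the eureka.instance.lease-renewal-interval-in-seconds property (30 seconds by default).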
// DiscoveryClient.java#initScheduledTasks()
scheduler.schedule(
        new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        ),
        renewalIntervalInSecs, TimeUnit.SECONDS);
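TimedSupervisorTask wraps HeartbeatThread, and expBackOffBound limits how far the delay between runs can grow when a heartbeat times out. The following is a minimal sketch of that delay bookkeeping as I understand it: after a timeout the delay doubles, capped at interval × expBackOffBound, and a run that finishes in time resets it to the normal interval. The class BackoffDelay is hypothetical and does no actual scheduling.

```java
// Hypothetical model of the delay logic behind TimedSupervisorTask; not Eureka's code.
final class BackoffDelay {
    private final long baseDelayMs; // the normal interval, e.g. renewalIntervalInSecs * 1000
    private final long maxDelayMs;  // upper bound: baseDelayMs * expBackOffBound
    private long currentDelayMs;

    BackoffDelay(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound;
        this.currentDelayMs = baseDelayMs;
    }

    /** Returns the delay before the next run, given whether the previous run timed out. */
    long next(boolean previousRunTimedOut) {
        if (previousRunTimedOut) {
            // Back off: double the delay, but never exceed the cap.
            currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        } else {
            // A run that finished in time resets the delay to the normal interval.
            currentDelayMs = baseDelayMs;
        }
        return currentDelayMs;
    }

    public static void main(String[] args) {
        BackoffDelay delay = new BackoffDelay(30_000, 10);
        boolean[] timedOut = {true, true, true, true, true, false};
        for (boolean t : timedOut) {
            System.out.println("timedOut=" + t + " -> next delay " + delay.next(t) + " ms");
        }
    }
}
```

With a 30 s interval and a bound of 10, consecutive timeouts push the delay to 60 s, 120 s, 240 s and then hold it at 300 s; the first heartbeat that completes in time brings it back to 30 s.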
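Now the heartbeat task itself. It looks simple: it periodically renews the lease by calling renew(), and when that succeeds it updates lastSuccessfulHeartbeatTimestamp to record when the last successful heartbeat happened.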
// DiscoveryClient.java$HeartbeatThread
public void run() {
    if (renew()) {
        lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
    }
}
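To make the role of lastSuccessfulHeartbeatTimestamp concrete, here is a hypothetical stand-alone version of the heartbeat runnable. The renew() call is replaced by an injected BooleanSupplier and the clock by a LongSupplier so the sketch runs without a server; the -1 "never succeeded" sentinel and the millisSinceLastSuccess() helper are assumptions modeled loosely on how DiscoveryClient reports the time since the last successful heartbeat.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical stand-in for DiscoveryClient.HeartbeatThread; names are illustrative.
final class HeartbeatSketch implements Runnable {
    private final BooleanSupplier renew; // stands in for DiscoveryClient.renew()
    private final LongSupplier clock;    // stands in for System.currentTimeMillis()
    // volatile: written by the heartbeat thread, read by other threads (e.g. health checks)
    private volatile long lastSuccessfulHeartbeatTimestamp = -1;

    HeartbeatSketch(BooleanSupplier renew, LongSupplier clock) {
        this.renew = renew;
        this.clock = clock;
    }

    @Override
    public void run() {
        // Only a successful renewal moves the timestamp forward.
        if (renew.getAsBoolean()) {
            lastSuccessfulHeartbeatTimestamp = clock.getAsLong();
        }
    }

    /** Milliseconds since the last successful heartbeat, or -1 if none has succeeded yet. */
    long millisSinceLastSuccess() {
        long last = lastSuccessfulHeartbeatTimestamp;
        return last < 0 ? -1 : clock.getAsLong() - last;
    }

    public static void main(String[] args) {
        long[] now = {1_000};
        boolean[] serverUp = {true};
        HeartbeatSketch hb = new HeartbeatSketch(() -> serverUp[0], () -> now[0]);
        hb.run();            // succeeds at t = 1000
        now[0] = 31_000;
        serverUp[0] = false;
        hb.run();            // fails, so the timestamp is left unchanged
        System.out.println(hb.millisSinceLastSuccess()); // 30000
    }
}
```

A failed heartbeat leaves the timestamp alone, so the elapsed time keeps growing until a renewal succeeds again, which is what makes it useful for spotting a client that can no longer reach the registry.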
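Because the nodes of a Eureka cluster replicate data among themselves, the client does not send its heartbeat to every node. It sends it to the first node in the cluster; if that succeeds, the node is remembered and used again for the next heartbeat; if it fails, the client switches to another node and tries again. This logic lives in RetryableEurekaHttpClient.execute(). The currently selected node is kept in delegate, an AtomicReference: its value is volatile, so a client stored by one thread is visible to other threads, and compareAndSet() lets a thread clear the entry only if no other thread has replaced it in the meantime, which keeps the updates thread-safe.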
// RetryableEurekaHttpClient.java
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    // three attempts by default
    for (int retry = 0; retry < numberOfRetries; retry++) {
        // delegate holds the client for the node heartbeats are currently sent to
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                // get all nodes of the cluster
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }
            // pick one node of the cluster
            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }
        try {
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                // the node answered, so remember it for subsequent requests
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
        }
        // Connection error or 5xx from the server that must be retried on another server
        // forget the node that could not handle the heartbeat and pick another one
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            quarantineSet.add(currentEndpoint);
        }
    }
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
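The "stick to a working node, fail over and quarantine on error" pattern can be tried out in isolation. Below is a hypothetical, simplified model of the method above (Java 11+): endpoints are plain URL strings, the request is an injected function that returns an HTTP status or throws, any non-5xx status is assumed to mean "the server answered" (the real serverStatusEvaluator has per-request-type rules), and the quarantine reset is simplified to "clear it once every endpoint is quarantined".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToIntFunction;

// Hypothetical, simplified model of RetryableEurekaHttpClient.execute(); not Eureka's code.
final class StickyFailoverClient {
    private final List<String> endpoints;
    private final ToIntFunction<String> sendRequest; // returns an HTTP status, throws on connection error
    private final int numberOfRetries;
    // Endpoint that answered last; shared by all threads using this client.
    private final AtomicReference<String> delegate = new AtomicReference<>();
    private final Set<String> quarantineSet = ConcurrentHashMap.newKeySet();

    StickyFailoverClient(List<String> endpoints, ToIntFunction<String> sendRequest, int numberOfRetries) {
        this.endpoints = List.copyOf(endpoints);
        this.sendRequest = sendRequest;
        this.numberOfRetries = numberOfRetries;
    }

    String currentEndpoint() {
        return delegate.get();
    }

    Set<String> quarantined() {
        return Set.copyOf(quarantineSet);
    }

    // Simplification: skip quarantined endpoints, and reset the quarantine once all are in it.
    private List<String> hostCandidates() {
        List<String> candidates = new ArrayList<>();
        for (String endpoint : endpoints) {
            if (!quarantineSet.contains(endpoint)) {
                candidates.add(endpoint);
            }
        }
        if (candidates.isEmpty()) {
            quarantineSet.clear();
            return endpoints;
        }
        return candidates;
    }

    int execute() {
        List<String> candidates = null;
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            String current = delegate.get();
            String picked = null; // set only when a new endpoint is chosen in this attempt
            if (current == null) {
                if (candidates == null) {
                    candidates = hostCandidates();
                    if (candidates.isEmpty()) {
                        throw new IllegalStateException("There is no known eureka server");
                    }
                }
                if (endpointIdx >= candidates.size()) {
                    throw new IllegalStateException("Cannot execute request on any known server");
                }
                picked = candidates.get(endpointIdx++);
                current = picked;
            }
            try {
                int status = sendRequest.applyAsInt(current);
                if (status < 500) {        // assumption: non-5xx means the server is reachable
                    delegate.set(current); // stick to this endpoint next time
                    return status;
                }
            } catch (RuntimeException e) {
                // connection error: fall through and try another endpoint
            }
            // Clear the sticky endpoint only if no other thread has replaced it meanwhile.
            delegate.compareAndSet(current, null);
            if (picked != null) {
                quarantineSet.add(picked);
            }
        }
        throw new IllegalStateException("Retry limit reached; giving up on completing the request");
    }

    public static void main(String[] args) {
        StickyFailoverClient client = new StickyFailoverClient(
                List.of("http://a/eureka/", "http://b/eureka/", "http://c/eureka/"),
                url -> url.startsWith("http://a") ? 503 : 200,
                3);
        System.out.println(client.execute() + " via " + client.currentEndpoint());
        System.out.println("quarantined: " + client.quarantined());
    }
}
```

In main, the first endpoint answers 503 and is quarantined, the second answers 200 and becomes the sticky endpoint, so later calls go straight to it without touching the failed node.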
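The call stack from here is deep and winding, but it eventually reaches AbstractJerseyEurekaHttpClient.sendHeartBeat(), which is where the heartbeat is actually sent to the server. It builds the URL path apps/{appName}/{id}, adds status and lastDirtyTimestamp (plus overriddenstatus, if set) as query parameters, sends an HTTP PUT with Jersey, and wraps the server's response.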
// AbstractJerseyEurekaHttpClient.java
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder =
                anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity()) {
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
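The wire format of a heartbeat is easy to reproduce without Jersey. This is a minimal sketch using the JDK's java.net.http.HttpRequest (Java 11+) to build the same kind of PUT request; the helper name HeartbeatRequests.heartbeat and the example service URL and instance id are hypothetical, and the sketch assumes serviceUrl ends with "/" and that the values need no URL-encoding. It only builds the request and sends nothing.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: PUT {serviceUrl}apps/{appName}/{id}?status=..&lastDirtyTimestamp=..[&overriddenstatus=..]
final class HeartbeatRequests {
    private HeartbeatRequests() {}

    static HttpRequest heartbeat(String serviceUrl, String appName, String id,
                                 String status, long lastDirtyTimestamp, String overriddenStatus) {
        // Assumes serviceUrl ends with '/' and no value needs URL-encoding.
        StringBuilder uri = new StringBuilder(serviceUrl)
                .append("apps/").append(appName).append('/').append(id)
                .append("?status=").append(status)
                .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        if (overriddenStatus != null) {
            uri.append("&overriddenstatus=").append(overriddenStatus);
        }
        return HttpRequest.newBuilder(URI.create(uri.toString()))
                .PUT(HttpRequest.BodyPublishers.noBody()) // the heartbeat itself carries no body
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = heartbeat("http://localhost:8761/eureka/", "ORDER-SERVICE",
                "host1:order-service:8080", "UP", 1706634000000L, null);
        System.out.println(req.method() + " " + req.uri());
    }
}
```

Everything the server needs travels in the path and query string, which is why the server-side handler below reads status, overriddenstatus and lastDirtyTimestamp as @QueryParam values.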
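When the server receives the renewal request, it handles it in InstanceResource.renewLease(). If the instance is not in the registry, the server answers 404 so that the client registers again right away.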
// InstanceResource.java
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    boolean isFromReplicaNode = "true".equals(isReplication);
    // handle the renewal
    boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

    // Not found in the registry, immediately ask for a register
    if (!isSuccess) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }
    // Check if we need to sync based on dirty time stamp, the client
    // instance might have changed some value
    Response response;
    if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
        // compare the lastDirtyTimestamp sent by the client with the one stored on the server;
        // if they differ, the instance info has to be brought back in sync
        response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
        // Store the overridden status since the validation found out the node that replicates wins
        if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && (overriddenStatus != null)
                && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                && isFromReplicaNode) {
            registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    } else {
        response = Response.ok().build();
    }
    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
    return response;
}
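The lastDirtyTimestamp comparison is easier to follow as a pure decision function. The sketch below is my reading of InstanceResource.validateDirtyTimestamp(), reduced to its inputs; the class and method names are hypothetical. A caller whose copy is newer than the server's gets 404, which makes it register again; when the server's copy is newer, a replicating peer gets 409 CONFLICT (in Eureka that response also carries the server's InstanceInfo so the peer can catch up), while a plain client renewal is simply accepted.

```java
// Hypothetical condensation of the lastDirtyTimestamp check; returns an HTTP status code.
final class DirtyTimestampCheck {
    static final int OK = 200, NOT_FOUND = 404, CONFLICT = 409;

    private DirtyTimestampCheck() {}

    /**
     * @param fromCaller    lastDirtyTimestamp sent with the heartbeat
     * @param stored        lastDirtyTimestamp of the InstanceInfo held by this server, or null if none
     * @param isReplication true if the heartbeat was replicated from a peer node
     */
    static int validate(long fromCaller, Long stored, boolean isReplication) {
        if (stored == null || fromCaller == stored) {
            return OK;        // nothing to reconcile
        }
        if (fromCaller > stored) {
            return NOT_FOUND; // caller has newer data: ask it to register again
        }
        // This server has newer data than the caller.
        return isReplication ? CONFLICT : OK;
    }

    public static void main(String[] args) {
        System.out.println(validate(2_000L, 1_000L, false)); // 404
        System.out.println(validate(1_000L, 2_000L, true));  // 409
        System.out.println(validate(1_000L, 2_000L, false)); // 200
    }
}
```

This also explains the branch in renewLease() that stores the overridden status: when a replicated heartbeat comes back as NOT_FOUND, the replicating node is the one with newer data, so its overridden status is kept.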
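In Spring Cloud, the registry first looks up the stored InstanceInfo by application name and instance id and publishes an EurekaInstanceRenewedEvent; it then hands the renewal to PeerAwareInstanceRegistryImpl.renew(), which renews the lease locally and, if that succeeds, replicates the heartbeat to the other nodes of the cluster.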
// PeerAwareInstanceRegistryImpl.java
public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
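The order of these steps matters: the event goes out first, the lease is renewed locally next, and only a successful renewal that is not itself a replication is forwarded to peers. The sketch below models that layering with a hypothetical RenewPipeline class that just records each step; the set of known leases stands in for the real registry, and in Eureka the "do not forward a replication" check actually lives inside replicateToPeers() rather than in renew().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the renewal layering; it records steps instead of doing real work.
final class RenewPipeline {
    private final Set<String> leases;              // "appName/id" keys this node holds a lease for
    final List<String> steps = new ArrayList<>();  // what happened, in order

    RenewPipeline(Set<String> leases) {
        this.leases = leases;
    }

    boolean renew(String appName, String id, boolean isReplication) {
        String key = appName + "/" + id;
        // 1) The renewed event is published before the outcome of the renewal is known.
        steps.add("event " + key);
        // 2) Local renewal fails when there is no lease (the server then answers 404).
        if (!leases.contains(key)) {
            steps.add("not found " + key);
            return false;
        }
        steps.add("renewed " + key);
        // 3) Forward only successful renewals, and never re-forward a replicated heartbeat.
        if (!isReplication) {
            steps.add("replicate " + key);
        }
        return true;
    }

    public static void main(String[] args) {
        RenewPipeline pipeline = new RenewPipeline(Set.of("ORDER-SERVICE/i-1"));
        pipeline.renew("ORDER-SERVICE", "i-1", false);
        pipeline.renew("ORDER-SERVICE", "i-2", false);
        System.out.println(pipeline.steps);
    }
}
```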
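AbstractInstanceRegistry.renew() fetches the instance's lease; if there is none, it returns false. Otherwise it reconciles the instance's InstanceStatus with the overridden status and renews the lease: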
// AbstractInstanceRegistry.java
public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info("The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                        instanceInfo.getOverriddenStatus().name(),
                        instanceInfo.getId());
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
            }
        }
        renewsLastMin.increment();
        leaseToRenew.renew();
        return true;
    }
}
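The final leaseToRenew.renew() call is what pushes the lease's expiry forward. Here is a minimal, hypothetical lease model with an injected clock: renewing stamps the current time, and the lease counts as expired once more than its duration (eureka.instance.lease-expiration-duration-in-seconds, 90 s by default) has passed without a renewal. Eureka's own Lease class differs in detail; as far as I recall, its renew() adds the duration at renewal time, so an instance that dies without deregistering is in practice evicted only after roughly twice the duration.

```java
import java.util.function.LongSupplier;

// Hypothetical lease model; not Eureka's Lease class.
final class LeaseSketch {
    private final long durationMs;    // how long one renewal keeps the lease alive
    private final LongSupplier clock; // injected so the example is deterministic
    // volatile: renewed on request threads, checked by an eviction timer on another thread
    private volatile long lastRenewalMs;

    LeaseSketch(long durationMs, LongSupplier clock) {
        this.durationMs = durationMs;
        this.clock = clock;
        this.lastRenewalMs = clock.getAsLong(); // registration counts as the first renewal
    }

    void renew() {
        lastRenewalMs = clock.getAsLong();
    }

    boolean isExpired() {
        return clock.getAsLong() > lastRenewalMs + durationMs;
    }

    public static void main(String[] args) {
        long[] now = {0};
        LeaseSketch lease = new LeaseSketch(90_000, () -> now[0]);
        now[0] = 60_000;
        lease.renew();                          // heartbeat at t = 60 s
        now[0] = 140_000;
        System.out.println(lease.isExpired());  // false: 80 s since the last renewal
        now[0] = 160_000;
        System.out.println(lease.isExpired());  // true: 100 s since the last renewal
    }
}
```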
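Finally, the heartbeat is replicated to the other nodes of the cluster when needed. A heartbeat that is itself a replication is not forwarded again, and a node never replicates to itself: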
// PeerAwareInstanceRegistryImpl.java
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
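The two guards in replicateToPeers() decide which peers receive a heartbeat. A minimal sketch of just that selection, assuming peers are identified by their service URLs; ReplicationTargets.peersFor is a hypothetical helper, and it compares URLs as plain strings, whereas Eureka's PeerEurekaNodes.isThisMyUrl() also takes the host name into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: which peers a heartbeat is forwarded to, mirroring the checks in replicateToPeers().
final class ReplicationTargets {
    private ReplicationTargets() {}

    static List<String> peersFor(List<String> peerUrls, String myUrl, boolean isReplication) {
        List<String> targets = new ArrayList<>();
        // A heartbeat that arrived as a replication is not forwarded again;
        // otherwise peers would keep bouncing it around ("poison replication").
        if (isReplication) {
            return targets;
        }
        for (String url : peerUrls) {
            // Never replicate to ourselves (simplified to an exact string comparison).
            if (!url.equals(myUrl)) {
                targets.add(url);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> peers = List.of("http://a/eureka/", "http://b/eureka/", "http://c/eureka/");
        System.out.println(peersFor(peers, "http://b/eureka/", false));
        System.out.println(peersFor(peers, "http://b/eureka/", true));
    }
}
```

Because replicated heartbeats stop after one hop, a heartbeat received by one node reaches every other node exactly once, provided each node lists all of its peers.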
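To sum up: the heartbeat task is driven by the client, which periodically sends the application name and instance id it registered with to the server as an HTTP PUT to apps/{appName}/{id}. The server looks up the lease, reconciles the instance status, renews the lease, and, where needed, replicates the heartbeat to the other nodes of the cluster. If the server has no lease for the instance, it answers 404 and the client registers again.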
Published: 2024-01-31 01:17:49
Article link: https://www.4u4v.net/it/170663507224302.html