// 创建 应用实例状态变更监听器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return"statusChangeListener"; }
@Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };
// 注册 应用实例状态变更监听器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); }
// InstanceInfoReplicator.java public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { // 限流相关,跳过 scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); // 取消任务 Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } // 再次调用 InstanceInfoReplicator.this.run(); } }); returntrue; } else { logger.warn("Ignoring onDemand update due to rate limiter"); returnfalse; } }
// ApplicationInfoManager.java public void refreshDataCenterInfoIfRequired() { // hostname String existingAddress = instanceInfo.getHostName(); String newAddress; if (config instanceof RefreshableInstanceConfig) { // Refresh data center info, and return up to date address newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true); } else { newAddress = config.getHostName(true); } // ip String newIp = config.getIpAddress(); if (newAddress != null && !newAddress.equals(existingAddress)) { logger.warn("The address changed from : {} => {}", existingAddress, newAddress); // :( in the legacy code here the builder is acting as a mutator. // This is hard to fix as this same instanceInfo instance is referenced elsewhere. // We will most likely re-write the client at sometime so not fixing for now. InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo); builder.setHostName(newAddress) // hostname .setIPAddr(newIp) // ip .setDataCenterInfo(config.getDataCenterInfo()); // dataCenterInfo instanceInfo.setIsDirty(); } }
1: public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { 2: try { 3: // 获取读锁 4: read.lock(); 5: Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); 6: // 增加 注册次数 到 监控 7: REGISTER.increment(isReplication); 8: // 获得 应用实例信息 对应的 租约 9: if (gMap == null) { 10: final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); 11: gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); // 添加 应用 12: if (gMap == null) { // 添加 应用 成功 13: gMap = gNewMap; 14: } 15: } 16: Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); 17: // Retain the last dirty timestamp without overwriting it, if there is already a lease 18: if (existingLease != null && (existingLease.getHolder() != null)) { // 已存在时,使用数据不一致的时间大的应用注册信息为有效的 19: Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); // Server 注册的 InstanceInfo 20: Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); // Client 请求的 InstanceInfo 21: logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 22: 23: // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted 24: // InstanceInfo instead of the server local copy. 25: if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { 26: logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + 27: " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); 28: logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); 29: registrant = existingLease.getHolder(); 30: } 31: } else { 32: // The lease does not exist and hence it is a new registration 33: // 【自我保护机制】增加 `numberOfRenewsPerMinThreshold` 、`expectedNumberOfRenewsPerMin` 34: synchronized (lock) { 35: if (this.expectedNumberOfRenewsPerMin > 0) { 36: // Since the client wants to cancel it, reduce the threshold 37: // (1 38: // for 30 seconds, 2 for a minute) 39: this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; 40: this.numberOfRenewsPerMinThreshold = 41: (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); 42: } 43: } 44: logger.debug("No previous lease information found; it is new registration"); 45: } 46: // 创建 租约 47: Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); 48: if (existingLease != null) { // 若租约已存在,设置 租约的开始服务的时间戳 49: lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); 50: } 51: // 添加到 租约映射 52: gMap.put(registrant.getId(), lease); 53: // 添加到 最近注册的调试队列 54: synchronized (recentRegisteredQueue) { 55: recentRegisteredQueue.add(new Pair<Long, String>( 56: System.currentTimeMillis(), 57: registrant.getAppName() + "(" + registrant.getId() + ")")); 58: } 59: // 添加到 应用实例覆盖状态映射(Eureka-Server 初始化使用) 60: // This is where the initial state transfer of overridden status happens 61: if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { 62: logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " 63: + "overrides", registrant.getOverriddenStatus(), registrant.getId()); 64: if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { 65: logger.info("Not found overridden id {} and hence adding it", registrant.getId()); 66: overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); 67: } 68: } 69: InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); 70: if (overriddenStatusFromMap != null) { 71: logger.info("Storing overridden status {} from map", overriddenStatusFromMap); 72: registrant.setOverriddenStatus(overriddenStatusFromMap); 73: } 74: 75: // 获得应用实例最终状态,并设置应用实例的状态 76: // Set the status based on the overridden status rules 77: InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); 78: registrant.setStatusWithoutDirty(overriddenInstanceStatus); 79: 80: // 设置 租约的开始服务的时间戳(只有第一次有效) 81: // If the lease is registered with UP status, set lease service up timestamp 82: if (InstanceStatus.UP.equals(registrant.getStatus())) { 83: lease.serviceUp(); 84: } 85: // 设置 应用实例信息的操作类型 为 添加 86: registrant.setActionType(ActionType.ADDED); 87: // 添加到 最近租约变更记录队列 88: recentlyChangedQueue.add(new RecentlyChangedItem(lease)); 89: // 设置 租约的最后更新时间戳 90: registrant.setLastUpdatedTimestamp(); 91: // 设置 响应缓存 过期 92: invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); 93: logger.info("Registered instance {}/{} with status {} (replication={})", 94: registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); 95: } finally { 96: // 释放锁 97: read.unlock(); 98: } 99: }