淘先锋技术网

首页 1 2 3 4 5 6 7

系列文章目录

springCloud实践
springCloud实践之浅谈Feign原理
springCloud-Eureka(一)
springCloud-Eureka—服务注册与服务续约(二)
springCloud-Eureka—服务同步与剔除(三)


一、服务同步

服务的同步就比较简单了,只是不同节点间的http调用,实现代码如下:

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                //具体执行同步方法
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }


	
	/**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     * 复制所有的实例修改除了同伴尤里卡节点, 复制这个节点的流量
     */
	
	private void replicateInstanceActionsToPeers(Action action, String appName,
	                                                 String id, InstanceInfo info, InstanceStatus newStatus,
	                                                 PeerEurekaNode node) {
	        try {
	            InstanceInfo infoFromRegistry = null;
	            CurrentRequestVersion.set(Version.V2);
	            switch (action) {
	                case Cancel:
	                    node.cancel(appName, id);
	                    break;
	                case Heartbeat:
	                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
	                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
	                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
	                    break;
	                case Register:
	                    node.register(info);
	                    break;
	                case StatusUpdate:
	                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
	                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
	                    break;
	                case DeleteStatusOverride:
	                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
	                    node.deleteStatusOverride(appName, id, infoFromRegistry);
	                    break;
	            }
	        } catch (Throwable t) {
	            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
	        }
	    }
由于集群间的同步复制是通过Http方式进行,基于网络的不可靠性,集群中的Eureka Server间的注册表难免存在不同步的时间节点,不满足CAP中的C(数据一致性)。

二、服务剔除

    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
		//服务的自我保护策略判断,决定是否剔除服务,防止网络分区问题
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        //遍历服务注册表获取租约到期服务列表
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        //获取注册表租约总数
        int registrySize = (int) getLocalRegistrySize();
        //计算注册表租约的阈值(总数 * 续租百分比 默认85%),得到要续租的数量
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
		//实际剔除的数量 = min(实际租期到期服务实例个数,理论剔除数量)
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }

	/**
	*	自我保护机制判断
	*/
    @Override
    public boolean isLeaseExpirationEnabled() {
    	//是否开通自我保护机制
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        //续约数量 > 阈值 就可以剔除
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

总结Eureka事件

EurekaInstanceCanceledEvent       服务下线事件
EurekaInstanceRegisteredEvent     服务注册事件
EurekaInstanceRenewedEvent       服务续约事件
EurekaRegistryAvailableEvent        注册中心可用事件
EurekaServerStartedEvent             注册中心启动