【Eureka】【09】当Eureka服务器宕机后,注册在上的服务第二次重试到其他节点

1.EurekaHttpClientDecorator是一个装饰者模式

有RetryableEurekaHttpClient、SessionedEurekaHttpClient、
RedirectingEurekaHttpClient、MetricsCollectingEurekaHttpClient四种装饰功能类
RetryableEurekaHttpClient负责重试

2.部分RetryableEurekaHttpClient类代码分析重试过程

public class RetryableEurekaHttpClient extends EurekaHttpClientDecorator {
   
     
	//默认重试3次
	public static final int DEFAULT_NUMBER_OF_RETRIES = 3;
	private final int numberOfRetries;
	
	//AtomicReference类提供了一个可以原子读写的对象引用变量。 
	//原子意味着尝试更改相同AtomicReference的多个线程(例如,使用比较和交换操作)
	//不会使AtomicReference最终达到不一致的状态
	private final AtomicReference<EurekaHttpClient> delegate = new AtomicReference<>();
	
	//保存不可用的eureka服务地址
	private final Set<EurekaEndpoint> quarantineSet = new ConcurrentSkipListSet<>();
	
	@Override
    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
   
     
        List<EurekaEndpoint> candidateHosts = null;
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
   
     
			//0.假设client连接到了Eureka1,Eureka1服务器发生故障
			//1.从缓存的引用中获取,正在运行时currentHttpClient不会为空
			//4.第一次重试,delegate已在第3步清空,所以currentHttpClient为空
			//10.第二次重试,delegate已在第7步清空,所以currentHttpClient为空
            EurekaHttpClient currentHttpClient = delegate.get();
            EurekaEndpoint currentEndpoint = null;
            if (currentHttpClient == null) {
   
     
                if (candidateHosts == null) {
   
     
					//11.获取一个除Eureka1之外的Eureka服务器地址列表
                    candidateHosts = getHostCandidates();
                    if (candidateHosts.isEmpty()) {
   
     
                        throw new TransportException("There is no known eureka server; cluster server list is empty");
                    }
                }
                if (endpointIdx >= candidateHosts.size()) {
   
     
                    throw new TransportException("Cannot execute request on any known server");
                }
				//5.重新获取一个Eureka地址(本次获取的地址还是Eureka1)
				//12.重新获取一个Eureka地址(本次获取的地址不会是Eureka1)
                currentEndpoint = candidateHosts.get(endpointIdx++);
                currentHttpClient = clientFactory.newClient(currentEndpoint);
            }

            try {
   
     
				//2.例如eureka1地址不可以用,以下会抛出异常,本句之后的逻辑不会走
				//6.由于还是获取了Eureka1,所以还是报错的
				//13.一个新的Eureka服务器地址执行
                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
   
     
                    delegate.set(currentHttpClient);
                    if (retry > 0) {
   
     
                        logger.info("Request execution succeeded on retry #{}", retry);
                    }
                    return response;
                }
                logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
            } catch (Exception e) {
   
     
                logger.warn("Request execution failure", e.getMessage());  // just log message as the underlying client should log the stacktrace
            }

            // Connection error or 5xx from the server that must be retried on another server
			//3.currentHttpClient值被设置为空
			//7.currentHttpClient值被设置为空
            delegate.compareAndSet(currentHttpClient, null);
			//4.currentHttpClient是为空的,quarantineSet不会有值被添加进去
			//8.第5步设置了值,所有currentEndpoint不为空
            if (currentEndpoint != null) {
   
     
			    //9.将eureka1放到不可用集合中quarantineSet
                quarantineSet.add(currentEndpoint);
            }
        }
        throw new TransportException("Retry limit reached; giving up on completing the request");
    }
	
	
	private List<EurekaEndpoint> getHostCandidates() {
   
     
        List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
		//取交集
        quarantineSet.retainAll(candidateHosts);

        // If enough hosts are bad, we have no choice but start over again
        int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
        if (quarantineSet.isEmpty()) {
   
     
            // no-op
        } else if (quarantineSet.size() >= threshold) {
   
     
            logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
            quarantineSet.clear();
        } else {
   
     
            List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
            for (EurekaEndpoint endpoint : candidateHosts) {
   
     
                if (!quarantineSet.contains(endpoint)) {
   
     
                    remainingHosts.add(endpoint);
                }
            }
            candidateHosts = remainingHosts;
        }

        return candidateHosts;
    }

}

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: