1.Eureka Client客户端
1.1 Application上配置@EnableDiscoveryClient,引入了EnableDiscoveryClientImportSelector
/**
 * Marker annotation for application classes. The only behavior visible in
 * this declaration is the {@code @Import} meta-annotation, which brings in
 * {@code EnableDiscoveryClientImportSelector}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
// Imports EnableDiscoveryClientImportSelector
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
}
1.2EnableDiscoveryClientImportSelector继承了SpringFactoryImportSelector,并指定泛型EnableDiscoveryClient,SpringFactoryImportSelector的selectImports方法中返回了org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
所有实现ImportSelector的类,都会在启动时被ConfigurationClassParser中的processImports进行实例化,并执行selectImports方法
SpringFactoryImportSelector是spring-cloud-commons-1.1.0.RELEASE-sources.jar包中的一个抽象类,
主要作用是检查泛型T是否有指定的factory实现, 即spring.factories中有对应类的配置.
spring-cloud-netflix-eureka-client-1.1.0.RELEASE-sources.jar包的/META-INF/spring.factories中有配置
同样的方式可以看到EnableAutoConfiguration会用到EurekaClientAutoConfiguration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableDiscoveryClientImportSelector
extends SpringFactoryImportSelector<EnableDiscoveryClient> {
/**
 * Resolves the configuration class names registered for this selector's
 * annotation via {@code SpringFactoriesLoader}.
 *
 * @param metadata annotation metadata of the importing class
 * @return the de-duplicated factory class names, or an empty array when the
 *         selector is disabled
 * @throws IllegalStateException if no factory is registered and there is no
 *         default factory
 */
@Override
public String[] selectImports(AnnotationMetadata metadata) {
    if (!isEnabled()) {
        return new String[0];
    }

    // The importing class must actually carry the annotation.
    AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(
            metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
    Assert.notNull(annotationAttributes, "No " + getSimpleName() + " attributes found. Is "
            + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

    // Collect every candidate configuration class; the LinkedHashSet drops
    // duplicates while keeping the order in which they were found.
    LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(
            SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader));
    List<String> candidates = new ArrayList<>(uniqueNames);

    if (candidates.isEmpty()) {
        if (!hasDefaultFactory()) {
            throw new IllegalStateException("Annotation @" + getSimpleName()
                    + " found, but there are no implementations. Did you forget to include a starter?");
        }
    }

    if (candidates.size() > 1) {
        // Only one DiscoveryClient is expected, but several factories may be
        // on the classpath; this is only reported, not rejected.
        log.warn("More than one implementation " + "of @" + getSimpleName()
                + " (now relying on @Conditionals to pick one): " + candidates);
    }

    return candidates.toArray(new String[candidates.size()]);
}
1.3EurekaDiscoveryClientConfiguration实现了SmartLifecycle,SmartLifecycle在所有bean初始化完成后调用start方法,并且注册了ApplicationInfoManager和EurekaClient
在applicationInfoManager和eurekaClient被使用的时候将会触发EurekaClientConfiguration的eurekaApplicationInfoManager和DiscoveryClient的初始化。
/**
* @author Dave Syer
* @author Spencer Gibb
* @author Jon Schneider
* @author Jakub Narloch
*/
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@CommonsLog
public class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered {
private AtomicBoolean running = new AtomicBoolean(false);
private int order = 0;
private AtomicInteger port = new AtomicInteger(0);
@Autowired
private CloudEurekaInstanceConfig instanceConfig;
@Autowired(required = false)
private HealthCheckHandler healthCheckHandler;
@Autowired
private ApplicationContext context;
@Autowired
private ApplicationInfoManager applicationInfoManager;
@Autowired
private EurekaClient eurekaClient;
//所有的bean初始化完成后调用
/**
 * Lifecycle start callback. Optionally copies the locally recorded port into
 * the instance config, then (once, and only when a positive non-secure port
 * is known) initializes the client, sets the initial instance status,
 * registers the optional health check handler and publishes an
 * {@code InstanceRegisteredEvent}.
 */
@Override
public void start() {
    // Adopt the recorded port only if one was recorded (non-zero) and the
    // instance config does not have a non-secure port yet.
    boolean portRecorded = this.port.get() != 0;
    if (portRecorded && this.instanceConfig.getNonSecurePort() == 0) {
        this.instanceConfig.setNonSecurePort(this.port.get());
    }

    // Nothing to do when already running (e.g. because of the container port
    // initializer) or while no positive non-secure port is known.
    if (this.running.get() || this.instanceConfig.getNonSecurePort() <= 0) {
        return;
    }

    maybeInitializeClient();

    if (log.isInfoEnabled()) {
        log.info("Registering application " + this.instanceConfig.getAppname()
                + " with eureka with status "
                + this.instanceConfig.getInitialStatus());
    }

    // Step 3 (author's note): switching the instance status to the configured
    // initial status is what fires the StatusChangeListener.notify callback
    // that DiscoveryClient registered on the ApplicationInfoManager.
    this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus());

    if (this.healthCheckHandler != null) {
        this.eurekaClient.registerHealthCheck(this.healthCheckHandler);
    }

    this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.instanceConfig));
    this.running.set(true);
}
/**
 * Touches both injected collaborators so that they are initialized before
 * the instance status is changed. The return values are discarded; the calls
 * exist only for their initialization side effect.
 */
private void maybeInitializeClient() {
// force initialization of possibly scoped proxies
// Step 1 (author's note): triggers creation of the eurekaApplicationInfoManager bean in
// EurekaClientConfiguration, which builds a new InstanceInfo whose initial status is STARTING.
this.applicationInfoManager.getInfo();
// Step 2 (author's note): triggers initialization of DiscoveryClient, which creates the
// ApplicationInfoManager.StatusChangeListener and starts the InstanceInfoReplicator task.
this.eurekaClient.getApplications();
}
1.3.1 this.applicationInfoManager.getInfo() 触发EurekaClientConfiguration的eurekaApplicationInfoManager初始化,新建InstanceInfo,设置初始化状态为STARTING
/**
 * Bean definitions for the Eureka client and its {@code ApplicationInfoManager}.
 * Each bean is only contributed when no bean of the same type already exists
 * in the current context.
 */
@Configuration
@ConditionalOnMissingRefreshScope
protected static class EurekaClientConfiguration {

    @Autowired
    private ApplicationContext context;

    /** Optional extra arguments for the client; may be absent. */
    @Autowired(required = false)
    private DiscoveryClientOptionalArgs optionalArgs;

    /**
     * Creates the Eureka client; its {@code shutdown} method is registered as
     * the bean's destroy method.
     */
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
    public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
        EurekaClient client = new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
        return client;
    }

    /**
     * Creates the {@code ApplicationInfoManager} around a freshly built
     * {@code InstanceInfo} derived from the instance config.
     */
    @Bean
    @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
        // A new InstanceInfo is built on every invocation of this factory method.
        return new ApplicationInfoManager(config, new InstanceInfoFactory().create(config));
    }
}
/**
 * Builds the {@code InstanceInfo} that describes this application instance
 * from an {@code EurekaInstanceConfig}.
 */
@CommonsLog
public class InstanceInfoFactory {

    /**
     * Creates an {@code InstanceInfo} populated from the given config,
     * including lease settings and user metadata.
     *
     * @param config source of all instance attributes
     * @return the assembled instance info with its lease info attached
     */
    public InstanceInfo create(EurekaInstanceConfig config) {
        LeaseInfo.Builder leaseBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        // Builder for the instance information to be registered with the
        // eureka server.
        InstanceInfo.Builder infoBuilder = InstanceInfo.Builder.newBuilder();

        // The namespace is always passed on with a trailing dot.
        String configuredNamespace = config.getNamespace();
        String namespace = configuredNamespace.endsWith(".")
                ? configuredNamespace
                : configuredNamespace + ".";

        infoBuilder.setNamespace(namespace)
                .setAppName(config.getAppname())
                .setInstanceId(config.getInstanceId())
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress())
                .setHostName(config.getHostName(false))
                .setPort(config.getNonSecurePort())
                .enablePort(InstanceInfo.PortType.UNSECURE, config.isNonSecurePortEnabled())
                .setSecurePort(config.getSecurePort())
                .enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
                .setVIPAddress(config.getVirtualHostName())
                .setSecureVIPAddress(config.getSecureVirtualHostName())
                .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                .setHealthCheckUrls(config.getHealthCheckUrlPath(),
                        config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())
                .setASGName(config.getASGName());

        if (config.isInstanceEnabledOnit()) {
            // No status is set explicitly on this path; only the log line is emitted.
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: "
                        + InstanceInfo.InstanceStatus.UP
                        + ". This may be too early for the instance to advertise itself as available. "
                        + "You would instead want to control this via a healthcheck handler.");
            }
        }
        else {
            // Start off with the STARTING state to avoid traffic.
            InstanceInfo.InstanceStatus startingStatus = InstanceInfo.InstanceStatus.STARTING;
            if (log.isInfoEnabled()) {
                log.info("Setting initial instance status as: " + startingStatus);
            }
            infoBuilder.setStatus(startingStatus);
        }

        // Copy any user-specific metadata entries.
        for (Map.Entry<String, String> metadata : config.getMetadataMap().entrySet()) {
            infoBuilder.add(metadata.getKey(), metadata.getValue());
        }

        InstanceInfo created = infoBuilder.build();
        created.setLeaseInfo(leaseBuilder.build());
        return created;
    }
}
1.3.2 this.eurekaClient.getApplications(),触发 DiscoveryClient的初始化,新建了ApplicationInfoManager.StatusChangeListener()状态变化监听器。并且启动了InstanceInfoReplicator的定时任务,将InstanceInfo的脏数据标识设置为true
/**
 * Initializes all scheduled tasks.
 *
 * <p>When registry fetching is enabled, schedules the "cacheRefresh" task.
 * When registration is enabled, schedules the "heartbeat" task, creates the
 * InstanceInfoReplicator and the status change listener, optionally registers
 * that listener, and starts the replicator. Otherwise only logs that
 * registration is disabled.
 */
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
// The heartbeat period comes from the instance's lease info, not from clientConfig.
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// Create the ApplicationInfoManager.StatusChangeListener that reacts to local status changes
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
// Invoked when the instance status changes.
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
// Every status change requests an immediate (rate-limited) replication.
instanceInfoReplicator.onDemandUpdate();
}
};
// The listener is only wired in when on-demand updates are enabled in the client config.
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// Start the replicator; the delay value passed here is the configured initial
// replication interval in seconds. Per the author's notes, start() marks the
// InstanceInfo as dirty so that the first run performs the registration.
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
/**
 * Abridged excerpt of InstanceInfoReplicator showing only {@code start}.
 */
class InstanceInfoReplicator implements Runnable {
/**
 * Marks the instance info dirty and schedules the first run of this task.
 * Has an effect only on the first call; later calls find {@code started}
 * already true and do nothing.
 *
 * NOTE(review): despite the "Ms" suffix of the parameter name, the value is
 * passed to the scheduler with TimeUnit.SECONDS.
 *
 * @param initialDelayMs delay before the first run, interpreted as seconds
 */
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
}
}
}
1.3.3this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus());
修改了实例的状态为InitialStatus,触发DiscoveryClient中ApplicationInfoManager.StatusChangeListener()的notify方法。
// Listener that logs every local status change and then asks the replicator
// for an on-demand update.
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        // Transitions into or out of DOWN are logged at warn level,
        // everything else at info level.
        boolean downInvolved = InstanceStatus.DOWN == statusChangeEvent.getStatus()
                || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus();
        if (!downInvolved) {
            logger.info("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.warn("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};
1.4.ApplicationInfoManager.StatusChangeListener()的notify方法中调用了InstanceInfoReplicator的onDemandUpdate方法。
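onDemandUpdate在限流器允许的情况下,先取消尚未执行的定时任务,再调用run方法进行注册;run方法只有在脏数据标识为true时才会真正注册(从下面的代码可以看到,onDemandUpdate本身并不设置脏数据标识,该标识应是在此之前实例状态变更时已被置为true,相关代码此处未展示)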
Eureka Client的InstanceInfo注册是通过 InstanceInfoReplicator来实现的,注册有两个途径
途径一:InstanceInfo状态发生变化时触发InstanceInfoReplicator的onDemandUpdate方法。
途径二: Eureka Client启动时,通过DiscoveryClient启动一个InstanceInfoReplicator的定时任务,可用于补偿启动时注册失败的漏洞。
/**
 * Task that pushes the local {@code InstanceInfo} to the Eureka server.
 *
 * <p>It runs on its own single-threaded daemon scheduler and is triggered in
 * two ways: {@link #run()} re-schedules itself after every execution, and
 * {@link #onDemandUpdate()} requests an immediate, rate-limited execution.
 */
class InstanceInfoReplicator implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(InstanceInfoReplicator.class);

    private final DiscoveryClient discoveryClient;
    private final InstanceInfo instanceInfo;

    private final int replicationIntervalSeconds;
    private final ScheduledExecutorService scheduler;
    /** Handle of the most recently scheduled periodic run, so it can be cancelled. */
    private final AtomicReference<Future<?>> scheduledPeriodicRef;

    private final AtomicBoolean started;
    private final RateLimiter rateLimiter;
    private final int burstSize;
    private final int allowedRatePerMinute;

    /**
     * @param discoveryClient            client used to refresh and register the instance info
     * @param instanceInfo               the local instance info to replicate
     * @param replicationIntervalSeconds delay between periodic runs, in seconds
     * @param burstSize                  burst size handed to the rate limiter for on-demand updates
     */
    InstanceInfoReplicator(DiscoveryClient discoveryClient, InstanceInfo instanceInfo, int replicationIntervalSeconds, int burstSize) {
        this.discoveryClient = discoveryClient;
        this.instanceInfo = instanceInfo;
        this.scheduler = Executors.newScheduledThreadPool(1,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-InstanceInfoReplicator-%d")
                        .setDaemon(true)
                        .build());

        this.scheduledPeriodicRef = new AtomicReference<>();

        this.started = new AtomicBoolean(false);
        this.rateLimiter = new RateLimiter(TimeUnit.MINUTES);
        this.replicationIntervalSeconds = replicationIntervalSeconds;
        this.burstSize = burstSize;

        // Integer arithmetic: burstSize on-demand updates per replication interval,
        // expressed per minute.
        this.allowedRatePerMinute = 60 * this.burstSize / this.replicationIntervalSeconds;
        logger.info("InstanceInfoReplicator onDemand update allowed rate per min is {}", allowedRatePerMinute);
    }

    /**
     * Marks the instance info dirty and schedules the first run. Only the
     * first call has an effect.
     *
     * NOTE(review): despite the "Ms" suffix, the delay is scheduled with
     * TimeUnit.SECONDS.
     *
     * @param initialDelayMs delay before the first run, interpreted as seconds
     */
    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        }
    }

    /** Shuts the scheduler down immediately and clears the started flag. */
    public void stop() {
        scheduler.shutdownNow();
        started.set(false);
    }

    /**
     * Requests an immediate run, subject to the rate limiter.
     *
     * @return {@code true} if the update was submitted, {@code false} if the
     *         rate limiter rejected it
     */
    public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    // cancel the latest scheduled update, it will be rescheduled at the end of run()
                    Future<?> latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }

    /**
     * Refreshes the instance info and, if it is marked dirty, registers it.
     * Always schedules the next run, whether or not this one failed.
     */
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // Perform the registration.
                discoveryClient.register();
                // Reached only when register() did not throw: the dirty flag is cleared
                // so later runs skip registration. The boolean result of register() is
                // ignored here. If register() throws, the flag stays set and the next
                // scheduled run attempts the registration again.
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future<?> next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
}
1.5调用discoveryClient.register(),再调用EurekaTransport的registrationClient进行注册,EurekaTransport是DiscoveryClient初始化的时候new出来的
/**
 * Registers this instance with the eureka service through the registration
 * client held by {@code eurekaTransport}.
 *
 * @return {@code true} only when the server answered with HTTP status 204
 * @throws Throwable any exception raised by the registration call, after it
 *         has been logged
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");

    final EurekaHttpResponse<Void> registrationResponse;
    try {
        // The actual REST call is delegated to EurekaTransport's registrationClient.
        registrationResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }

    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, registrationResponse.getStatusCode());
    }

    boolean registered = registrationResponse.getStatusCode() == 204;
    return registered;
}
/**
 * POSTs the instance info as JSON to {@code apps/<appName>} below the
 * service URL and wraps the HTTP status and headers in an
 * {@code EurekaHttpResponse}.
 *
 * @param info the instance to register
 * @return response carrying the status code and headers of the HTTP call
 */
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    final String registrationPath = "apps/" + info.getAppName();
    ClientResponse jerseyResponse = null;
    try {
        Builder requestBuilder = jerseyClient.resource(serviceUrl)
                .path(registrationPath)
                .getRequestBuilder();
        addExtraHeaders(requestBuilder);

        jerseyResponse = requestBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);

        return anEurekaHttpResponse(jerseyResponse.getStatus())
                .headers(headersOf(jerseyResponse))
                .build();
    } finally {
        // Runs on success and on failure: log the outcome, then release the
        // response if one was obtained.
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, registrationPath, info.getId(),
                    jerseyResponse == null ? "N/A" : jerseyResponse.getStatus());
        }
        if (jerseyResponse != null) {
            jerseyResponse.close();
        }
    }
}
// Holder for the transport-level collaborators. Per the author's notes,
// registrationClient is the client used for registration and is initialized
// elsewhere (see scheduleServerEndpointTask).
private static final class EurekaTransport {
private ClosableResolver bootstrapResolver;
private TransportClientFactory transportClientFactory;
// Client and factory used for registration requests.
private EurekaHttpClient registrationClient;
private EurekaHttpClientFactory registrationClientFactory;
// Client and factory used for query requests (presumably registry fetches — confirm).
private EurekaHttpClient queryClient;
private EurekaHttpClientFactory queryClientFactory;
}
//EurekaTransport是DiscoveryClient初始化的时候new出来的,scheduleServerEndpointTask初始化registrationClient
private final EurekaTransport eurekaTransport;
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
DiscoveryClientOptionalArgs args) {
if (clientConfig.shouldRegisterWithEureka()) {
EurekaHttpClientFactory newRegistrationClientFactory = null;
EurekaHttpClient newRegistrationClient = null;
try {
newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
transportConfig
);
//通过一系列调用返回一个RetryableEurekaHttpClient
newRegistrationClient = newRegistrationClientFactory.newClient();
} catch (Exception e) {
logger.warn("Experimental transport initialization failure", e);
}
eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
eurekaTransport.registrationClient = newRegistrationClient;
}
1.6调用RetryableEurekaHttpClient的execute方法进行注册
private final AtomicReference<EurekaHttpClient> delegate = new AtomicReference<>();
/**
 * Executes the request, retrying on other candidate servers when a call
 * fails or its status code is not accepted.
 *
 * <p>A client that answered successfully is cached in {@code delegate} and
 * reused by later calls. An endpoint whose request failed is added to
 * {@code quarantineSet}.
 *
 * @param requestExecutor the request to run against an {@code EurekaHttpClient}
 * @return the first accepted response
 * @throws TransportException if there is no known server, if all candidate
 *         servers were tried, or if {@code numberOfRetries} attempts failed
 */
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    List<EurekaEndpoint> candidateHosts = null;
    int endpointIdx = 0;
    // At most numberOfRetries attempts (the original notes state the default is 3).
    for (int retry = 0; retry < numberOfRetries; retry++) {
        // Prefer the cached client; when none is cached, take the next endpoint
        // from the candidate list, which is resolved once per execute() call.
        EurekaHttpClient currentHttpClient = delegate.get();
        EurekaEndpoint currentEndpoint = null;
        if (currentHttpClient == null) {
            if (candidateHosts == null) {
                candidateHosts = getHostCandidates();
                if (candidateHosts.isEmpty()) {
                    throw new TransportException("There is no known eureka server; cluster server list is empty");
                }
            }
            if (endpointIdx >= candidateHosts.size()) {
                throw new TransportException("Cannot execute request on any known server");
            }

            currentEndpoint = candidateHosts.get(endpointIdx++);
            currentHttpClient = clientFactory.newClient(currentEndpoint);
        }

        try {
            EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
            if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                // The server is usable: remember this client for subsequent requests.
                delegate.set(currentHttpClient);
                if (retry > 0) {
                    logger.info("Request execution succeeded on retry #{}", retry);
                }
                return response;
            }
            logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
        } catch (Exception e) {
            // Fix: the original format string had no "{}" placeholder, so the
            // exception message passed as argument was never printed.
            // just log message as the underlying client should log the stacktrace
            logger.warn("Request execution failed with message: {}", e.getMessage());
        }

        // Connection error or 5xx from the server that must be retried on another server
        // Drop the cached client, but only if it is still the one that just failed.
        delegate.compareAndSet(currentHttpClient, null);
        if (currentEndpoint != null) {
            // Remember the failed endpoint in the quarantine set.
            quarantineSet.add(currentEndpoint);
        }
    }
    // Every attempt failed; give up on this request.
    throw new TransportException("Retry limit reached; giving up on completing the request");
}
/**
 * Returns the endpoints to try, leaving out quarantined ones unless the
 * quarantine has grown to the configured share of the cluster, in which case
 * the quarantine is cleared and every endpoint is returned.
 *
 * @return candidate endpoints in the order provided by the cluster resolver
 */
private List<EurekaEndpoint> getHostCandidates() {
    // NOTE(review): per the author's notes the ordering is derived from the local IP
    // address by ZoneAffinityClusterResolver; that is not visible from this method.
    List<EurekaEndpoint> allHosts = clusterResolver.getClusterEndpoints();

    // Keep only quarantined endpoints that are still part of the cluster.
    quarantineSet.retainAll(allHosts);

    // If enough hosts are bad, we have no choice but start over again.
    // The product is truncated, e.g. 3 hosts at 0.66 give (int) 1.98 = 1.
    int threshold = (int) (allHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());

    if (quarantineSet.isEmpty()) {
        return allHosts;
    }

    if (quarantineSet.size() >= threshold) {
        logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
        quarantineSet.clear();
        return allHosts;
    }

    List<EurekaEndpoint> healthyHosts = new ArrayList<>(allHosts.size());
    for (EurekaEndpoint host : allHosts) {
        if (quarantineSet.contains(host)) {
            continue;
        }
        healthyHosts.add(host);
    }
    return healthyHosts;
}
/**
 * Monitoring gauge exposing the current number of quarantined servers.
 *
 * @return the size of {@code quarantineSet}
 */
@Monitor(name = METRIC_TRANSPORT_PREFIX + "quarantineSize",
description = "number of servers quarantined", type = DataSourceType.GAUGE)
public long getQuarantineSetSize() {
return quarantineSet.size();
}
1.7ZoneAffinityClusterResolver处理了candidateHosts顺序逻辑
/**
 * Cluster resolver decorator that orders the delegate's endpoints by zone:
 * endpoints of the local zone and the remaining endpoints are each
 * randomized and then concatenated, local zone first. With anti-affinity the
 * resulting list is reversed.
 */
public class ZoneAffinityClusterResolver implements ClusterResolver<AwsEndpoint> {

    private static final Logger logger = LoggerFactory.getLogger(ZoneAffinityClusterResolver.class);

    private final ClusterResolver<AwsEndpoint> delegate;
    private final String myZone;
    private final boolean zoneAffinity;

    /**
     * A zoneAffinity defines zone affinity (true) or anti-affinity rules (false).
     *
     * @param delegate     resolver supplying the unordered endpoints
     * @param myZone       the zone treated as local
     * @param zoneAffinity {@code true} to put local-zone endpoints first,
     *                     {@code false} to reverse that order
     */
    public ZoneAffinityClusterResolver(ClusterResolver<AwsEndpoint> delegate, String myZone, boolean zoneAffinity) {
        this.delegate = delegate;
        this.myZone = myZone;
        this.zoneAffinity = zoneAffinity;
    }

    @Override
    public String getRegion() {
        return delegate.getRegion();
    }

    /**
     * @return the delegate's endpoints, re-ordered according to the zone
     *         affinity setting
     */
    @Override
    public List<AwsEndpoint> getClusterEndpoints() {
        // Index 0 holds the endpoints of the local zone, index 1 all others.
        List<AwsEndpoint>[] zoneSplit = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
        List<AwsEndpoint> ordered = randomizeAndMerge(zoneSplit[0], zoneSplit[1]);

        if (!zoneAffinity) {
            Collections.reverse(ordered);
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Local zone={}; resolved to: {}", myZone, ordered);
        }
        return ordered;
    }

    /**
     * Randomizes both groups independently and returns the local-zone group
     * followed by the remaining group.
     */
    private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {
        if (myZoneEndpoints.isEmpty()) {
            return ResolverUtils.randomize(remainingEndpoints);
        }

        List<AwsEndpoint> merged = ResolverUtils.randomize(myZoneEndpoints);
        if (!remainingEndpoints.isEmpty()) {
            merged.addAll(ResolverUtils.randomize(remainingEndpoints));
        }
        return merged;
    }
}
public final class ResolverUtils {
    /**
     * Randomize server list using local IPv4 address hash as a seed.
     *
     * <p>The seed is fixed per host, so the same input always yields the same
     * order on a given machine. The shuffle is delegated to
     * {@link Collections#shuffle(List, Random)}. The previous hand-written
     * loop drew the swap position from {@code [0, size - i)} without
     * offsetting it by {@code i}, so some permutations could never be
     * produced (for three elements, two of the six).
     *
     * @param list the endpoints to randomize; not modified
     * @return a copy of the original list with elements in the random order
     */
    public static <T extends EurekaEndpoint> List<T> randomize(List<T> list) {
        List<T> randomList = new ArrayList<>(list);
        if (randomList.size() < 2) {
            // Nothing to shuffle for zero or one element.
            return randomList;
        }
        Collections.shuffle(randomList, new Random(LOCAL_IPV4_ADDRESS.hashCode()));
        return randomList;
    }
}
版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: