[Nacos] Nacos Client获取所有服务和定时更新Client端的注册表 (三)

文章目录

      • 1.Nacos Client获取所有服务
      • 1.1 Client如何获取所有服务
      • 1.2 Client获取服务方法getServices()详解
    • 2.Nacos定时更新Client端的注册表
      • 2.1 Nacos和Eureka定时更新Client端的注册表的区别
      • 2.2 Client定时更新本地服务过程
      • 2.3 updateServiceNow方法解析
      • 2.4 定时更新本地注册表中的当前服务

Nacos的服务发现功能: 获取所有服务, 定时更新Client端的注册表

1.Nacos Client获取所有服务

1.1 Client如何获取所有服务
 <dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
 </dependency>

 

 

NacosDiscoveryClientAutoConfiguration.java

@Configuration
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({
   
      SimpleDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class })
public class NacosDiscoveryClientAutoConfiguration {
   
     

	@Bean
	@ConditionalOnMissingBean
	public NacosDiscoveryProperties nacosProperties() {
   
     
		return new NacosDiscoveryProperties();
	}

	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosDiscoveryProperties discoveryProperties) {
   
     
		return new NacosDiscoveryClient(discoveryProperties);
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
	public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
   
     
		return new NacosWatch(nacosDiscoveryProperties);
	}
}

NacosDiscoveryClient#getServices()

    @Override
    public List<String> getServices() {
   
     

        try {
   
     
            ListView<String> services = discoveryProperties.namingServiceInstance()
                    .getServicesOfServer(1, Integer.MAX_VALUE);
            return services.getData();
        } catch (Exception e) {
   
     
            log.error("get service name from nacos server fail,", e);
            return Collections.emptyList();
        }
    }

 

这里的discoveryProperties为上面的NacosDiscoveryClientAutoConfiguration自动注入了。

DiscoveryClientHealthIndicator为SpringBoot的actuator自带的监控功能。
DiscoveryClientHealthIndicator#health()调用了这个getServices()方法。

 

 

 

1.2 Client获取服务方法getServices()详解

NacosMaingService#getServicesOfServer()

 

NamingProxy#getServiceList()

 

最后还是通过向Server端发送get请求。

 

result为两个服务名称。

2.Nacos定时更新Client端的注册表

还是自动注册类NacosDiscoreryClientAutoConfiguration.java, 会自动注入NaocsWatch类, 这个类在Nacos源码不存在, 只是自动注入SpringBoot的类。

 

2.1 Nacos和Eureka定时更新Client端的注册表的区别

Eureka:

Nacos:

2.2 Client定时更新本地服务过程

NacosNamingService#subscribe()

 

通过hostReactor.getServiceInfo()更新本地服务过程

HostReactor#getServiceInfo()

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
   
     

        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        // 构建key,其格式为  groupId@@微服务名称@@clusters名称
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
   
     
            return failoverReactor.getService(key);
        }

        // 从当前client的本地注册表中获取当前服务
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        if (null == serviceObj) {
   
       // 若本地注册表中没有该服务,则创建一个
            // 创建一个空的服务(没有任何提供者实例instance的ServiceInfo)
            serviceObj = new ServiceInfo(serviceName, clusters);

            // serviceInfoMap即为Client端的本地注册表
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            // updatingMap是一个临时缓存,其主要是使用这个缓存map的key,
            // 使用map的key不能重复这个特性
            // 只要有服务名称出现在这个缓存map中,就表示当前这个服务正在被更新
            // 准备要更新serviceName的服务了,就先将其名称写入到这个临时缓存map
            updatingMap.put(serviceName, new Object());
            // 更新本地注册表中的serviceName的服务
            updateServiceNow(serviceName, clusters);
            // 更新完毕,将该serviceName服务从临时缓存map中干掉
            updatingMap.remove(serviceName);

            // 若当前注册表中已经有了这个服务,那么查看一下临时缓存map中
            // 是否存在该服务。若存在,则说明这个服务正在被更新,所以本次
            // 操作先等待wait一会儿
        } else if (updatingMap.containsKey(serviceName)) {
   
     

            if (UPDATE_HOLD_INTERVAL > 0) {
   
     
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
   
     
                    try {
   
     
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
   
     
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }

        // 启动一个定时任务,定时更新本地注册表中的当前服务
        scheduleUpdateIfAbsent(serviceName, clusters);

        return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
   
     

        String key = ServiceInfo.getKey(serviceName, clusters);
        // serviceInfoMap即为Client端的本地注册表,
        // 其key为 groupId@@微服务名称@@clusters名称
        // value为ServiceInfo
        return serviceInfoMap.get(key);
    }

  • 通过Key, Value的模式去serviceInfoMap即为Client端的本地注册表获取服务, Key: groupId@@微服务名称@@clusters名称, Value:ServiceInfo
  • 先从当前client的本地注册表中获取当前服务, 若本地注册表中没有该服务,则创建一个, 首先创建一个空的服务, 然后填充属性。
  • updatingMap: updatingMap是一个临时缓存,其主要是使用这个缓存map的key, 使用map的key不能重复这个特性, 只要有服务名称出现在这个缓存map中,就表示当前这个服务正在被更新, 准备要更新serviceName的服务了,就先将其名称写入到这个临时缓存map。
  • updateServiceNow(serviceName, clusters): 更新本地注册表中的serviceName的服务, 更新完毕后从updatingMap删除serviceName服务
  • 若当前注册表中已经有了这个服务,那么查看一下临时缓存map中, 是否存在该服务。若存在,则说明这个服务正在被更新, 操作先等待wait一会儿
  • 最后, 启动一个定时任务,定时更新本地注册表中的当前服务
2.3 updateServiceNow方法解析

更新本地注册表中的serviceName的服务

    private void updateServiceNow(String serviceName, String clusters) {
   
     
        try {
   
     
            // 更新本地注册表中的serviceName的服务
            updateService(serviceName, clusters);
        } catch (NacosException e) {
   
     
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

 

 

processServiceJson方法去将 将来自于Server的ServiceInfo更新到本地注册表, 这里的ServerInfo为向server提交一个GTE请求, 是JSON字符串的形式。

HostReactor#processServiceJson(): 将来自Server的ServerInfo更新至本地注册表, 有一些情况具体分析。

    public ServiceInfo processServiceJson(String json) {
   
     
        // 将来自于Server的JSON转换为ServiceInfo
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        // 获取注册表中当前服务的ServiceInfo
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
   
     
            //empty or error push, just ignore
            return oldService;
        }

        boolean changed = false;

        // 若当前注册表中存在当前服务,则想办法将来自于server的数据更新到本地注册表
        if (oldService != null) {
   
     

            // 为了安全起见,这种情况几乎是不会出现的
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
   
     
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                        + serviceInfo.getLastRefTime());
            }

            // 将来自于Server的serviceInfo替换掉注册表中的当前服务
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

            // 遍历本地注册表中当前服务的所有instance实例
            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
   
     
                // 将当前遍历的instance主机的ip:port作为key,instance作为value,
                // 写入到一个新的map
                oldHostMap.put(host.toInetAddr(), host);
            }

            // 遍历来自于server的当前服务的所有instance实例
            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
   
     
                // 将当前遍历的instance主机的ip:port作为key,instance作为value,
                // 写入到一个新的map
                newHostMap.put(host.toInetAddr(), host);
            }

            // 该set集合中存放的是,两个map(oldHostMap与newHostMap)中都有的ip:port,
            // 但它们的instance不相同,此时会将来自于server的instance写入到这个set
            Set<Instance> modHosts = new HashSet<Instance>();
            // 只有newHostMap中存在的instance,即在server端新增的instance
            Set<Instance> newHosts = new HashSet<Instance>();
            // 只有oldHostMap中存在的instance,即在server端被删除的instance
            Set<Instance> remvHosts = new HashSet<Instance>();

            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                    newHostMap.entrySet());
            // 遍历来自于server的主机
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
   
     
                Instance host = entry.getValue();
                // ip:port
                String key = entry.getKey();
                // 在注册表中存在该ip:port,但这两个instance又不同,则将这个instance写入到modHosts
                if (oldHostMap.containsKey(key) && !StringUtils
                        .equals(host.toString(), oldHostMap.get(key).toString())) {
   
     
                    modHosts.add(host);
                    continue;
                }

                // 若注册表中不存在该ip:port,说明这个主机是新增的,则将其写入到newHosts
                if (!oldHostMap.containsKey(key)) {
   
     
                    newHosts.add(host);
                }
            }

            // 遍历来自于本地注册表的主机
            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
   
     
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
   
     
                    continue;
                }

                // 注册表中存在,但来自于server的serviceInfo中不存在,
                // 说明这个instance被干掉了,将其写入到remvHosts
                if (!newHostMap.containsKey(key)) {
   
     
                    remvHosts.add(host);
                }

            }

            if (newHosts.size() > 0) {
   
     
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(newHosts));
            }

            if (remvHosts.size() > 0) {
   
     
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(remvHosts));
            }

            if (modHosts.size() > 0) {
   
     
                changed = true;
                // 变更心跳信息BeatInfo
                updateBeatInfo(modHosts);
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(modHosts));
            }

            serviceInfo.setJsonFromServer(json);

            // 只要发生了变更,就将这个发生变更的serviceInfo记录到一个缓存队列
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
   
     
                eventDispatcher.serviceChanged(serviceInfo);
                DiskCache.write(serviceInfo, cacheDir);
            }

            // 若本地注册表中就没有当前服务,则直接将来自于server的serviceInfo写入到注册表
        } else {
   
     
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            // 将来自于server的serviceInfo写入到注册表
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            // 将这个发生变更的serviceInfo记录到一个缓存队列
            eventDispatcher.serviceChanged(serviceInfo);
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }

        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

        if (changed) {
   
     
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
        }

        return serviceInfo;
    }

  • 来自于Server的数据是最新的数据, oldService为当前注册表中的服务SericeInfo, 为旧的服务。

  • 若当前注册表中存在当前服务, 将来自于server的数据更新到本地注册表

  • 将来自于Server的serviceInfo替换掉注册表中的当前服务, 保持至serviceInfoMap

  • 遍历本地注册表中当前服务的所有instance实例, 将当前遍历的instance主机的ip:port作为key,instance作为value,写入到一个新的map, oldHostMap

  • 遍历来自于server的当前服务的所有instance实例, 将当前遍历的instance主机的ip:port作为key,instance作为value,入到一个新的map, newHostMap

  • 创建3个Set集合, modHosts存放的是两个map(oldHostMap与newHostMap)中都有的ip:port, 但它们的instance不相同,此时会将来自于server的instance写入到这个set, newHosts存放的是只有newHostMap中存在的instance,即在server端新增的instance, remvHosts存放的是只有oldHostMap中存在的instance,即在server端被删除的instance

  • 遍历来自于server的主机, 如果在注册表中存在该ip:port,但这两个instance又不同,则将这个instance写入到modHosts, 如果若注册表中不存在该ip:port,说明这个主机是新增的,则将其写入到newHosts

  • 遍历来自于本地注册表的主机, 如果注册表中存在,但来自于server的serviceInfo中不存在, 说明这个instance被干掉了,将其写入到remvHosts

  • 如果modHosts中存在对象, 那么变更心跳信息BeatInfo, updateBeatInfo()

  • 只要发生了变更,就将这个发生变更的serviceInfo记录到一个缓存队列, DiskCache

  • 若本地注册表中就没有当前服务,则直接将来自于server的serviceInfo写入到注册表, 将这个发生变更的serviceInfo记录到一个缓存队列, DiskCache

2.4 定时更新本地注册表中的当前服务

HostReactor#scheduleUpdateIfAbsent()
发起一个定时任务, 定时更新本地注册表中的当前服务

在HostReator#getServiceInfo()最后有一个定时任务的代码

 

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
   
     
        // futureMap是一个缓存map,其key为 groupId@@微服务名称@@clusters
        // value是一个定时异步操作对象
        // 这种结构称之为:双重检测锁,DCL,Double Check Lock
        // 该结构是为了避免在并发情况下,多线程重复写入数据
        // 该结构的特征:
        // 1)有两个不为null的判断
        // 2)有共享集合
        // 3)有synchronized代码块
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
   
     
            return;
        }

        synchronized (futureMap) {
   
     
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
   
     
                return;
            }

            // 创建一个定时异步操作对象,并启动这个定时任务
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            // 将这个定时异步操作对象写入到缓存map
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

这个是一个DCL结构, 结构是为了避免在并发情况下,多线程重复写入数据。

UpdateTask#run() 启动定时任务

        @Override
        public void run() {
   
     
            long delayTime = DEFAULT_DELAY;

            try {
   
     
                // 从本地注册表中获取当前服务
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                // 若本地注册表中不存在该服务,则从server获取到后,更新到本地注册表
                if (serviceObj == null) {
   
     
                    // 从server获取当前服务,并更新到本地注册表
                    updateService(serviceName, clusters);
                    return;
                }

                // 处理本地注册表中存在当前服务的情况
                // 1)serviceObj.getLastRefTime() 获取到的是当前服务最后被访问的时间,这个时间
                // 是来自于本地注册表的,其记录的是所有提供这个服务的instance中最后一个instance
                // 被访问的时间
                // 2)缓存lastRefTime 记录的是当前instance最后被访问的时间
                // 若1)时间 小于 2)时间,说明当前注册表应该更新的
                if (serviceObj.getLastRefTime() <= lastRefTime) {
   
     
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
   
     
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }

                // 将来自于注册表的这个最后访问时间更新到当前client的缓存
                lastRefTime = serviceObj.getLastRefTime();

                if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
   
     
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
   
     
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
   
     
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
   
     
                // 开启下一次的定时任务
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }

1、 从本地注册表中获取当前服务,若本地注册表中不存在该服务,Server获取后,更新到本地注册表ServiceInfo;
 
这里的updateService()在之前分析过。
2、 处理本地注册表中存在当前服务的情况,判断当前服务最后被访问的时间和当前instance最后被访问的时间的大小;
如果当前服务最后被访问的时间小于等于当前instance最后被访问的时间的话, 那么说明当前注册表应该更新的
 
这里的updateService()在之前分析过。
3、 最后开启下一次的定时任务;
 

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: