[Nacos] Nacos Server之间的操作 (十一)

文章目录

      • 1.ServiceManager#init()
      • 1.1 定时发送任务
      • 1.2 定时更新状态任务
      • 1.3 定时清除空service任务

1.ServiceManager#init()

 

    @PostConstruct
    public void init() {
   
     
        // 启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表
        // 本机注册表是以各个服务的checksum(字串拼接)形式被发送的
        GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);

        // 从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表
        GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());

        if (emptyServiceAutoClean) {
   
     

            Loggers.SRV_LOG.info("open empty service auto clean job, initialDelay : {} ms, period : {} ms",
                    cleanEmptyServiceDelay, cleanEmptyServicePeriod);

            // delay 60s, period 20s;

            // This task is not recommended to be performed frequently in order to avoid
            // the possibility that the service cache information may just be deleted
            // and then created due to the heartbeat mechanism

            // 启动了一个定时任务:每30s清理一次注册表中的空service
            // 空service,即没有任何instance的service
            GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay,
                    cleanEmptyServicePeriod);
        }

        try {
   
     
            Loggers.SRV_LOG.info("listen for service meta change");
            consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
        } catch (NacosException e) {
   
     
            Loggers.SRV_LOG.error("listen for service meta change failed!");
        }
    }

1、 启动了一个定时任务:每60s当前Server会向其它NacosServer发送一次本机注册表;
2、 从其它NacosServer获取到注册表中的所有instance的最新状态并更新到本地注册表;
3、 启动了一个定时任务:每30s清理一次注册表中的空service;

1.1 定时发送任务

启动了一个定时任务:每60s当前Server会向其它Nacos Server发送一次本机注册表

ServiceReporter#run()

        @Override
        public void run() {
   
     
            try {
   
     

                // map的key为namespaceId,value为一个Set集合,集合中存放的是当前
                // namespace中所有service的名称
                // 这个map中存放的是当前注册表中所有服务的名称
                Map<String, Set<String>> allServiceNames = getAllServiceNames();

                if (allServiceNames.size() <= 0) {
   
     
                    //ignore
                    return;
                }

                // 遍历所有的namespace
                for (String namespaceId : allServiceNames.keySet()) {
   
     

                    ServiceChecksum checksum = new ServiceChecksum(namespaceId);

                    // 遍历当前namespace中的所有服务名称
                    for (String serviceName : allServiceNames.get(namespaceId)) {
   
     
                        // 若当前服务不归当前Server负责,则直接跳过
                        if (!distroMapper.responsible(serviceName)) {
   
     
                            continue;
                        }

                        // 从注册表中获取到当前遍历的服务
                        Service service = getService(namespaceId, serviceName);

                        if (service == null || service.isEmpty()) {
   
     
                            continue;
                        }
                        // 重新计算当前service的checksum
                        service.recalculateChecksum();
                        // 将计算好的checksum写入到map
                        checksum.addItem(serviceName, service.getChecksum());
                    }  // end-内层for

                    Message msg = new Message();

                    // 将当前namespace中的所有服务的checksum写入到msg中,
                    // 将来将msg发送给其它nacos
                    msg.setData(JacksonUtils.toJson(checksum));

                    // 获取到所有nacos
                    Collection<Member> sameSiteServers = memberManager.allMembers();

                    if (sameSiteServers == null || sameSiteServers.size() <= 0) {
   
     
                        return;
                    }

                    // 遍历所有nacos,要将msg发送出去
                    for (Member server : sameSiteServers) {
   
     
                        // 若当前遍历的server是当前server,则直接跳过
                        if (server.getAddress().equals(NetUtils.localServer())) {
   
     
                            continue;
                        }
                        // 将msg发送给当前遍历的server
                        synchronizer.send(server.getAddress(), msg);
                    }
                }  // end-外层for
            } catch (Exception e) {
   
     
                Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
            } finally {
   
     
                // 开启下一次定时执行
                GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                        TimeUnit.MILLISECONDS);
            }
        }
    }

1、 获取当前注册表中所有服务的名称allServiceNames
 
2、 遍历所有的namespace,遍历当前namespace中的所有服务名称;

  • 从注册表中获取到当前遍历的服务, 重新计算当前service的checksum, 保持至map中
  • 获取并遍历所有nacos,要将msg发送出去, msg就是上面的checksum的封装
1.2 定时更新状态任务

从其它Nacos Server获取到注册表中的所有instance的最新状态并更新到本地注册表

UpdatedServiceProcessor#run() -> ServiceUpdater#run() -> ServiceUpdater#updatedHealthStatus()

        @Override
        public void run() {
   
     
            ServiceKey serviceKey = null;

            try {
   
     
                // 运行一个无限循环
                while (true) {
   
     
                    try {
   
     
                        // 从队列中取出一个元素
                        // toBeUpdatedServicesQueue 中存放的是来自于其它Server的服务状态发生变更的服务
                        serviceKey = toBeUpdatedServicesQueue.take();
                    } catch (Exception e) {
   
     
                        Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while taking item from LinkedBlockingDeque.");
                    }

                    if (serviceKey == null) {
   
     
                        continue;
                    }
                    // 另外启用一个线程来完成ServiceUpdater任务
                    GlobalExecutor.submitServiceUpdate(new ServiceUpdater(serviceKey));
                }
            } catch (Exception e) {
   
     
                Loggers.EVT_LOG.error("[UPDATE-DOMAIN] Exception while update service: {}", serviceKey, e);
            }
        }
    }

        @Override
        public void run() {
   
     
            try {
   
     
                updatedHealthStatus(namespaceId, serviceName, serverIP);
            } catch (Exception e) {
   
     
                Loggers.SRV_LOG
                        .warn("[DOMAIN-UPDATER] Exception while update service: {} from {}, error: {}", serviceName,
                                serverIP, e);
            }
        }
    }

 

 

    public void updatedHealthStatus(String namespaceId, String serviceName, String serverIP) {
   
     
        // 从其它server获取指定服务的数据
        Message msg = synchronizer.get(serverIP, UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        JsonNode serviceJson = JacksonUtils.toObj(msg.getData());

        ArrayNode ipList = (ArrayNode) serviceJson.get("ips");
        // 这个map中存放的是来自于其它Nacos中的、当前服务所包含的所有instance的健康状态
        // map的key为ip:port,value为healthy
        Map<String, String> ipsMap = new HashMap<>(ipList.size());
        // 遍历ipList
        for (int i = 0; i < ipList.size(); i++) {
   
     
            // 这个ip字符串的格式是:  ip:port_healthy
            String ip = ipList.get(i).asText();
            String[] strings = ip.split("_");
            // 将当前遍历instance的地址及健康状态写入到map
            ipsMap.put(strings[0], strings[1]);
        }

        // 从注册表中获取当前服务
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
   
     
            return;
        }

        boolean changed = false;

        // 获取到注册表中当前服务的所有instance
        List<Instance> instances = service.allIPs();
        // 遍历注册表中当前服务的所有instance
        for (Instance instance : instances) {
   
     
            // 获取来自于其它nacos的当前遍历instance的健康状态
            boolean valid = Boolean.parseBoolean(ipsMap.get(instance.toIpAddr()));
            // 若当前instance在注册表中记录的状态与外来的状态不一致,则以外来的为准
            if (valid != instance.isHealthy()) {
   
     
                changed = true;
                // 将注册表中的instance状态修改为外来的状态
                instance.setHealthy(valid);
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} : {}:{}@{}", serviceName,
                        (instance.isHealthy() ? "ENABLED" : "DISABLED"), instance.getIp(), instance.getPort(),
                        instance.getClusterName());
            }
        }

        // 只要有一个instance的状态发生了变更,那么这个changed的值就为true
        if (changed) {
   
     
            // 发布状态变更事件
            pushService.serviceChanged(service);
            if (Loggers.EVT_LOG.isDebugEnabled()) {
   
     
                StringBuilder stringBuilder = new StringBuilder();
                List<Instance> allIps = service.allIPs();
                for (Instance instance : allIps) {
   
     
                    stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
                }
                Loggers.EVT_LOG
                        .debug("[HEALTH-STATUS-UPDATED] namespace: {}, service: {}, ips: {}", service.getNamespaceId(),
                                service.getName(), stringBuilder.toString());
            }
        }

    }

1、 从其它server获取指定服务的数据msg;
2、 获取到注册表中当前服务的所有instance,遍历,获取来自于其它nacos的当前遍历instance的健康状态,如果当前instance在注册表中记录的状态与外来的状态不一致,则以外来的为准,将注册表中的instance状态修改为外来的状态;
3、 只要有一个instance的状态发生了变更,发布状态变更事件,在上一节的NacosServer与Client的UDP通信中有这个方法的详情分析;

1.3 定时清除空service任务

启动了一个定时任务:每30s清理一次注册表中的空service, 即没有任何instance的service

EmptyServiceAutoClean#run()

        @Override
        public void run() {
   
     

            // Parallel flow opening threshold

            // 这是一个并行流开启阈值:当一个namespace中包含的service的数量超过100时,
            // 会将注册创建为一个并行流,否则就是一个串行流
            int parallelSize = 100;

            // 遍历注册表
            // stringServiceMap 就是注册表的内层map
            serviceMap.forEach((namespace, stringServiceMap) -> {
   
     
                Stream<Map.Entry<String, Service>> stream = null;
                // 若当前遍历的元素(namespace)中包含的服务的数量超出了阈值,
                // 则生成一个并行流
                if (stringServiceMap.size() > parallelSize) {
   
     
                    // 并行流
                    stream = stringServiceMap.entrySet().parallelStream();
                } else {
   
     
                    // 串行流
                    stream = stringServiceMap.entrySet().stream();
                }
                stream.filter(entry -> {
   
     
                    final String serviceName = entry.getKey();
                    // 只要当前遍历的服务需要当前server负责,则通过过滤
                    return distroMapper.responsible(serviceName);

                    // 这里的forEach遍历的元素一定是最终需要由当前server处理的服务
                }).forEach(entry -> stringServiceMap.computeIfPresent(entry.getKey(), (serviceName, service) -> {
   
     
                    // 空的service
                    if (service.isEmpty()) {
   
     

                        // To avoid violent Service removal, the number of times the Service
                        // experiences Empty is determined by finalizeCnt, and if the specified
                        // value is reached, it is removed

                        // 若当前服务为空的次数超出了最大允许值,则删除这个服务,防止暴力删除
                        if (service.getFinalizeCount() > maxFinalizeCount) {
   
     
                            Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", namespace,
                                    serviceName);
                            try {
   
     
                                // 删除服务
                                easyRemoveService(namespace, serviceName);
                            } catch (Exception e) {
   
     
                                Loggers.SRV_LOG.error("namespace : {}, [{}] services are automatically clean has "
                                        + "error : {}", namespace, serviceName, e);
                            }
                        }

                        // 计数器 + 1
                        service.setFinalizeCount(service.getFinalizeCount() + 1);

                        Loggers.SRV_LOG
                                .debug("namespace : {}, [{}] The number of times the current service experiences "
                                                + "an empty instance is : {}", namespace, serviceName,
                                        service.getFinalizeCount());
                    } else {
   
     
                        // 如果当前service不空instance的话
                        // 将计数器归零
                        service.setFinalizeCount(0);
                    }
                    return service;
                }));
            });
        }

1、 遍历注册表,如果遍历的元素中的服务数量超过阈值则注册并行流,如果没有则生成串行流,去解决清除任务;
2、 先判断是否是空的service,如果是的话则若当前服务为空的次数超出了最大允许值,则删除这个服务,防止暴力删除,并删除服务;
 

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: