[Nacos] Nacos Client向Server发送注册请求和心跳请求 (二)

文章目录

      • 1.Nacos Client的自动注册原理和实现
    • 2.Naocs Client向Server发送注册请求
    • 3.Nacos Client向Server发送心跳请求

Nacos Client的任务: 向Server发送注册请求, 向Server发送心跳请求, Client获取所有的服务, Client定时更新本地服务, Client获取要调用服务的提供者列表

1.Nacos Client的自动注册原理和实现

 <dependency>
     <groupId>com.alibaba.cloud</groupId>
     <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
 </dependency>

 

 

@Configuration(proxyBeanMethods = false)
@Import(AutoServiceRegistrationConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
public class AutoServiceRegistrationAutoConfiguration {
   
     

	@Autowired(required = false)
	private AutoServiceRegistration autoServiceRegistration;

	@Autowired
	private AutoServiceRegistrationProperties properties;

	@PostConstruct
	protected void init() {
   
     
		if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
   
     
			throw new IllegalStateException("Auto Service Registration has "
					+ "been requested, but there is no AutoServiceRegistration bean");
		}
	}

}

 

 

注册方法: register()

 

 

 

1、 NacosServiceRegistry;
2、 NacosAutoServiceRegistration;

 

AbstractAutoServiceRegistration.onApplicationEvent()

 

 

调用的是NacosServiceRegistry.register()

  • 为什么一启动的时候就注册:
    AbstractAutoServiceRegistration.onApplicationEvent(), 一启动的时候会调用NacosServiceRegistry.register(), 而NacosServiceRegistry是NacosDiscoveryAutoConfiguration类注入的。

2.Naocs Client向Server发送注册请求

根据之前的register()

 

NacosNamingService.registerInstance()

 

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
   
     
        // 生成的String格式为:groupId@@微服务名称
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 若当前实例为临时实例,则向Server发送心跳
        if (instance.isEphemeral()) {
   
     
            // 构建一个心跳信息实例
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            // 向server发送心跳(定时任务)
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        // 向Server发送注册请求
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }

BeatInfo为心跳类, BeatReactor为生成心跳类的类

NamingProxy.registerService()

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
   
     

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);

        // 将instance拆散后写入到params中,并以请求参数的形式出现在请求中
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        // 提交一个POST请求
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

    }

    public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
   
     
        return reqApi(api, params, Collections.EMPTY_MAP, method);
    }

    public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
            throws NacosException {
   
     
        // 第4个参数是获取到配置文件中指定的nacos server地址
        return reqApi(api, params, body, getServerList(), method);
    }

将instance拆散后写入到params中,并以请求参数的形式出现在请求中, 提交一个post请求

    public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
            String method) throws NacosException {
   
     

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
   
     
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }

        NacosException exception = new NacosException();

        // 遍历所有server,从中随机的选择一个server去连接,
        // 若该server连接失败,则采用轮询方式选择下一个去尝试连接,
        // 直到连接成功为止,或尝试次数为server数量
        if (servers != null && !servers.isEmpty()) {
   
     

            // 生成一个随机数
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {
   
     
                String server = servers.get(index);
                try {
   
     
                    // 连接server
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
   
     
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
   
     
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                index = (index + 1) % servers.size();  // 轮询
            }
        }

        if (StringUtils.isNotBlank(nacosDomain)) {
   
     
            // 默认尝试着连接三次
            for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
   
     
                try {
   
     
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
   
     
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
   
     
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            } // end-for
        }

        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(),
                exception.getErrMsg());

        throw new NacosException(exception.getErrCode(),
                "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());

    }

 

    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {
   
     
        long start = System.currentTimeMillis();
        long end = 0;
        injectSecurityInfo(params);
        Header header = builderHeader();

        String url;
        // 构建请求url
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
   
     
            url = curServer + api;
        } else {
   
     
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
   
     
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        }

        try {
   
     
            // 提交请求
            HttpRestResult<String> restResult = nacosRestTemplate
                    .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
            end = System.currentTimeMillis();

            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))
                    .observe(end - start);

            if (restResult.ok()) {
   
     
                return restResult.getData();
            }
            if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {
   
     
                return StringUtils.EMPTY;
            }
            throw new NacosException(restResult.getCode(), restResult.getMessage());
        } catch (Exception e) {
   
     
            NAMING_LOGGER.error("[NA] failed to request", e);
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
    }

NacosRestTemplate.java

	// NacosRestTemplate.java
    public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,
            String httpMethod, Type responseType) throws Exception {
   
     
        RequestHttpEntity requestHttpEntity = new RequestHttpEntity(
                header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);
        // 提交请求
        return execute(url, httpMethod, requestHttpEntity, responseType);
    }

    private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity,
            Type responseType) throws Exception {
   
     
        URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
        if (logger.isDebugEnabled()) {
   
     
            logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
        }

        ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
        HttpClientResponse response = null;
        try {
   
     
            // 获取到Nacos自定义的HttpClient,其就是对JDK中的HttpURLConnection的封装
            response = this.requestClient().execute(uri, httpMethod, requestEntity);
            return responseHandler.handle(response);
        } finally {
   
     
            if (response != null) {
   
     
                response.close();
            }
        }
    }

获取到Nacos自定义的HttpClient,其就是对JDK中的HttpURLConnection的封装,发送对应的post请求。

 

JdkHttpClientRequest.execute()

 

3.Nacos Client向Server发送心跳请求

NacosNamingService.registerInstance()

 

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
   
     
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        // 形成的key的格式为:groupId@@微服务名称#ip#port
        // 这个key是固定了主机了
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        //fix #1733
        // dom2Beat是一个缓存map,其key为主机,value则为该主机发送的心跳beatInfo
        if ((existBeat = dom2Beat.remove(key)) != null) {
   
     
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        // 开启一个定时任务
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

开启一个定时任务来发送心跳。

 

BeatTask.run()

        @Override
        public void run() {
   
     
            if (beatInfo.isStopped()) {
   
     
                return;
            }
            long nextTime = beatInfo.getPeriod();
            try {
   
     
                // 发送心跳
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
   
     
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
   
     
                    nextTime = interval;
                }
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
   
     
                    code = result.get(CommonParams.CODE).asInt();
                }
                // 若在server端没有发现该client,则server返回的状态码为20404
                // 此时client会发起注册请求
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
   
     
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
   
     
                        // 向server发送注册请求
                        serverProxy.registerService(beatInfo.getServiceName(),
                                NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
   
     
                    }
                }
            } catch (NacosException ex) {
   
     
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
                        JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());

            }
            // 又启动一次定时任务
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }

 

发送心跳: sendBeat(), 发送put请求。

 

若在server端没有发现该client,则server返回的状态码为20404, 此时client会发起注册请求

registerService(): 发送post请求

 

在run()方法最后有又启动一次定时任务。

 

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: