(九) Nacos Server处理订阅请求
文章目录
-
-
- 1.InstanceController#list()
- 2.InstanceController#doSrvIpxt()
- 3.总结
-
1.InstanceController#list()
Nacos Server处理订阅请求

主要还是从请求中获取参数, 比如namespceId、serviceName、agent(指定提交请求的客户端是哪种类型)、clusters、clusterIP、udpPort(后续UDP通信会使用)、app、tenant, 最后调用方法对参数进行处理
2.InstanceController#doSrvIpxt()
对请求进行详细处理
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
// 不同agent,生成不同的clientInfo
ClientInfo clientInfo = new ClientInfo(agent);
// 创建一个JSON Node,其就是当前方法返回的结果。后续代码就是对这个Node的各种初始化
ObjectNode result = JacksonUtils.createEmptyJsonNode();
// 从注册表中获取当前服务
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
if (udpPort > 0 && pushService.canEnablePush(agent)) {
// 创建当前发出订阅请求的Nacos client的UDP Client, PushClient
// 注意,在Nacos的UDP通信中,Nacos Server充当的是UDP Client,Nacos Client充当的是UDP Server
pushService
.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
Loggers.SRV_LOG
.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
cacheMillis = switchDomain.getDefaultCacheMillis();
}
// 若注册表中没有该服务,则直接结束
if (service == null) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
// 注意,hosts为空
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
// 代码直到这里,说明注册表中存在该服务
// 检测该服务是否被禁。若是被禁的服务,直接抛出异常
checkIfDisabled(service);
List<Instance> srvedIPs;
// 获取到当前服务的所有实例,包含所有持久/临时实例
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
// filter ips using selector:
// 若选择器不空,则根据选择算法选择可用的intance列表,默认情况下,选择器不做任何过滤
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
// 若最终选择的结果为空,则直接结束
if (CollectionUtils.isEmpty(srvedIPs)) {
if (Loggers.SRV_LOG.isDebugEnabled()) {
Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
}
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
// 注意,hosts为空
result.set("hosts", JacksonUtils.createEmptyArrayNode());
result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
// 代码走到这里,说明具有可用的instance
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
// 这个map只有两个key,True与False
// key为true的value中存放的是所有健康的instance
// key为false的value存放的是所有不健康的instance
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
// 根据instance的健康状态,将所有instance分流放入map的不同key的value中
for (Instance ip : srvedIPs) {
// 这个语句写的非常好
// 健康加入健康的列表, 不健康的加入不健康的列表
ipMap.get(ip.isHealthy()).add(ip);
}
// isCheck为true,表示需要检测instance的保护阈值
if (isCheck) {
// reachProtectThreshold 是否达到了保护阈值, false 为没有达到
result.put("reachProtectThreshold", false);
}
// 获取服务的保护阈值
double threshold = service.getProtectThreshold();
// 若 "健康instance数量/instance总数" <= 保护阈值,则说明需要启动保护机制了
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
if (isCheck) {
// true表示启动保护机制
result.put("reachProtectThreshold", true);
}
// 健康数量小于阈值, 则从所有实例中调用, 可能会有不健康实例, 可以保证健康实例不被压崩溃
// 将所有不健康的instance添加到的key为true的instance列表,
// 即key为true的value中(instance列表)存放的是所有instance实例
// 包含所有健康的与不健康的instance
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
// 清空key为false的value(不健康的instance列表)
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return JacksonUtils.createEmptyJsonNode();
}
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
// 注意,这个ipMap中存放着所有健康与不健康的instance列表
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
// 若客户端只要健康的instance,且当前遍历的map的key为false,则跳过
if (healthyOnly && !entry.getKey()) {
continue;
}
// 遍历的这个ips可能是所有不健康的instance列表,
// 也可能是所有健康的instance列表,
// 也可能是所有健康与不健康的instance列表总和
for (Instance instance : ips) {
// 跳过禁用的instance
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
// 将当前遍历的instance转换为JSON
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
} // end-for
} // end-for
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}