Eureka(4)-- Eureka Client 注册源码分析(Eureka Server部分)

1.EurekaClient 发送Rest请求到EurekaServer

Jersey HTTP POST http://localhost:8761/eureka//apps/SPRING-CLOUD-USER with instance BR-IT-A00966.bairong.ad.com:spring-cloud-user:8082; statusCode=204

2.EurekaServer启动的时候会启动一些Jersey REST接口,ApplicationResource用来处理注册请求

/**
 * Handles an HTTP POST that registers an instance under this application resource.
 *
 * <p>The request is rejected with a 400 response when the instance id, host name or
 * application name is blank, or when the application name in the payload differs from
 * this resource's {@code appName}. A blank {@link DataCenterInfo} id is either rejected,
 * patched (for {@link AmazonInfo}) or only logged, depending on the experimental
 * {@code registration.validation.dataCenterInfoId} setting and the data center type.
 *
 * @param info          the instance information submitted by the caller
 * @param isReplication value of the {@code PeerEurekaNode.HEADER_REPLICATION} header;
 *                      only the exact string {@code "true"} marks the call as replication
 * @return a 400 response describing the first failed validation, otherwise a 204 response
 */
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

    // Required fields: id, host name and app name must be present, and the app name
    // in the payload must equal the app name this resource was created for.
    if (isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    }
    if (isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    }
    if (isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    }
    if (!appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
    }

    // Clients may register with a DataCenterInfo whose unique id is missing.
    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
    if (dataCenterInfo instanceof UniqueIdentifier
            && isBlank(((UniqueIdentifier) dataCenterInfo).getId())) {
        String strictValidation = serverConfig.getExperimental("registration.validation.dataCenterInfoId");
        if ("true".equalsIgnoreCase(strictValidation)) {
            // Strict mode: a missing id is a client error.
            String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
            return Response.status(400).entity(entity).build();
        }
        if (dataCenterInfo instanceof AmazonInfo) {
            // Lenient mode for Amazon: fall back to the instance id when no instanceId metadata is set.
            AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
            if (amazonInfo.get(AmazonInfo.MetaDataKey.instanceId) == null) {
                amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
            }
        } else {
            logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
        }
    }

    // Hand over to the registry (the article follows this into PeerAwareInstanceRegistryImpl#register).
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build();  // 204 to be backwards compatible
}

3.调用PeerAwareInstanceRegistry、PeerAwareInstanceRegistryImpl的register方法

1.先注册到当前节点
2.再将注册信息同步到其他的节点

/**
 * Registers the given {@link InstanceInfo} on this node and then replicates the
 * registration to the peer eureka nodes. When the call is itself a replication
 * event coming from another replica node, it is not replicated any further.
 *
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Honour the lease duration sent by the instance only when it is present and positive.
    final boolean hasCustomDuration =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasCustomDuration
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // Step 1: register on this node.
    super.register(info, leaseDuration, isReplication);
    // Step 2: propagate the registration to the other nodes.
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

3.1调用AbstractInstanceRegistry的register进行注册

/**
     * Registers a new instance with a given duration.
     *
     * <p>The lease is stored in the per-application map of {@code registry}, keyed by the
     * instance id. An existing lease's last-dirty timestamp is retained when it is newer
     * than the one being registered, and any overridden status known for the instance is
     * applied before the final status is computed. The whole operation runs while holding
     * the {@code read} lock.
     *
     * @param r             the instance being registered
     * @param leaseDuration the lease duration passed to the new {@link Lease}
     * @param isReplication whether this registration was replicated from another node
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo r, int leaseDuration, boolean isReplication) {
        // Acquire the lock BEFORE entering the try block: if lock() fails, the finally
        // clause must not call unlock() on a lock this thread never acquired.
        read.lock();
        try {
            Map<String, Lease<InstanceInfo>> gMap = registry.get(r.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                // First instance of this application: create its map, tolerating a
                // concurrent creator by keeping whichever map won putIfAbsent.
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                        new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(r.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(r.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = r.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is " +
                                    "greater than the one that is being registered {}",
                            existingLastDirtyTimestamp,
                            registrationLastDirtyTimestamp);
                    r.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // A new instance is being registered, so raise the expected number of
                        // renewals per minute by 2 (1 for 30 seconds, 2 for a minute) and
                        // recompute the threshold from the configured percentage.
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(r, leaseDuration);
            if (existingLease != null) {
                // Carry the service-up timestamp over from the lease being replaced.
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // The actual registration: store the lease under the instance id.
            gMap.put(r.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        r.getAppName() + "(" + r.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(r.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", r.getOverriddenStatus(), r.getId());
                if (!overriddenInstanceStatusMap.containsKey(r.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", r.getId());
                    overriddenInstanceStatusMap.put(r.getId(), r.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(r.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                r.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(r, existingLease, isReplication);
            r.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(r.getStatus())) {
                lease.serviceUp();
            }
            r.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            r.setLastUpdatedTimestamp();
            invalidateCache(r.getAppName(), r.getVIPAddress(), r.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    r.getAppName(), r.getId(), r.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

3.2.1将注册信息同步到对端节点,调用PeerAwareInstanceRegistryImpl的replicateToPeers方法。

/**
     * Replicates an eureka action to every peer eureka node, skipping this node itself.
     * Calls that are already replication traffic are only counted and never forwarded.
     *
     * @param action        the action to replicate; its timer is started and stopped around the call
     * @param appName       application name of the affected instance
     * @param id            id of the affected instance
     * @param info          instance information (optional)
     * @param newStatus     new instance status (optional)
     * @param isReplication true when the triggering request itself came from a peer
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        final Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
                // If it is a replication already, do not replicate again as this will
                // create a poison replication.
                return;
            }
            // Nothing to do when there are no peers.
            // NOTE(review): peerEurekaNodes is dereferenced via getPeerEurekaNodes() below, so this
            // identity comparison with Collections.EMPTY_LIST looks like a legacy guard — confirm
            // whether it can ever be true.
            if (peerEurekaNodes == Collections.EMPTY_LIST) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // Only forward to nodes whose service url does not represent this host.
                if (!peerEurekaNodes.isThisMe(node.getServiceUrl())) {
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            }
        } finally {
            tracer.stop();
        }
    }

3.2.2调用PeerAwareInstanceRegistryImpl的replicateInstanceActionsToPeers方法

/**
     * Forwards a single instance action to one peer eureka node by invoking the
     * matching operation on {@code node}. Any failure is logged and not rethrown.
     *
     * @param action    the action to forward
     * @param appName   application name of the affected instance
     * @param id        id of the affected instance
     * @param info      instance information, used for {@code Register}
     * @param newStatus new status, used for {@code StatusUpdate}
     * @param node      the peer node that receives the action
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat: {
                    // Look up the overridden status first, then the instance as currently held in the registry.
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, registered, overriddenStatus, false);
                    break;
                }
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    node.statusUpdate(appName, id, newStatus, getInstanceByAppAndId(appName, id, false));
                    break;
                case DeleteStatusOverride:
                    node.deleteStatusOverride(appName, id, getInstanceByAppAndId(appName, id, false));
                    break;
                default:
                    // No peer call is made for any other action.
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

3.2.3调用PeerEurekaNode的register方法同步注册信息

/**
     * Submits a replication task that sends the registration of the given
     * {@link InstanceInfo} to the peer node represented by this class.
     *
     * <p>The task is handed to {@code batchingDispatcher} together with an expiry time
     * of "now plus the value returned by {@code getLeaseRenewalOf(info)}"; when executed
     * it calls {@code replicationClient.register(info)}.
     *
     * @param info
     *            the instance information {@link InstanceInfo} to forward to the peer.
     * @throws Exception
     */
    public void register(final InstanceInfo info) throws Exception {
        final long now = System.currentTimeMillis();
        final long expiryTime = now + getLeaseRenewalOf(info);

        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime);
    }

3.2.4最终访问AbstractJerseyEurekaHttpClient的register

/**
     * Registers the given instance by POSTing it as JSON to {@code apps/<appName>}
     * below {@code serviceUrl}.
     *
     * <p>The request advertises gzip via {@code Accept-Encoding} and accepts JSON.
     * The client response is always closed before returning, and the call is logged
     * at debug level whether it succeeded or not.
     *
     * @param info the instance to register
     * @return a response carrying the HTTP status code and headers of the call
     */
    @Override
    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        final String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder requestBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus())
                    .headers(headersOf(response))
                    .build();
        } finally {
            if (logger.isDebugEnabled()) {
                // response stays null when the request failed before a response was obtained.
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: