【Eureka】【07】Eureka Server三级缓存,Server端 注册和获取注册信息

1.Eureka服务端 有三层缓存

第一层readOnlyCacheMap 使用ConcurrentHashMap做的map,可以通过配置开关控制,默认是打开的
第二层readWriteCacheMap 使用的是google的loadingcache

2.如果开启readCacheMap的化,会启动一个定时器,每responseCacheUpdateIntervalMs(默认30)秒从readWriteCacheMap中更新一次数据 遍历readCacheMap的keySet,取出readWriteCacheMap的value和readOnlyCacheMap中的value比较,如果不相等用前者覆盖后者

 ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    if (shouldUseReadOnlyResponseCache) {
				new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
						+ responseCacheUpdateIntervalMs),
  private TimerTask getCacheUpdateTask() {
	return new TimerTask() {
		public void run() {
			logger.debug("Updating the client cache from response cache");
			for (Key key : readOnlyCacheMap.keySet()) {
				if (logger.isDebugEnabled()) {
					Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
					logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
				try {
					Value cacheValue = readWriteCacheMap.get(key);
					Value currentCacheValue = readOnlyCacheMap.get(key);
					if (cacheValue != currentCacheValue) {
						readOnlyCacheMap.put(key, cacheValue);
				} catch (Throwable th) {
					logger.error("Error while updating the client cache from response cache", th);


Value getValue(final Key key, boolean useReadOnlyCache) {
	Value payload = null;
	try {
		if (useReadOnlyCache) {
			final Value currentPayload = readOnlyCacheMap.get(key);
			if (currentPayload != null) {
				payload = currentPayload;
			} else {
				payload = readWriteCacheMap.get(key);
				readOnlyCacheMap.put(key, payload);
		} else {
			payload = readWriteCacheMap.get(key);
	} catch (Throwable t) {
		logger.error("Cannot get value for key :" + key, t);
	return payload;


public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
			invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {

private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
	// invalidate cache
	responseCache.invalidate(appName, vipAddress, secureVipAddress);



 * @author Dave Syer
public @interface EnableEurekaServer {



 * @author Gunnar Hillert
public class EurekaServerConfiguration extends WebMvcConfigurerAdapter {
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient, this.expectedNumberOfRenewsPerMin,
	 * List of packages containing Jersey resources required by the Eureka server
	private static String[] EUREKA_PACKAGES = new String[] { "com.netflix.discovery","com.netflix.eureka" };
	 * Construct a Jersey {@link javax.ws.rs.core.Application} with all the resources
	 * required by the Eureka server.
	public javax.ws.rs.core.Application jerseyApplication(Environment environment,
			ResourceLoader resourceLoader) {

		ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(
				false, environment);

		// Filter to include only classes that have a particular annotation.
		provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));
		provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));

		// Find classes in Eureka packages (or subpackages)
		Set<Class<?>> classes = new HashSet<Class<?>>();
		for (String basePackage : EUREKA_PACKAGES) {
			Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);
			for (BeanDefinition bd : beans) {
				Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),

		// Construct the Jersey ResourceConfig
		Map<String, Object> propsAndFeatures = new HashMap<String, Object>();
				// Skip static content used by the webapp
				EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");

		DefaultResourceConfig rc = new DefaultResourceConfig(classes);

		return rc;


@Produces({"application/xml", "application/json"})
public class ApplicationResource {
    private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class);
	private final PeerAwareInstanceRegistry registry;
	ApplicationResource(String appName,
                        EurekaServerConfig serverConfig,
                        PeerAwareInstanceRegistry registry) {
        this.appName = appName.toUpperCase();
        this.serverConfig = serverConfig;
        this.registry = registry;
        this.responseCache = registry.getResponseCache();
     * Registers information about a particular instance for an
     * {@link com.netflix.discovery.shared.Application}.
     * @param info
     *            {@link InstanceInfo} information of the instance.
     * @param isReplication
     *            a header parameter containing information whether this is
     *            replicated from other nodes.
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible

public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
private static final Logger logger = LoggerFactory.getLogger(PeerAwareInstanceRegistryImpl.class);

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.peerEurekaNodes = peerEurekaNodes;

    try {
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 * @param info
 *            the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 *            true if this is a replication event from other replica nodes,
 *            false otherwise.
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);



  • Handles all registry requests from eureka clients.
  • Primary operations that are performed are the
  • Registers, Renewals, Cancels, Expirations, and Status Changes. The
  • registry also stores only the delta operations
  • @author Karthik Ranganathan

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class);

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
protected volatile ResponseCache responseCache;

public synchronized void initializedResponseCache() {
    if (responseCache == null) {
        responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
 * Registers a new instance with a given duration.
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
public void register(InstanceInfo r, int leaseDuration, boolean isReplication) {
    try {
        Map<String, Lease<InstanceInfo>> gMap = registry.get(r.getAppName());
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                    new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(r.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
        Lease<InstanceInfo> existingLease = gMap.get(r.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = r.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is " +
                                "greater than the one that is being registered {}",
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
            logger.debug("No previous lease information found; it is new registration");
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(r, leaseDuration);
        if (existingLease != null) {
        gMap.put(r.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    r.getAppName() + "(" + r.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(r.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", r.getOverriddenStatus(), r.getId());
            if (!overriddenInstanceStatusMap.containsKey(r.getId())) {
                logger.info("Not found overridden id {} and hence adding it", r.getId());
                overriddenInstanceStatusMap.put(r.getId(), r.getOverriddenStatus());
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(r.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(r, existingLease, isReplication);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(r.getStatus())) {
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        invalidateCache(r.getAppName(), r.getVIPAddress(), r.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                r.getAppName(), r.getId(), r.getStatus(), isReplication);
    } finally {
	private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
		// invalidate cache
		responseCache.invalidate(appName, vipAddress, secureVipAddress);


 * The class that is responsible for caching registry information that will be
 * queried by the clients.
 * <p>
 * The cache is maintained in compressed and non-compressed form for three
 * categories of requests - all applications, delta changes and for individual
 * applications. The compressed form is probably the most efficient in terms of
 * network traffic especially when querying all applications.
 * The cache also maintains separate pay load for <em>JSON</em> and <em>XML</em>
 * formats and for multiple versions too.
 * </p>
 * @author Karthik Ranganathan, Greg Kim
public class ResponseCacheImpl implements ResponseCache {
	private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();

    private final LoadingCache<Key, Value> readWriteCacheMap;
	private final boolean shouldUseReadOnlyResponseCache;
	private final java.util.Timer timer = new java.util.Timer("Eureka-CacheFillTimer", true);

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;

        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        this.readWriteCacheMap =
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                        .build(new CacheLoader<Key, Value>() {
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                Value value = generatePayload(key);
                                return value;

        if (shouldUseReadOnlyResponseCache) {
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),

        try {
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
	private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                    try {
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache", th);
     * Invalidate the cache of a particular application.
     * @param appName the application name of the application.
    public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
        for (Key.KeyType type : Key.KeyType.values()) {
            for (Version v : Version.values()) {
                        new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                        new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                        new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                        new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                        new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                        new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
                if (null != vipAddress) {
                    invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
                if (null != secureVipAddress) {
                    invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
     * Invalidate the cache information given the list of keys.
     * @param keys the list of keys for which the cache information needs to be invalidated.
    public void invalidate(Key... keys) {
        for (Key key : keys) {
            logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                    key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

            Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
            if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
                for (Key keysWithRegion : keysWithRegions) {
                    logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
     * Get the cached information about applications.
     * <p>
     * If the cached information is not available it is generated on the first
     * request. After the first request, the information is then updated
     * periodically by a background thread.
     * </p>
     * @param key the key for which the cached information needs to be obtained.
     * @return payload which contains information about the applications.
    public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
     * Get the payload in both compressed and uncompressed form.
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
            } else {
                payload = readWriteCacheMap.get(key);
        } catch (Throwable t) {
            logger.error("Cannot get value for key :" + key, t);
        return payload;


@Produces({"application/xml", "application/json"})
public class ApplicationsResource {

     * Get information about all {@link com.netflix.discovery.shared.Applications}.
     * @param version the version of the request.
     * @param acceptHeader the accept header to indicate whether to serve JSON or XML data.
     * @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.
     * @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}
     * @param uriInfo the {@link java.net.URI} information of the request made.
     * @param regionsStr A comma separated list of remote regions from which the instances will also be returned.
     *                   The applications returned from the remote region can be limited to the applications
     *                   returned by {@link EurekaServerConfig#getRemoteRegionAppWhitelist(String)}
     * @return a response containing information about all {@link com.netflix.discovery.shared.Applications}
     *         from the {@link AbstractInstanceRegistry}.
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.

        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;

        Key cacheKey = new Key(Key.EntityType.Application,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions

        Response response;
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
        } else {
            response = Response.ok(responseCache.get(cacheKey))
        return response;


版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: