8. Spring Cloud Source Code Analysis - Eureka Server Initialization Flow

墨家巨子@俏如来, published 2020-09-09 22:49:50

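Articles in this series

1. Spring Cloud Source Code Analysis - Eureka Core API

2. Spring Cloud Source Code Analysis - Eureka Client Initialization Process

3. Spring Cloud Source Code Analysis - Eureka Service Registration

4. Spring Cloud Source Code Analysis - Eureka Service Discovery

5. Spring Cloud Source Code Analysis - Eureka Client Lease Renewal

6. Spring Cloud Source Code Analysis - Eureka Client Deregistration

7. Spring Cloud Source Code Analysis - Eureka Server Auto-Configuration

8. Spring Cloud Source Code Analysis - Eureka Server Initialization Flow

9. Spring Cloud Source Code Analysis - Eureka Server Service Registration Flow

10. Spring Cloud Source Code Analysis - Eureka Server Lease Renewal

11. Spring Cloud Source Code Analysis - Eureka Server Registry Fetch

12. Spring Cloud Source Code Analysis - Eureka Server Service Eviction

13. Spring Cloud Source Code Analysis - Eureka Server Service Offline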

I. Eureka Server Core Components

1. EurekaServerContext

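EurekaServerContext is the server-side context object of Eureka. It declares methods for initialization, shutdown, and access to the server configuration, the cluster peer nodes, the service registry, and the instance info manager. The default implementation is DefaultEurekaServerContext.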

public interface EurekaServerContext {
    // Initialize the context
    void initialize() throws Exception;
    // Shut the context down
    void shutdown() throws Exception;
    // Get the server configuration
    EurekaServerConfig getServerConfig();
    // Get the manager of the cluster peer nodes
    PeerEurekaNodes getPeerEurekaNodes();
    // Get the server-side codecs
    ServerCodecs getServerCodecs();
    // Get the service registry
    PeerAwareInstanceRegistry getRegistry();
    // Get the manager of this instance's InstanceInfo
    ApplicationInfoManager getApplicationInfoManager();

}

The implementation class DefaultEurekaServerContext:

/**
 * Represent the local server context and exposes getters to components of the
 * local server such as the registry.
 *
 * @author David Liu
 */
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);

    private final EurekaServerConfig serverConfig;
    private final ServerCodecs serverCodecs;
    private final PeerAwareInstanceRegistry registry;
    private final PeerEurekaNodes peerEurekaNodes;
    private final ApplicationInfoManager applicationInfoManager;

    @Inject
    public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
                               ServerCodecs serverCodecs,
                               PeerAwareInstanceRegistry registry,
                               PeerEurekaNodes peerEurekaNodes,
                               ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
    }
    // @PostConstruct: initialize() runs once the EurekaServerContext has been constructed.
    // It calls peerEurekaNodes.start() to begin managing the peer nodes,
    // then registry.init(peerEurekaNodes) to initialize the PeerAwareInstanceRegistry.
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        // Start the PeerEurekaNodes
        peerEurekaNodes.start();
        try {
            // Initialize the PeerAwareInstanceRegistry
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }
    // Called before the EurekaServerContext is destroyed (@PreDestroy):
    // shuts down the registry first, then the peer nodes
    @PreDestroy
    @Override
    public void shutdown() {
        logger.info("Shutting down ...");
        // Shut down the service registry
        registry.shutdown();
        // Shut down the cluster peer nodes
        peerEurekaNodes.shutdown();
        logger.info("Shut down");
    }

    ...omitted...

}

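On initialization, DefaultEurekaServerContext.initialize calls peerEurekaNodes.start() to initialize the cluster nodes and then PeerAwareInstanceRegistry.init to initialize the registry. On destruction, shutdown calls PeerAwareInstanceRegistry.shutdown to run the registry's shutdown flow and then peerEurekaNodes.shutdown to shut down the cluster nodes.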
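The ordering described above (peers before registry on startup, registry before peers on shutdown) can be illustrated with a small standalone sketch. It does not use any Eureka types: Peers, Registry and ContextLifecycleSketch are hypothetical stand-ins introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the server context; not an Eureka class. */
final class ContextLifecycleSketch {
    /** Hypothetical stand-in for PeerEurekaNodes. */
    interface Peers {
        void start();
        void shutdown();
    }

    /** Hypothetical stand-in for PeerAwareInstanceRegistry. */
    interface Registry {
        void init(Peers peers) throws Exception;
        void shutdown();
    }

    private final Peers peers;
    private final Registry registry;

    ContextLifecycleSketch(Peers peers, Registry registry) {
        this.peers = peers;
        this.registry = registry;
    }

    /** Startup: peers first, then the registry that receives them. */
    void initialize() {
        peers.start();
        try {
            registry.init(peers);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Shutdown: registry first, then the peers. */
    void shutdown() {
        registry.shutdown();
        peers.shutdown();
    }

    /** Runs one initialize/shutdown cycle and returns the recorded call order. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Peers peers = new Peers() {
            public void start() { log.add("peers.start"); }
            public void shutdown() { log.add("peers.shutdown"); }
        };
        Registry registry = new Registry() {
            public void init(Peers p) { log.add("registry.init"); }
            public void shutdown() { log.add("registry.shutdown"); }
        };
        ContextLifecycleSketch context = new ContextLifecycleSketch(peers, registry);
        context.initialize();
        context.shutdown();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the real class the two hooks are driven by @PostConstruct and @PreDestroy; the sketch calls them by hand to keep the example free of container dependencies.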

2. PeerEurekaNodes

PeerEurekaNodes is a helper that manages the lifecycle of the Eureka cluster nodes (PeerEurekaNode). Its start method is invoked from DefaultEurekaServerContext.initialize. The source is as follows:

/**
 * Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.
 *
 * @author Tomasz Bak
 */
@Singleton
public class PeerEurekaNodes {

    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
    // Service registry
    protected final PeerAwareInstanceRegistry registry;
    // Server configuration
    protected final EurekaServerConfig serverConfig;
    // Client configuration
    protected final EurekaClientConfig clientConfig;
    protected final ServerCodecs serverCodecs;
    // Manager of this instance's InstanceInfo
    private final ApplicationInfoManager applicationInfoManager;
    // Eureka cluster peer nodes
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    // URLs of the Eureka cluster peer nodes
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
    // Executor for the scheduled update task
    private ScheduledExecutorService taskExecutor;
    // Constructor; the dependencies are injected
    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager) {
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
    // Unmodifiable view of the peer nodes
    public List<PeerEurekaNode> getPeerNodesView() {
        return Collections.unmodifiableList(peerEurekaNodes);
    }
    // The peer nodes
    public List<PeerEurekaNode> getPeerEurekaNodes() {
        return peerEurekaNodes;
    }

    // Minimum number of available peer replication instances for this instance to be considered healthy
    public int getMinNumberOfAvailablePeers() {
        return serverConfig.getHealthStatusMinNumberOfAvailablePeers();
    }
    // Start
    public void start() {
        // Create a single-threaded scheduled executor whose thread is named "Eureka-PeerNodesUpdater"
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // Resolve the peer URLs and update the peer node list once at startup
            updatePeerEurekaNodes(resolvePeerUrls());
            // Runnable for the scheduled task; it repeats the same update: updatePeerEurekaNodes(resolvePeerUrls())
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            // Schedule the update task with a fixed delay
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    // Interval between runs, 10 minutes by default: peerEurekaNodesUpdateIntervalMs = 10 * MINUTES
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
    // Shutdown: stop the scheduled update task, clear peerEurekaNodes and peerEurekaNodeUrls, and call shutDown on every node
    public void shutdown() {
        taskExecutor.shutdown();
        List<PeerEurekaNode> toRemove = this.peerEurekaNodes;

        this.peerEurekaNodes = Collections.emptyList();
        this.peerEurekaNodeUrls = Collections.emptySet();

        for (PeerEurekaNode node : toRemove) {
            node.shutDown();
        }
    }

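    /**
     * Resolve peer URLs: the URLs of the Eureka cluster nodes in the same zone,
     * with the current node filtered out.
     *
     * @return peer URLs with node's own URL filtered out
     */
    protected List<String> resolvePeerUrls() {
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        // The configured Eureka service URLs
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        int idx = 0;
        while (idx < replicaUrls.size()) {
            // Drop this node's own URL
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

    ...omitted (content is missing in the source at this point; the code resumes inside PeerAwareInstanceRegistryImpl)...

    // Service cancellation
    @Override
    public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
        if (super.cancel(appName, id, isReplication)) {
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)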
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            return true;
        }
        return false;
    }
    // Service registration
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // Register through the parent class
        super.register(info, leaseDuration, isReplication);
        // Replicate the registration to the other nodes in the cluster
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    // Lease renewal
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        // Renew through the parent class
        if (super.renew(appName, id, isReplication)) {
            // Replicate the heartbeat to the other nodes in the cluster
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
    // Update the instance status
    @Override
    public boolean statusUpdate(final String appName, final String id,
                                final InstanceStatus newStatus, String lastDirtyTimestamp,
                                final boolean isReplication) {
        if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
            // Replicate the status change to the other nodes
            replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
            return true;
        }
        return false;
    }
    // Delete the status override
    @Override
    public boolean deleteStatusOverride(String appName, String id,
                                        InstanceStatus newStatus,
                                        String lastDirtyTimestamp,
                                        boolean isReplication) {
        if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
            replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
    // Whether lease expiration is enabled
    @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

    // Update the renewal threshold
    private void updateRenewalThreshold() {
        try {
            Applications apps = eurekaClient.getApplications();
            // Count the registerable instances
            int count = 0;
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            synchronized (lock) {
                // Update threshold only if the threshold is greater than the
                // current expected threshold or if self preservation is disabled.
                if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
                        || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfRenewsPerMin = count * 2;
                    this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
    }

    /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     *
     */
    private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */
    private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }


    ...some code omitted...
}

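This registry implementation class looks complicated. What does it actually do?

  • init: initializes the registry response cache (ResponseCache), schedules the renewal threshold update task, and initializes the remote region registries
  • shutdown: performs all cleanup and shutdown operations
  • syncUp: synchronizes data within the cluster by copying the registry from peer nodes
  • cancel: takes a service instance offline and replicates the cancellation to the other nodes
  • register: registers a service instance and replicates the registration to the other nodes
  • renew: renews a lease and replicates the heartbeat to the other nodes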
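The "apply locally, then replicate" pattern shared by cancel, register and renew, together with the isReplication guard that stops a replicated call from being replicated again, can be sketched as follows. This is a minimal illustration under stated assumptions: Peer and ReplicationGuardSketch are hypothetical classes, and peers are plain in-memory objects rather than remote nodes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of replicate-unless-replication; not Eureka code. */
final class ReplicationGuardSketch {
    /** Hypothetical peer that records the instance ids it was asked to register. */
    static final class Peer {
        final List<String> received = new ArrayList<>();
    }

    private final List<String> localRegistry = new ArrayList<>();
    private final List<Peer> peers;

    ReplicationGuardSketch(List<Peer> peers) {
        this.peers = peers;
    }

    /** Applies the registration locally, then forwards it unless it is itself a replication. */
    void register(String instanceId, boolean isReplication) {
        localRegistry.add(instanceId);
        if (isReplication || peers.isEmpty()) {
            // Replicated traffic must not be replicated again, otherwise nodes would echo each other forever.
            return;
        }
        for (Peer peer : peers) {
            peer.received.add(instanceId);
        }
    }

    int localSize() {
        return localRegistry.size();
    }

    public static void main(String[] args) {
        Peer peer = new Peer();
        List<Peer> peers = new ArrayList<>();
        peers.add(peer);
        ReplicationGuardSketch node = new ReplicationGuardSketch(peers);
        node.register("order-service:8080", false); // client call: forwarded to the peer
        node.register("user-service:8081", true);   // replicated call: kept local only
        System.out.println("local=" + node.localSize() + " forwarded=" + peer.received);
    }
}
```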

5. EurekaServerInitializerConfiguration

EurekaServerAutoConfiguration brings this class in through @Import(EurekaServerInitializerConfiguration.class). EurekaServerInitializerConfiguration implements SmartLifecycle, so during Spring startup its start method is called when the lifecycle processor is refreshed (LifecycleProcessor.onRefresh()). start then calls EurekaServerBootstrap.contextInitialized to initialize and start Eureka.

/**
 * @author Dave Syer
 */
@Configuration
public class EurekaServerInitializerConfiguration
		implements ServletContextAware, SmartLifecycle, Ordered {

	private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);
	// Eureka Server configuration
	@Autowired
	private EurekaServerConfig eurekaServerConfig;
	// Servlet context
	private ServletContext servletContext;
	// Application context
	@Autowired
	private ApplicationContext applicationContext;
	// Bootstrap that initializes and starts Eureka
	@Autowired
	private EurekaServerBootstrap eurekaServerBootstrap;

	private boolean running;

	private int order = 1;
	// Receive the servlet context (ServletContextAware)
	@Override
	public void setServletContext(ServletContext servletContext) {
		this.servletContext = servletContext;
	}
	// start() from SmartLifecycle; Spring calls it during startup
	@Override
	public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					// Initialize the EurekaServer context and start EurekaServer
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");
					// Publish EurekaRegistryAvailableEvent: the registry is available
					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					// Mark this lifecycle bean as running
					EurekaServerInitializerConfiguration.this.running = true;
					// Publish EurekaServerStartedEvent: the server has started
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}

	private EurekaServerConfig getEurekaServerConfig() {
		return this.eurekaServerConfig;
	}

	private void publish(ApplicationEvent event) {
		this.applicationContext.publishEvent(event);
	}
	// Lifecycle stop: destroy the EurekaServer context
	@Override
	public void stop() {
		this.running = false;
		eurekaServerBootstrap.contextDestroyed(this.servletContext);
	}

	@Override
	public boolean isRunning() {
		return this.running;
	}

	@Override
	public int getPhase() {
		return 0;
	}

	@Override
	public boolean isAutoStartup() {
		return true;
	}

	@Override
	public void stop(Runnable callback) {
		callback.run();
	}

	@Override
	public int getOrder() {
		return this.order;
	}

}

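EurekaServerInitializerConfiguration initializes and starts Eureka in its start method and publishes two events: EurekaRegistryAvailableEvent (the registry is available) and EurekaServerStartedEvent (the server has started). The core of the EurekaServer initialization is in eurekaServerBootstrap.contextInitialized.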
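The shape of that start method (bootstrap on a background thread, then two events published in order, with the running flag set between them) can be sketched without Spring. Everything here is an assumption made for illustration: BackgroundStartSketch is a hypothetical class, and events are recorded as plain strings instead of being published through an ApplicationContext.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical sketch of the start() pattern; none of these names are Spring or Eureka APIs. */
final class BackgroundStartSketch {
    /** Names of the events published so far, in order. */
    final List<String> events = new CopyOnWriteArrayList<>();
    private volatile boolean running;

    /** Runs the bootstrap on a new thread and returns that thread. */
    Thread start(Runnable bootstrap) {
        Thread thread = new Thread(() -> {
            try {
                bootstrap.run();
                events.add("EurekaRegistryAvailableEvent");
                running = true;
                events.add("EurekaServerStartedEvent");
            } catch (RuntimeException ex) {
                // A failed bootstrap is only recorded; running stays false.
                events.add("failed: " + ex.getMessage());
            }
        }, "eureka-start-sketch");
        thread.start();
        return thread;
    }

    boolean isRunning() {
        return running;
    }

    /** Convenience for the demo: start, then wait for the background thread to finish. */
    static BackgroundStartSketch startAndWait(Runnable bootstrap) {
        BackgroundStartSketch sketch = new BackgroundStartSketch();
        try {
            sketch.start(bootstrap).join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch;
    }

    public static void main(String[] args) {
        BackgroundStartSketch ok = startAndWait(() -> { });
        System.out.println(ok.events + " running=" + ok.isRunning());
        BackgroundStartSketch failed = startAndWait(() -> { throw new IllegalStateException("boom"); });
        System.out.println(failed.events + " running=" + failed.isRunning());
    }
}
```

Because the bootstrap runs on its own thread, the caller of start returns immediately; a listener that needs the registry should wait for the first event rather than assume the server is ready once start has returned.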

6. EurekaServerBootstrap
/**
 * @author Spencer Gibb
 */
public class EurekaServerBootstrap {

	private static final Log log = LogFactory.getLog(EurekaServerBootstrap.class);

	private static final String TEST = "test";

	private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";

	private static final String EUREKA_ENVIRONMENT = "eureka.environment";

	private static final String DEFAULT = "default";

	private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";

	private static final String EUREKA_DATACENTER = "eureka.datacenter";

	protected EurekaServerConfig eurekaServerConfig;

	protected ApplicationInfoManager applicationInfoManager;

	protected EurekaClientConfig eurekaClientConfig;

	protected PeerAwareInstanceRegistry registry;

	protected volatile EurekaServerContext serverContext;
	protected volatile AwsBinder awsBinder;

	public EurekaServerBootstrap(ApplicationInfoManager applicationInfoManager,
			EurekaClientConfig eurekaClientConfig, EurekaServerConfig eurekaServerConfig,
			PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
		this.applicationInfoManager = applicationInfoManager;
		this.eurekaClientConfig = eurekaClientConfig;
		this.eurekaServerConfig = eurekaServerConfig;
		this.registry = registry;
		this.serverContext = serverContext;
	}
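	// Eureka initialization
	public void contextInitialized(ServletContext context) {
		try {
			// Initialize the environment
			initEurekaEnvironment();
			// Initialize the server context
			initEurekaServerContext();
			// Store the server context as a servlet context attribute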
			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}
	// Destroy the Eureka context
	public void contextDestroyed(ServletContext context) {
		try {
			log.info("Shutting down Eureka Server..");
			context.removeAttribute(EurekaServerContext.class.getName());

			destroyEurekaServerContext();
			destroyEurekaEnvironment();

		}
		catch (Throwable e) {
			log.error("Error shutting down eureka", e);
		}
		log.info("Eureka Service is now shutdown...");
	}
	// Initialize the environment: set a few environment parameters
	protected void initEurekaEnvironment() throws Exception {
		log.info("Setting the eureka configuration..");
		// Set the data center
		String dataCenter = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_DATACENTER);
		if (dataCenter == null) {
			log.info(
					"Eureka data center value eureka.datacenter is not set, defaulting to default");
					
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
		}
		// Set the Eureka environment
		String environment = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_ENVIRONMENT);
		if (environment == null) {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
			log.info(
					"Eureka environment value eureka.environment is not set, defaulting to test");
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
		}
	}
	// Initialize the EurekaServer context
	protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}
		// Store the EurekaServerContext in EurekaServerContextHolder
		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy registry from neighboring eureka node
		// (implemented by PeerAwareInstanceRegistryImpl.syncUp)
		int registryCount = this.registry.syncUp();
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
	}	

	/**
	 * Server context shutdown hook. Override for custom logic
	 */
	protected void destroyEurekaServerContext() throws Exception {
		EurekaMonitors.shutdown();
		if (this.awsBinder != null) {
			this.awsBinder.shutdown();
		}
		if (this.serverContext != null) {
			this.serverContext.shutdown();
		}
	}

	/**
	 * Users can override to clean up the environment themselves.
	 */
	protected void destroyEurekaEnvironment() throws Exception {
	}

	protected boolean isAws(InstanceInfo selfInstanceInfo) {
		boolean result = DataCenterInfo.Name.Amazon == selfInstanceInfo
				.getDataCenterInfo().getName();
		log.info("isAws returned " + result);
		return result;
	}

}

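The contextInitialized method of EurekaServerBootstrap does two things:

  • initEurekaEnvironment() initializes the environment, setting the environment-related parameters through ConfigurationManager
  • initEurekaServerContext() initializes the server context and uses PeerAwareInstanceRegistryImpl.syncUp to copy the registry from neighboring Eureka nodes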
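The defaulting rule in initEurekaEnvironment (fall back to "default" for the data center and to "test" for the environment when the eureka.* keys are unset) can be sketched with a plain Map standing in for the Archaius ConfigurationManager. The property names are the ones from the listing above; the class name and the use of a Map are assumptions made only for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the environment defaulting; a Map replaces ConfigurationManager. */
final class EnvironmentDefaultsSketch {
    static final String EUREKA_DATACENTER = "eureka.datacenter";
    static final String EUREKA_ENVIRONMENT = "eureka.environment";
    static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";
    static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";

    /** Copies the eureka.* values to the archaius.deployment.* keys, applying the defaults. */
    static void initEnvironment(Map<String, String> config) {
        String dataCenter = config.get(EUREKA_DATACENTER);
        config.put(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter == null ? "default" : dataCenter);

        String environment = config.get(EUREKA_ENVIRONMENT);
        config.put(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment == null ? "test" : environment);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(EUREKA_ENVIRONMENT, "prod"); // only the environment is configured
        initEnvironment(config);
        System.out.println(config.get(ARCHAIUS_DEPLOYMENT_DATACENTER));
        System.out.println(config.get(ARCHAIUS_DEPLOYMENT_ENVIRONMENT));
    }
}
```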

7. JerseyFilter

EurekaServerAutoConfiguration registers a Jersey filter that handles every request whose path starts with /eureka:

/**
	 * Register the Jersey filter
	 */
	@Bean
	public FilterRegistrationBean jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean bean = new FilterRegistrationBean();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

The filter is registered through FilterRegistrationBean; the actual work is delegated to ServletContainer:

public class ServletContainer extends HttpServlet implements Filter {

	...omitted...
}

II. Eureka Server Initialization Flow

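This section walks through the order in which Eureka Server initializes at startup. The flow below was traced by stepping through an Eureka Server startup with breakpoints.

1. ServletContainer initialization

First the ServletContainer is created and initialized, and its configure method is called to configure it. The doFilter method runs later, whenever a request arrives.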

public class ServletContainer extends HttpServlet implements Filter {
    ...omitted...
    public void init(FilterConfig filterConfig) throws ServletException {
        this.filterConfig = filterConfig;
        this.init((WebConfig)(new WebFilterConfig(filterConfig)));
    }
	  protected void configure(WebConfig wc, ResourceConfig rc, WebApplication wa) {
        if (this.getServletConfig() != null) {
            this.configure(this.getServletConfig(), rc, wa);
        } else if (this.filterConfig != null) {
            this.configure(this.filterConfig, rc, wa);
        }

        if (rc instanceof ReloadListener) {
            List notifiers = new ArrayList();
            Object o = rc.getProperties().get("com.sun.jersey.spi.container.ContainerNotifier");
            Iterator i$;
            if (o instanceof ContainerNotifier) {
                notifiers.add((ContainerNotifier)o);
            } else if (o instanceof List) {
                i$ = ((List)o).iterator();

                while(i$.hasNext()) {
                    Object elem = i$.next();
                    if (elem instanceof ContainerNotifier) {
                        notifiers.add((ContainerNotifier)elem);
                    }
                }
            }

            i$ = ServiceFinder.find(ContainerNotifier.class).iterator();

            while(i$.hasNext()) {
                ContainerNotifier cn = (ContainerNotifier)i$.next();
                notifiers.add(cn);
            }

            rc.getProperties().put("com.sun.jersey.spi.container.ContainerNotifier", notifiers);
        }

    }
    protected void configure(FilterConfig fc, ResourceConfig rc, WebApplication wa) {
        rc.getSingletons().add(new ServletContainer.ContextInjectableProvider(FilterConfig.class, fc));
        String regex = (String)rc.getProperty("com.sun.jersey.config.property.WebPageContentRegex");
        if (regex != null && regex.length() > 0) {
            try {
                this.staticContentPattern = Pattern.compile(regex);
            } catch (PatternSyntaxException var6) {
                throw new ContainerException("The syntax is invalid for the regular expression, " + regex + ", associated with the initialization parameter " + "com.sun.jersey.config.property.WebPageContentRegex", var6);
            }
        }

        this.forwardOn404 = rc.getFeature("com.sun.jersey.config.feature.FilterForwardOn404");
        this.filterContextPath = this.filterConfig.getInitParameter("com.sun.jersey.config.feature.FilterContextPath");
        if (this.filterContextPath != null) {
            if (this.filterContextPath.isEmpty()) {
                this.filterContextPath = null;
            } else {
                if (!this.filterContextPath.startsWith("/")) {
                    this.filterContextPath = '/' + this.filterContextPath;
                }

                if (this.filterContextPath.endsWith("/")) {
                    this.filterContextPath = this.filterContextPath.substring(0, this.filterContextPath.length() - 1);
                }
            }
        }

    }
    ...omitted...
}

2. Eureka context initialization

Next, the initialize method of EurekaServerContext is called. Its @PostConstruct annotation makes it the initialization callback:

@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
    @PostConstruct
    @Override
    public void initialize() {
        logger.info("Initializing ...");
        // Start the peer node updates
        peerEurekaNodes.start();
        try {
            // Initialize the service registry
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }
    ...omitted...
}

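Two things happen here:

  • peerEurekaNodes.start() is called to start the periodic update of the nodes in the Eureka cluster
  • init is called on the service registry PeerAwareInstanceRegistryImpl

3. Starting the PeerEurekaNodes cluster node updates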

PeerEurekaNodes.start is called. It uses a scheduled task to update the Eureka cluster nodes periodically, every 10 minutes by default:

@Singleton
public class PeerEurekaNodes {
    // Start
    public void start() {
        // Create a single-threaded scheduled executor whose thread is named "Eureka-PeerNodesUpdater"
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // Resolve the peer URLs and update the peer node list once at startup
            updatePeerEurekaNodes(resolvePeerUrls());
            // Runnable for the scheduled task; it repeats the same update: updatePeerEurekaNodes(resolvePeerUrls())
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            // Schedule the update task with a fixed delay
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    // Interval between runs, 10 minutes by default: peerEurekaNodesUpdateIntervalMs = 10 * MINUTES
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }
}

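The updatePeerEurekaNodes method is called on a schedule to refresh the cluster, every 10 minutes by default. The update removes nodes that are no longer configured and adds the new ones: each removed node is shut down through its shutDown method, each new node is created through createPeerEurekaNode, and the resulting nodes are stored in a List.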
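The "remove the old, add the new" step boils down to two set differences between the URLs currently in use and the freshly resolved ones. The sketch below shows only that computation; PeerDiffSketch and its method names are hypothetical, and the URLs in main are made-up examples.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the peer list diff; not the Eureka implementation. */
final class PeerDiffSketch {
    /** URLs in use now but absent from the new list: their nodes would be shut down. */
    static Set<String> toShutdown(Set<String> current, List<String> resolved) {
        Set<String> result = new LinkedHashSet<>(current);
        result.removeAll(resolved);
        return result;
    }

    /** URLs in the new list that are not in use yet: nodes would be created for them. */
    static Set<String> toAdd(Set<String> current, List<String> resolved) {
        Set<String> result = new LinkedHashSet<>(resolved);
        result.removeAll(current);
        return result;
    }

    public static void main(String[] args) {
        Set<String> current = new LinkedHashSet<>(
                Arrays.asList("http://peer1:8761/eureka/", "http://peer2:8761/eureka/"));
        List<String> resolved = Arrays.asList("http://peer2:8761/eureka/", "http://peer3:8761/eureka/");
        System.out.println("shut down: " + toShutdown(current, resolved));
        System.out.println("create:    " + toAdd(current, resolved));
    }
}
```

When both differences are empty the peer list is unchanged and nothing needs to be created or shut down.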

4. Service registry initialization

After peerEurekaNodes.start() returns, DefaultEurekaServerContext calls PeerAwareInstanceRegistryImpl.init to initialize the registry:

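    // Initialization
    @Override
    public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        // Start the timer that counts replications in the last minute
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        // Initialize the ResponseCache (ResponseCacheImpl), which caches the registry data served to clients
        initializedResponseCache();
        // Schedule the renewal threshold update task; it calls updateRenewalThreshold() every 15 minutes
        scheduleRenewalThresholdUpdateTask();
        // Initialize the remote region registries; no remote region is configured by default
        initRemoteRegionRegistry();

        try {
            // Register this object with the monitors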
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

We focus on two things here:

  • initializedResponseCache initializes the registry response cache
  • scheduleRenewalThresholdUpdateTask periodically updates the renewal threshold
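The arithmetic behind the renewal threshold follows from the updateRenewalThreshold and isLeaseExpirationEnabled listings shown earlier: each instance is expected to renew twice per minute, and the threshold is that expectation multiplied by the renewal percent threshold. The sketch below restates those two formulas as pure functions. The class and method names are hypothetical, and 0.85 is used as the percentage on the assumption that the default has not been overridden.

```java
/** Hypothetical restatement of the renewal threshold arithmetic; not Eureka code. */
final class RenewalThresholdSketch {
    /** Each instance is expected to send 2 heartbeats per minute (one every 30 seconds). */
    static int expectedRenewsPerMin(int instanceCount) {
        return instanceCount * 2;
    }

    /** Threshold = expected renewals per minute scaled by the configured percentage, truncated. */
    static int threshold(int instanceCount, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instanceCount) * renewalPercentThreshold);
    }

    /** Leases may expire when self preservation is off, or when renewals stay above the threshold. */
    static boolean leaseExpirationEnabled(boolean selfPreservationEnabled,
                                          int renewsLastMin, int threshold) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsLastMin > threshold;
    }

    public static void main(String[] args) {
        int threshold = threshold(4, 0.85); // 4 instances, assumed 0.85 percentage
        System.out.println("expected=" + expectedRenewsPerMin(4) + " threshold=" + threshold);
        System.out.println("7 renewals: " + leaseExpirationEnabled(true, 7, threshold));
        System.out.println("5 renewals: " + leaseExpirationEnabled(true, 5, threshold));
    }
}
```

With 4 instances the server expects 8 renewals per minute and the threshold truncates to 6, so receiving only 5 renewals in the last minute keeps expired leases in the registry while self preservation is enabled.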

initializedResponseCache: initializing the response cache

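Note the call to initializedResponseCache: it initializes a ResponseCache, whose concrete implementation is ResponseCacheImpl. That class builds two maps, a read-write cache readWriteCacheMap and a read-only cache readOnlyCacheMap. It is called a response cache because clients fetching the service registry are served from the readOnlyCacheMap.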
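Before reading the real class, the interplay of the two maps can be pictured with a reduced sketch: reads go to the read-only map first, a registry change invalidates only the read-write map, and a periodic task copies differing values across. TwoLevelCacheSketch is a hypothetical class that uses plain ConcurrentHashMaps and a loader function that is assumed never to return null; the real implementation uses a Guava LoadingCache with expiry for the read-write level.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical two-level cache sketch; a simplification, not ResponseCacheImpl. */
final class TwoLevelCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    /** Builds the payload for a key; assumed to never return null. */
    private final Function<String, String> loader;

    TwoLevelCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    /** Reads hit the read-only map first and fall back to the read-write map. */
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, loader);
            readOnly.put(key, value);
        }
        return value;
    }

    /** A registry change drops the entry from the read-write map only. */
    void invalidate(String key) {
        readWrite.remove(key);
    }

    /** Periodic task: refresh read-only entries whose read-write value differs. */
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (!fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }

    public static void main(String[] args) {
        String[] registry = {"v1"}; // stands in for the registry contents
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch(key -> registry[0]);
        System.out.println(cache.get("ALL_APPS"));
        registry[0] = "v2";
        cache.invalidate("ALL_APPS");
        System.out.println(cache.get("ALL_APPS")); // still served from the read-only map
        cache.syncReadOnly();
        System.out.println(cache.get("ALL_APPS"));
    }
}
```

The design trades freshness for cheap reads: between two runs of the sync task, clients can see a registry that is up to one update interval old.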

public class ResponseCacheImpl implements ResponseCache {
    ...omitted...

    // Read-only cache
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // Read-write cache
    private final LoadingCache<Key, Value> readWriteCacheMap;

    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        // Whether to use the read-only cache, true by default; when enabled, registry fetches are served from it
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
        // Interval at which the read-only cache is refreshed, 30s by default
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        // Build the readWriteCacheMap
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(1000)
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
        // If the read-only response cache is enabled,
        if (shouldUseReadOnlyResponseCache) {
            // run getCacheUpdateTask every responseCacheUpdateIntervalMs (30s by default)
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }
    
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                // Where the two caches differ, refresh readOnlyCacheMap from readWriteCacheMap
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                    }
                }
            }
        };
    }
}
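
The interplay between the two cache levels can be sketched in plain Java. This is a simplified illustration, not the Eureka implementation: the class name TwoLevelCacheSketch, the String keys and values, and the loader function (standing in for generatePayload) are all assumptions, and plain ConcurrentHashMap instances replace the Guava LoadingCache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Simplified two-level cache: a read-only map in front of a read-write map. */
class TwoLevelCacheSketch {
    private final Map<String, String> readOnly = new ConcurrentHashMap<>();
    private final Map<String, String> readWrite = new ConcurrentHashMap<>();
    private final Function<String, String> loader; // hypothetical stand-in for generatePayload(key)

    TwoLevelCacheSketch(Function<String, String> loader) {
        this.loader = loader;
    }

    /** Client-facing read: read-only level first, falling back to the read-write level. */
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.computeIfAbsent(key, loader);
            readOnly.put(key, value);
        }
        return value;
    }

    /** Registry change: drop the read-write entry so the next load rebuilds it. */
    void invalidate(String key) {
        readWrite.remove(key);
    }

    /** Body of the periodic task: refresh every key already present in the read-only level. */
    void syncReadOnly() {
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.computeIfAbsent(key, loader);
            if (!fresh.equals(readOnly.get(key))) {
                readOnly.put(key, fresh);
            }
        }
    }

    public static void main(String[] args) {
        int[] version = {0};
        TwoLevelCacheSketch cache = new TwoLevelCacheSketch(k -> k + "-v" + (++version[0]));
        System.out.println(cache.get("ALL_APPS")); // payload built on first access
        cache.invalidate("ALL_APPS");              // registry changed
        System.out.println(cache.get("ALL_APPS")); // read-only level still serves the old payload
        cache.syncReadOnly();                      // periodic refresh
        System.out.println(cache.get("ALL_APPS")); // new payload is now visible
    }
}
```

The sketch makes the trade-off visible: between an invalidation and the next refresh, readers of the read-only level can see stale data for up to one update interval.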

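scheduleRenewalThresholdUpdateTask: periodically updating the renewal threshold

A timer task updates the renewal threshold every renewalThresholdUpdateIntervalMs, which defaults to 900 seconds (15 minutes).

/**
 * Updates the renewal threshold every renewalThresholdUpdateIntervalMs (900 seconds by default).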
 * Schedule the task that updates renewal threshold periodically.
 * The renewal threshold would be used to determine if the renewals drop
 * dramatically because of network partition and to protect expiring too
 * many instances at a time.
 * 
 */
private void scheduleRenewalThresholdUpdateTask() {
    // Schedule the timer task
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           // Update the renewal threshold
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());	// 900s by default
}
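
The scheduling call uses java.util.Timer.schedule(task, delay, period) with the same value for the initial delay and the period. A minimal sketch of that pattern follows; the class PeriodicUpdateSketch, the helper runPeriodically, and the latch used to stop after a fixed number of runs are illustrative assumptions, not part of Eureka.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PeriodicUpdateSketch {
    /**
     * Runs the task every intervalMs (first run after intervalMs) and returns
     * true once it has run at least `runs` times, or false after a 5s timeout.
     */
    static boolean runPeriodically(Runnable task, long intervalMs, int runs) {
        Timer timer = new Timer("threshold-updater", true); // daemon timer thread
        CountDownLatch latch = new CountDownLatch(runs);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                task.run();
                latch.countDown();
            }
        }, intervalMs, intervalMs); // delay == period, as in the listing above
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }

    public static void main(String[] args) {
        boolean ok = runPeriodically(() -> System.out.println("updating threshold"), 50, 3);
        System.out.println("completed: " + ok);
    }
}
```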

updateRenewalThreshold contains the actual update logic

// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
 * Updates the renewal threshold based on the current number of
 * renewals. The threshold is a percentage as specified in
 * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
 * received per minute {@link #getNumOfRenewsInLastMin()}.
 */
private void updateRenewalThreshold() {
    try {
        // Fetch the registry
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        // Count the registered service instances
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        // Acquire the lock
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if the self preservation is disabled.

            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold)
                    || (!this.isSelfPreservationModeEnabled())) {
                // Expected renewals per minute: instance count * 2, since each client renews every 30s (twice a minute)
                this.expectedNumberOfRenewsPerMin = count * 2;
                // Renewal threshold per minute: instance count * 2 * 0.85 (the percent threshold)
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

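The renewal threshold is updated when self-preservation is disabled, or when count * 2 exceeds the current threshold multiplied by the renewal percent threshold. The update itself works as follows:

  • Expected renewals per minute = number of instances * 2, because each instance renews once every 30s
  • Threshold per minute = number of instances * 2 * 0.85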
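
A worked example may help here. The sketch below reproduces the guard and the arithmetic from updateRenewalThreshold in a standalone class; RenewalThresholdSketch and its method names are assumptions made for illustration, and the instance counts are made-up inputs.

```java
/** Standalone rework of the threshold arithmetic; not the Eureka class itself. */
class RenewalThresholdSketch {
    private final double renewalPercentThreshold; // 0.85 in the article
    private boolean selfPreservationEnabled = true;
    private int expectedNumberOfRenewsPerMin;
    private int numberOfRenewsPerMinThreshold;

    RenewalThresholdSketch(double renewalPercentThreshold) {
        this.renewalPercentThreshold = renewalPercentThreshold;
    }

    void setSelfPreservationEnabled(boolean enabled) {
        this.selfPreservationEnabled = enabled;
    }

    /** Applies the same guard as the listing: update only on growth or when self-preservation is off. */
    void update(int instanceCount) {
        int renewsPerMin = instanceCount * 2; // one renewal every 30s per instance
        if (renewsPerMin > renewalPercentThreshold * numberOfRenewsPerMinThreshold
                || !selfPreservationEnabled) {
            expectedNumberOfRenewsPerMin = renewsPerMin;
            numberOfRenewsPerMinThreshold = (int) (renewsPerMin * renewalPercentThreshold);
        }
    }

    int expected() { return expectedNumberOfRenewsPerMin; }

    int threshold() { return numberOfRenewsPerMinThreshold; }

    public static void main(String[] args) {
        RenewalThresholdSketch t = new RenewalThresholdSketch(0.85);
        t.update(10);                        // 20 expected renewals, threshold 17
        System.out.println(t.threshold());
        t.update(2);                         // 4 is not above 0.85 * 17, so nothing changes
        System.out.println(t.threshold());
        t.setSelfPreservationEnabled(false);
        t.update(2);                         // guard bypassed: threshold becomes (int) 3.4 = 3
        System.out.println(t.threshold());
    }
}
```

With self-preservation enabled, a sharp drop in the instance count leaves the threshold where it was, which is what keeps the server from expiring many instances at once during a network partition.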

5. EurekaServer initialization configuration

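EurekaServerInitializerConfiguration implements the SmartLifecycle interface, so Spring calls its start method when the container is refreshed. start spawns a new thread that calls eurekaServerBootstrap.contextInitialized to perform the initialization.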

public void start() {
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					//TODO: is this class even needed now?
					eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
					log.info("Started Eureka Server");

					publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
					EurekaServerInitializerConfiguration.this.running = true;
					publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
				}
				catch (Exception ex) {
					// Help!
					log.error("Could not initialize Eureka servlet context", ex);
				}
			}
		}).start();
	}
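
The consequence of starting on a new thread is that start() returns before initialization has finished, and a failure is only logged rather than propagated to the caller. The following plain-Java sketch shows that behavior without Spring; BackgroundStartSketch, the bootstrap Runnable, and awaitFinished are hypothetical names introduced for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a lifecycle bean that bootstraps on a background thread. */
class BackgroundStartSketch {
    private final Runnable bootstrap; // stands in for contextInitialized(...)
    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile boolean running;

    BackgroundStartSketch(Runnable bootstrap) {
        this.bootstrap = bootstrap;
    }

    /** Returns immediately; the bootstrap work runs on its own thread. */
    void start() {
        new Thread(() -> {
            try {
                bootstrap.run();
                running = true; // set only after a successful bootstrap
            } catch (RuntimeException ex) {
                System.err.println("Could not initialize: " + ex);
            } finally {
                finished.countDown();
            }
        }).start();
    }

    boolean isRunning() {
        return running;
    }

    /** Waits up to timeoutMs for the background bootstrap to end, successfully or not. */
    boolean awaitFinished(long timeoutMs) {
        try {
            return finished.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BackgroundStartSketch server = new BackgroundStartSketch(() -> System.out.println("bootstrapping"));
        server.start();
        server.awaitFinished(5000);
        System.out.println("running: " + server.isRunning());
    }
}
```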

6. Eureka bootstrap

EurekaServerBootstrap.contextInitialized initializes the Eureka environment and the server context

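// Eureka initialization
	public void contextInitialized(ServletContext context) {
		try {
			// Initialize the environment
			initEurekaEnvironment();
			// Initialize the server context
			initEurekaServerContext();
			// Store the server context as a servlet context attribute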
			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

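While initializing the context, it calls PeerAwareInstanceRegistryImpl.syncUp() to copy the registry from neighboring cluster nodes; each copied instance is registered on the current Eureka node through PeerAwareInstanceRegistryImpl.register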

// Initialize the EurekaServer context
	protected void initEurekaServerContext() throws Exception {
		... omitted ...
		// Store the EurekaServerContext in the EurekaServerContextHolder
		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy registry from neighboring eureka node
		// (uses the PeerAwareInstanceRegistryImpl implementation)
		int registryCount = this.registry.syncUp();
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
	}

Syncing the registry from neighboring nodes: PeerAwareInstanceRegistryImpl.syncUp()

@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
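        // getRegistrySyncRetries: number of retries, 5 by default
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    // The previous attempt copied nothing; wait before trying again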
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // Fetch the registry
            Applications apps = eurekaClient.getApplications();
            // Loop over the applications and register each instance in turn
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // Register the InstanceInfo on the current node, storing it in the ConcurrentHashMap registry
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
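
The loop above retries until at least one instance has been copied or the retry budget is spent. A minimal sketch of that control flow, assuming a hypothetical fetchInstances supplier in place of eurekaClient.getApplications() and a simple counter in place of register(...):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

class SyncUpRetrySketch {
    /** Retries until something was copied or maxRetries attempts were made; returns the copied count. */
    static int syncUp(Supplier<List<String>> fetchInstances, int maxRetries, long retryWaitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(retryWaitMs); // wait only before a retry, never before the first attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            for (String instance : fetchInstances.get()) {
                count++; // stands in for register(instance, ..., true)
            }
        }
        return count;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical peer that returns nothing on the first two attempts
        Supplier<List<String>> peer = () -> ++attempts[0] < 3
                ? Collections.<String>emptyList()
                : Arrays.asList("service-a:8080", "service-b:8081");
        int copied = syncUp(peer, 5, 10);
        System.out.println("copied " + copied + " instances after " + attempts[0] + " attempts");
    }
}
```

Because the loop stops as soon as count is non-zero, a partially populated peer ends the sync after a single successful attempt.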

At this point, EurekaServer initialization is complete

Summary

(Figure: image not preserved in this copy.)
