一.SpringCloud源码剖析-Eureka核心API
二.SpringCloud源码剖析-Eureka Client 初始化过程
三.SpringCloud源码剖析-Eureka服务注册
四.SpringCloud源码剖析-Eureka服务发现
五.SpringCloud源码剖析-Eureka Client服务续约
六.SpringCloud源码剖析-Eureka Client取消注册
七.SpringCloud源码剖析-Eureka Server的自动配置
八.SpringCloud源码剖析-Eureka Server初始化流程
九.SpringCloud源码剖析-Eureka Server服务注册流程
十.SpringCloud源码剖析-Eureka Server服务续约
十一.SpringCloud源码剖析-Eureka Server服务注册表拉取
十二.SpringCloud源码剖析-Eureka Server服务剔除
十三.SpringCloud源码剖析-Eureka Server服务下线
一.Eureka Server 核心组件介绍
1.EurekaServerContext
Eureka服务端上下文对象,包含了初始化、关闭、获取服务配置、获取集群节点、获取服务注册器、获取服务信息管理器等方法,默认实现类是DefaultEurekaServerContext
/**
 * Server-side Eureka context: declares the lifecycle hooks (initialize/shutdown)
 * and accessors for the server's main collaborators.
 */
public interface EurekaServerContext {
// Initializes the context.
void initialize() throws Exception;
// Shuts the context down.
void shutdown() throws Exception;
// Returns the server configuration.
EurekaServerConfig getServerConfig();
// Returns the manager of the cluster's peer nodes.
PeerEurekaNodes getPeerEurekaNodes();
// Returns the server codecs (encoders/decoders).
ServerCodecs getServerCodecs();
// Returns the peer-aware instance registry.
PeerAwareInstanceRegistry getRegistry();
// Returns the manager of the InstanceInfo.
ApplicationInfoManager getApplicationInfoManager();
}
DefaultEurekaServerContext实现类代码
/**
* Represent the local server context and exposes getters to components of the
* local server such as the registry.
*
* @author David Liu
*/
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
private static final Logger logger = LoggerFactory.getLogger(DefaultEurekaServerContext.class);
private final EurekaServerConfig serverConfig;
private final ServerCodecs serverCodecs;
private final PeerAwareInstanceRegistry registry;
private final PeerEurekaNodes peerEurekaNodes;
private final ApplicationInfoManager applicationInfoManager;
// Stores the injected collaborators; no other work happens at construction time.
@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig,
ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes,
ApplicationInfoManager applicationInfoManager) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.registry = registry;
this.peerEurekaNodes = peerEurekaNodes;
this.applicationInfoManager = applicationInfoManager;
}
// Annotated with @PostConstruct: starts the peer-node manager first and then
// initializes the registry with it. A failure of registry.init is rethrown
// wrapped in a RuntimeException.
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
// Start the PeerEurekaNodes manager.
peerEurekaNodes.start();
try {
// Initialize the peer-aware registry with the started peer nodes.
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
// Annotated with @PreDestroy: shuts down the registry first and then the
// peer-node manager.
@PreDestroy
@Override
public void shutdown() {
logger.info("Shutting down ...");
// Shut down the registry.
registry.shutdown();
// Shut down the cluster peer nodes.
peerEurekaNodes.shutdown();
logger.info("Shut down");
}
...省略...
}
DefaultEurekaServerContext 的 initialize 初始化方法做了两件事:调用 peerEurekaNodes.start() 初始化集群节点,调用 PeerAwareInstanceRegistry.init 初始化注册器;在 shutdown 销毁方法中,先调用 PeerAwareInstanceRegistry.shutdown 执行注册器的关闭流程,再调用 peerEurekaNodes.shutdown 执行集群节点的关闭。
PeerEurekaNodes 是用来管理 Eureka 集群节点 PeerEurekaNode 生命周期的工具,它在 DefaultEurekaServerContext 的 initialize 初始化方法中被启动,源码如下
/**
* Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.
*
* @author Tomasz Bak
*/
@Singleton
public class PeerEurekaNodes {
private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);

// Service registry.
protected final PeerAwareInstanceRegistry registry;
// Server-side configuration.
protected final EurekaServerConfig serverConfig;
// Client-side configuration.
protected final EurekaClientConfig clientConfig;
protected final ServerCodecs serverCodecs;
// Manager of the InstanceInfo.
private final ApplicationInfoManager applicationInfoManager;

// Current peer nodes. The element type is restored (it had been lost, leaving a raw List)
// so that the typed for-each loops over this list compile without casts.
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
// URLs of the current peer nodes.
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();

// Executor for the periodic peer-list refresh; assigned in start().
private ScheduledExecutorService taskExecutor;
/**
 * Creates the peer-node helper from its injected collaborators. Only stores
 * the references; nothing is started here.
 */
@Inject
public PeerEurekaNodes(
        PeerAwareInstanceRegistry registry,
        EurekaServerConfig serverConfig,
        EurekaClientConfig clientConfig,
        ServerCodecs serverCodecs,
        ApplicationInfoManager applicationInfoManager) {
    // Configuration objects.
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    // Runtime collaborators.
    this.applicationInfoManager = applicationInfoManager;
    this.serverCodecs = serverCodecs;
    this.registry = registry;
}
/**
 * Returns the current peer nodes wrapped in an unmodifiable list.
 *
 * @return unmodifiable view of the peer nodes
 */
public List<PeerEurekaNode> getPeerNodesView() {
    // The element type is restored; a raw List forced callers to cast each element.
    return Collections.unmodifiableList(peerEurekaNodes);
}
/**
 * Returns the list of peer nodes held by this instance (not wrapped, unlike
 * {@code getPeerNodesView()}).
 *
 * @return the current peer nodes
 */
public List<PeerEurekaNode> getPeerEurekaNodes() {
    // The element type is restored so callers can iterate PeerEurekaNode directly.
    return peerEurekaNodes;
}
/**
 * Returns the configured minimum number of available peers for the health
 * status, read from the server configuration.
 */
public int getMinNumberOfAvailablePeers() {
    final int minHealthyPeers = serverConfig.getHealthStatusMinNumberOfAvailablePeers();
    return minHealthyPeers;
}
/**
 * Starts peer-node management: creates the single-threaded scheduler, resolves
 * and applies the peer URLs once, then schedules the same refresh with a fixed
 * delay taken from the server configuration.
 *
 * @throws IllegalStateException if the initial update or the scheduling fails
 */
public void start() {
    // Threads of the updater are named and marked as daemon threads.
    final ThreadFactory updaterThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable task) {
            Thread updater = new Thread(task, "Eureka-PeerNodesUpdater");
            updater.setDaemon(true);
            return updater;
        }
    };
    taskExecutor = Executors.newSingleThreadScheduledExecutor(updaterThreads);

    // Periodic refresh; any failure is logged and swallowed inside the task.
    final Runnable refreshPeers = new Runnable() {
        @Override
        public void run() {
            try {
                updatePeerEurekaNodes(resolvePeerUrls());
            } catch (Throwable failure) {
                logger.error("Cannot update the replica Nodes", failure);
            }
        }
    };

    try {
        // Initial, synchronous refresh of the peer list.
        updatePeerEurekaNodes(resolvePeerUrls());
        // Initial delay and period use the same configured interval
        // (the original note gives the default as 10 minutes).
        taskExecutor.scheduleWithFixedDelay(
                refreshPeers,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS);
    } catch (Exception cause) {
        throw new IllegalStateException(cause);
    }

    for (PeerEurekaNode peer : peerEurekaNodes) {
        logger.info("Replica node URL: {}", peer.getServiceUrl());
    }
}
/**
 * Stops the periodic refresh, clears the peer list and the peer URL set, and
 * shuts down every peer node that was registered at the time of the call.
 */
public void shutdown() {
    taskExecutor.shutdown();
    // Snapshot before clearing so the nodes can still be shut down afterwards.
    // Typed as List<PeerEurekaNode>: a raw List would make the for-each below
    // iterate Object values.
    List<PeerEurekaNode> toRemove = this.peerEurekaNodes;
    this.peerEurekaNodes = Collections.emptyList();
    this.peerEurekaNodeUrls = Collections.emptySet();
    for (PeerEurekaNode node : toRemove) {
        node.shutDown();
    }
}
/**
基于相同的Zone得到Eureka集群中多个节点的url,过滤掉当前节点
* Resolve peer URLs.
*
* @return peer URLs with node's own URL filtered out
*/
protected List resolvePeerUrls() {
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
//配置的eureka地址url
List replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
int idx = 0;
while (idx 0) {
// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
//客户下线,降低续约阈值
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
return true;
}
return false;
}
/**
 * Registers an instance locally and then replicates the registration to the
 * peer nodes.
 *
 * @param info          instance to register
 * @param isReplication whether this call is itself replication traffic
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    // Use the lease duration carried by the instance when it is positive,
    // otherwise fall back to the default.
    final boolean hasOwnDuration =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseDuration = hasOwnDuration
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    // Local registration through the parent class.
    super.register(info, leaseDuration, isReplication);
    // Propagate the registration to the other nodes of the cluster.
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
/**
 * Renews a lease locally and, when that succeeds, replicates the heartbeat to
 * the peer nodes.
 *
 * @param appName       application name
 * @param id            instance id
 * @param isReplication whether this call is itself replication traffic
 * @return {@code true} if the parent class renewed the lease, {@code false} otherwise
 */
@Override // added for consistency with the sibling overrides; delegates to super.renew with the same arguments
public boolean renew(final String appName, final String id, final boolean isReplication) {
    // Local renewal through the parent class.
    if (super.renew(appName, id, isReplication)) {
        // Propagate the heartbeat to the other nodes of the cluster.
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
/**
 * Updates the status of an instance locally and, when that succeeds,
 * replicates the status change to the peer nodes.
 *
 * @return {@code true} if the parent class applied the update, {@code false} otherwise
 */
@Override
public boolean statusUpdate(final String appName, final String id,
                            final InstanceStatus newStatus, String lastDirtyTimestamp,
                            final boolean isReplication) {
    final boolean updatedLocally =
            super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication);
    if (!updatedLocally) {
        return false;
    }
    // Propagate the new status to the other nodes.
    replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
    return true;
}
/**
 * Deletes a status override locally and, when that succeeds, replicates the
 * deletion to the peer nodes.
 *
 * @return {@code true} if the parent class removed the override, {@code false} otherwise
 */
@Override
public boolean deleteStatusOverride(String appName, String id,
                                    InstanceStatus newStatus,
                                    String lastDirtyTimestamp,
                                    boolean isReplication) {
    final boolean removedLocally =
            super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication);
    if (!removedLocally) {
        return false;
    }
    replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
    return true;
}
/**
 * Tells whether lease expiration is currently enabled.
 *
 * @return {@code true} when self preservation mode is disabled; otherwise
 *         {@code true} only if the renewal threshold is positive and the
 *         renews counted in the last minute exceed it
 */
@Override
public boolean isLeaseExpirationEnabled() {
    if (isSelfPreservationModeEnabled()) {
        if (numberOfRenewsPerMinThreshold <= 0) {
            return false;
        }
        return getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
    // The self preservation mode is disabled, hence allowing the instances to expire.
    return true;
}
/**
 * Recomputes the expected renews per minute and the renewal threshold from
 * the number of registerable instances reported by the Eureka client. Any
 * failure is logged and swallowed.
 */
private void updateRenewalThreshold() {
    try {
        // Count the registerable instances across all applications.
        int registerable = 0;
        Applications registrySnapshot = eurekaClient.getApplications();
        for (Application application : registrySnapshot.getRegisteredApplications()) {
            for (InstanceInfo candidate : application.getInstances()) {
                if (this.isRegisterable(candidate)) {
                    registerable++;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if self preservation is disabled.
            final boolean aboveCurrent = (registerable * 2)
                    > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin);
            if (aboveCurrent || !this.isSelfPreservationModeEnabled()) {
                this.expectedNumberOfRenewsPerMin = registerable * 2;
                this.numberOfRenewsPerMinThreshold =
                        (int) ((registerable * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable failure) {
        logger.error("Cannot update renewal threshold", failure);
    }
}
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 *
 * @param action        the action to replicate
 * @param appName       application name
 * @param id            instance id
 * @param info          instance info, optional
 * @param newStatus     new status, optional
 * @param isReplication whether the triggering call was itself replication traffic
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication.
        // The previous guard compared the PeerEurekaNodes field with Collections.EMPTY_LIST,
        // which can never be identical to it; a null check covers the case where
        // no peer manager has been assigned yet.
        if (peerEurekaNodes == null || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node. Dispatches on the action and calls the
 * matching operation of the given peer node; any failure is logged and swallowed.
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel: {
                node.cancel(appName, id);
                break;
            }
            case Heartbeat: {
                InstanceStatus statusOverride = overriddenInstanceStatusMap.get(id);
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, registered, statusOverride, false);
                break;
            }
            case Register: {
                node.register(info);
                break;
            }
            case StatusUpdate: {
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, registered);
                break;
            }
            case DeleteStatusOverride: {
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, registered);
                break;
            }
        }
    } catch (Throwable failure) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), failure);
    }
}
....省略一些代码....
}
这个服务注册器实现类看起来很复杂,它做了哪些事情呢?
- init:初始化,包括注册表缓存ResponseCache初始化、续约阈值定时更新任务初始化、初始化远程注册表
- shutdown:执行所有清理和关闭操作
- syncUp:集群之间的数据同步(节点复制)
- cancel:服务下线,并同步到其他节点
- register:服务注册,并同步到其他节点
- renew:续约,并同步到其他节点
EurekaServerAutoConfiguration 通过 @Import(EurekaServerInitializerConfiguration.class) 进行初始化。EurekaServerInitializerConfiguration 实现了 SmartLifecycle,其中的 start 方法会在 Spring 启动过程中、执行 LifecycleProcessor().onRefresh() 刷新生命周期处理器的时候被调用,然后再调用 EurekaServerBootstrap.contextInitialized 来初始化并启动 Eureka
/**
 * Bootstraps the Eureka server from the Spring lifecycle: {@link #start()}
 * spawns a thread that calls {@code EurekaServerBootstrap.contextInitialized}
 * and publishes the registry-available and server-started events.
 *
 * @author Dave Syer
 */
@Configuration
public class EurekaServerInitializerConfiguration
        implements ServletContextAware, SmartLifecycle, Ordered {

    private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);

    // Eureka server configuration.
    @Autowired
    private EurekaServerConfig eurekaServerConfig;

    // Servlet context, supplied through setServletContext.
    private ServletContext servletContext;

    // Application context used to publish events.
    @Autowired
    private ApplicationContext applicationContext;

    // Bootstrap helper that performs the actual initialization.
    @Autowired
    private EurekaServerBootstrap eurekaServerBootstrap;

    // Written by the thread spawned in start() and read through isRunning()
    // from other threads; volatile makes the write visible to those readers.
    private volatile boolean running;

    private int order = 1;

    /** Stores the servlet context later handed to the bootstrap. */
    @Override
    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    /**
     * Lifecycle start. Runs the bootstrap on a new thread; failures are logged
     * on that thread and are not propagated to the caller.
     */
    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //TODO: is this class even needed now?
                    // Initialize the Eureka server context and start the server.
                    eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                    log.info("Started Eureka Server");

                    // Publish EurekaRegistryAvailableEvent.
                    publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                    // Mark the component as running.
                    EurekaServerInitializerConfiguration.this.running = true;
                    // Publish EurekaServerStartedEvent.
                    publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
                }
                catch (Exception ex) {
                    // Help!
                    log.error("Could not initialize Eureka servlet context", ex);
                }
            }
        }).start();
    }

    private EurekaServerConfig getEurekaServerConfig() {
        return this.eurekaServerConfig;
    }

    private void publish(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }

    /** Lifecycle stop: clears the running flag and destroys the Eureka context. */
    @Override
    public void stop() {
        this.running = false;
        eurekaServerBootstrap.contextDestroyed(this.servletContext);
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    @Override
    public int getPhase() {
        return 0;
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    // NOTE(review): this only runs the callback and does not delegate to stop();
    // confirm that skipping contextDestroyed on this path is intended.
    @Override
    public void stop(Runnable callback) {
        callback.run();
    }

    @Override
    public int getOrder() {
        return this.order;
    }
}
EurekaServerInitializerConfiguration 通过 start 方法初始化和启动eureka,并发布两个事件:EurekaRegistryAvailableEvent 注册表可用事件,EurekaServerStartedEvent 服务启动事件。EurekaServer初始化核心的代码在 eurekaServerBootstrap.contextInitialized 中
/**
 * Initializes and destroys the Eureka server for a servlet context: sets the
 * Archaius data center/environment properties, initializes the server context,
 * syncs the registry from peers and opens it for traffic.
 *
 * @author Spencer Gibb
 */
public class EurekaServerBootstrap {

    private static final Log log = LogFactory.getLog(EurekaServerBootstrap.class);

    private static final String TEST = "test";

    private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment";

    private static final String EUREKA_ENVIRONMENT = "eureka.environment";

    private static final String DEFAULT = "default";

    private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter";

    private static final String EUREKA_DATACENTER = "eureka.datacenter";

    protected EurekaServerConfig eurekaServerConfig;

    protected ApplicationInfoManager applicationInfoManager;

    protected EurekaClientConfig eurekaClientConfig;

    protected PeerAwareInstanceRegistry registry;

    protected volatile EurekaServerContext serverContext;

    protected volatile AwsBinder awsBinder;

    /** Stores the collaborators; no initialization work is done here. */
    public EurekaServerBootstrap(ApplicationInfoManager applicationInfoManager,
            EurekaClientConfig eurekaClientConfig, EurekaServerConfig eurekaServerConfig,
            PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        this.eurekaServerConfig = eurekaServerConfig;
        this.eurekaClientConfig = eurekaClientConfig;
        this.applicationInfoManager = applicationInfoManager;
        this.registry = registry;
        this.serverContext = serverContext;
    }

    /**
     * Initializes the environment and the server context, then exposes the
     * server context as a servlet-context attribute keyed by its class name.
     *
     * @throws RuntimeException wrapping any failure, after logging it
     */
    public void contextInitialized(ServletContext context) {
        try {
            initEurekaEnvironment();
            initEurekaServerContext();

            final String contextKey = EurekaServerContext.class.getName();
            context.setAttribute(contextKey, this.serverContext);
        }
        catch (Throwable failure) {
            log.error("Cannot bootstrap eureka server :", failure);
            throw new RuntimeException("Cannot bootstrap eureka server :", failure);
        }
    }

    /**
     * Removes the servlet-context attribute and destroys the server context
     * and the environment. Failures are logged and not rethrown.
     */
    public void contextDestroyed(ServletContext context) {
        try {
            log.info("Shutting down Eureka Server..");
            final String contextKey = EurekaServerContext.class.getName();
            context.removeAttribute(contextKey);

            destroyEurekaServerContext();
            destroyEurekaEnvironment();
        }
        catch (Throwable failure) {
            log.error("Error shutting down eureka", failure);
        }
        log.info("Eureka Service is now shutdown...");
    }

    /**
     * Copies the eureka.datacenter and eureka.environment settings into the
     * matching Archaius deployment properties, falling back to "default" and
     * "test" when they are not set.
     */
    protected void initEurekaEnvironment() throws Exception {
        log.info("Setting the eureka configuration..");

        // Data center.
        final String configuredDataCenter = ConfigurationManager.getConfigInstance()
                .getString(EUREKA_DATACENTER);
        if (configuredDataCenter != null) {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, configuredDataCenter);
        }
        else {
            log.info(
                    "Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
        }

        // Environment.
        final String configuredEnvironment = ConfigurationManager.getConfigInstance()
                .getString(EUREKA_ENVIRONMENT);
        if (configuredEnvironment != null) {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, configuredEnvironment);
        }
        else {
            ConfigurationManager.getConfigInstance()
                    .setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
            log.info(
                    "Eureka environment value eureka.environment is not set, defaulting to test");
        }
    }

    /**
     * Registers the V1-aware converters, starts the AWS binder when running on
     * AWS, publishes the server context to the holder, syncs the registry from
     * peers, opens it for traffic and registers the monitoring statistics.
     */
    protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);

        final boolean onAws = isAws(this.applicationInfoManager.getInfo());
        if (onAws) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }

        // Make the server context available through EurekaServerContextHolder.
        EurekaServerContextHolder.initialize(this.serverContext);
        log.info("Initialized server context");

        // Copy registry from neighboring eureka node
        final int syncedInstances = this.registry.syncUp();
        this.registry.openForTraffic(this.applicationInfoManager, syncedInstances);

        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }

    /**
     * Server context shutdown hook. Override for custom logic
     */
    protected void destroyEurekaServerContext() throws Exception {
        EurekaMonitors.shutdown();
        if (this.awsBinder != null) {
            this.awsBinder.shutdown();
        }
        if (this.serverContext != null) {
            this.serverContext.shutdown();
        }
    }

    /**
     * Users can override to clean up the environment themselves.
     */
    protected void destroyEurekaEnvironment() throws Exception {
    }

    /** Returns whether the instance's data center name is Amazon; logs the result. */
    protected boolean isAws(InstanceInfo selfInstanceInfo) {
        final DataCenterInfo.Name dataCenterName = selfInstanceInfo
                .getDataCenterInfo().getName();
        final boolean result = DataCenterInfo.Name.Amazon == dataCenterName;
        log.info("isAws returned " + result);
        return result;
    }
}
EurekaServerBootstrap 的 contextInitialized 方法中做了两件事情:
- 通过 initEurekaEnvironment() 方法初始化环境,通过 ConfigurationManager 设置环境相关的参数
- 通过 initEurekaServerContext() 初始化上下文,使用 PeerAwareInstanceRegistryImpl.syncUp 从相邻的eureka节点复制注册表
在EurekaServerAutoConfiguration中注册了JerseyFilter用来处理所有的/eureka开头的请求
/**
 * Register the Jersey filter. The filter wraps the given Jersey application
 * in a ServletContainer, is ordered with lowest precedence and is mapped to
 * every path under the default Eureka prefix.
 */
@Bean
public FilterRegistrationBean jerseyFilterRegistration(
        javax.ws.rs.core.Application eurekaJerseyApp) {
    final FilterRegistrationBean registration = new FilterRegistrationBean();
    registration.setFilter(new ServletContainer(eurekaJerseyApp));
    registration.setOrder(Ordered.LOWEST_PRECEDENCE);

    final String eurekaPaths = EurekaConstants.DEFAULT_PREFIX + "/*";
    registration.setUrlPatterns(Collections.singletonList(eurekaPaths));
    return registration;
}
通过FilterRegistrationBean来注册filter,其核心逻辑是交给 ServletContainer 来完成的
// ServletContainer is declared as both an HttpServlet and a servlet Filter;
// its body is elided in this excerpt.
public class ServletContainer extends HttpServlet implements Filter {
...省略...
}
二.EurekaServer初始化流程
这里我们整理一下EurekaServer启动时是以什么样的流程进行初始化的,下面是根据Eureka Server启动断点跟踪出来的流程
1.ServletContainer 初始化
首先 ServletContainer 会被创建并进行初始化,调用 configure 方法进行配置;至于 doFilter 方法,则会在接收到请求时被执行
public class ServletContainer extends HttpServlet implements Filter {
...省略...
/**
 * Filter initialization: remembers the filter configuration and continues
 * with the WebConfig-based init using a WebFilterConfig wrapper.
 */
public void init(FilterConfig filterConfig) throws ServletException {
    this.filterConfig = filterConfig;
    // Declared as WebConfig so the WebConfig overload of init is selected.
    WebConfig webConfig = new WebFilterConfig(filterConfig);
    this.init(webConfig);
}
/**
 * Configures the container from the servlet config if present, otherwise from
 * the filter config. When the resource config is a ReloadListener, collects
 * the ContainerNotifier instances found in its properties and through
 * ServiceFinder and stores them back as a list.
 */
protected void configure(WebConfig wc, ResourceConfig rc, WebApplication wa) {
    if (this.getServletConfig() != null) {
        this.configure(this.getServletConfig(), rc, wa);
    } else if (this.filterConfig != null) {
        this.configure(this.filterConfig, rc, wa);
    }

    if (!(rc instanceof ReloadListener)) {
        return;
    }

    List notifiers = new ArrayList();
    Object configured = rc.getProperties().get("com.sun.jersey.spi.container.ContainerNotifier");
    if (configured instanceof ContainerNotifier) {
        notifiers.add((ContainerNotifier) configured);
    } else if (configured instanceof List) {
        // Keep only the list elements that are notifiers.
        for (Iterator it = ((List) configured).iterator(); it.hasNext();) {
            Object candidate = it.next();
            if (candidate instanceof ContainerNotifier) {
                notifiers.add((ContainerNotifier) candidate);
            }
        }
    }

    // Append the notifiers found through ServiceFinder.
    for (Iterator it = ServiceFinder.find(ContainerNotifier.class).iterator(); it.hasNext();) {
        ContainerNotifier discovered = (ContainerNotifier) it.next();
        notifiers.add(discovered);
    }

    rc.getProperties().put("com.sun.jersey.spi.container.ContainerNotifier", notifiers);
}
/**
 * Filter-specific configuration: registers the FilterConfig as an injectable,
 * compiles the optional static-content regex, reads the forward-on-404
 * feature and normalizes the filter context path (leading '/' added, one
 * trailing '/' removed, empty treated as absent).
 *
 * @throws ContainerException if the configured regular expression is invalid
 */
protected void configure(FilterConfig fc, ResourceConfig rc, WebApplication wa) {
    rc.getSingletons().add(new ServletContainer.ContextInjectableProvider(FilterConfig.class, fc));

    String contentRegex = (String) rc.getProperty("com.sun.jersey.config.property.WebPageContentRegex");
    if (contentRegex != null && !contentRegex.isEmpty()) {
        try {
            this.staticContentPattern = Pattern.compile(contentRegex);
        } catch (PatternSyntaxException badSyntax) {
            throw new ContainerException("The syntax is invalid for the regular expression, " + contentRegex + ", associated with the initialization parameter " + "com.sun.jersey.config.property.WebPageContentRegex", badSyntax);
        }
    }

    this.forwardOn404 = rc.getFeature("com.sun.jersey.config.feature.FilterForwardOn404");
    // Read from the filter config stored on this instance, not from the fc parameter.
    this.filterContextPath = this.filterConfig.getInitParameter("com.sun.jersey.config.feature.FilterContextPath");

    if (this.filterContextPath == null) {
        return;
    }
    if (this.filterContextPath.isEmpty()) {
        this.filterContextPath = null;
        return;
    }
    if (!this.filterContextPath.startsWith("/")) {
        this.filterContextPath = '/' + this.filterContextPath;
    }
    if (this.filterContextPath.endsWith("/")) {
        int lastIndex = this.filterContextPath.length() - 1;
        this.filterContextPath = this.filterContextPath.substring(0, lastIndex);
    }
}
...省略...
}
2.Eureka上下文初始化
紧接着 EurekaServerContext 的 initialize 方法被调用,该方法上的 @PostConstruct 注解决定了它是初始化方法
@Singleton
public class DefaultEurekaServerContext implements EurekaServerContext {
/**
 * Post-construct initialization: starts the peer-node manager, then
 * initializes the registry with it.
 *
 * @throws RuntimeException wrapping any exception thrown by the registry init
 */
@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");

    // Start peer-node management.
    peerEurekaNodes.start();

    // Initialize the registry with the started peer nodes.
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception initFailure) {
        throw new RuntimeException(initFailure);
    }

    logger.info("Initialized");
}
这里做了两个事情
- 调用 peerEurekaNodes.start(); 定时更新Eureka集群中的节点
- 调用服务注册器PeerAwareInstanceRegistryImpl的初始化init
PeerEurekaNodes.start 被调用,这里通过定时器定时更新Eureka集群节点,默认每10分钟一次
@Singleton
public class PeerEurekaNodes {
//开始
public void start() {
//创建 一个名字为Eureka-PeerNodesUpdater"单线程的定时执行器
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
//更新集群中的节点中的注册信息
updatePeerEurekaNodes(resolvePeerUrls());
//创建runnable线程,业务逻辑为:updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
//定时任务
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
//定时器时间间隔默认:10分钟peerEurekaNodesUpdateIntervalMs=10 * MINUTES
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
}
定时调用updatePeerEurekaNodes
方法更新集群,默认10分钟更新一次,更新逻辑是删除旧的节点,添加新的节点,旧的节点调用shutdown做关闭操作,新的节点调用createPeerEurekaNode进行创建,集群节点最终存储在List结构中
在DefaultEurekaServerContext
中调用完peerEurekaNodes.start();
方法后调用PeerAwareInstanceRegistryImpl
.init方法进行注册器的初始化
// Initializes the registry with the given peer-node manager: starts the
// replication counter, stores the peer nodes, then sets up the response cache,
// the renewal-threshold update task and the remote region registry, and
// finally registers this object with the monitors.
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// Start the counter of replications in the last minute.
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// Initialize the response cache (the original note names ResponseCacheImpl,
// which caches the registry data queried by clients).
initializedResponseCache();
// Schedule the periodic renewal-threshold update, which calls updateRenewalThreshold()
// (the original note gives the period as once every 15 minutes).
scheduleRenewalThresholdUpdateTask();
// Initialize the remote region registry (the original note says there is no remote region by default).
initRemoteRegionRegistry();
try {
// Register this object with the monitors; failure only produces a warning.
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
这里我们主要分析两个东西
- initializedResponseCache 初始化注册表响应缓存
- scheduleRenewalThresholdUpdateTask 定时更新续约阈值
initializedResponseCache初始化响应缓存
注意:这里有这么一句代码 initializedResponseCache,它初始化了一个 ResponseCache 响应缓存,ResponseCacheImpl 是具体实现。该类中构造了一个读写缓存 readWriteCacheMap 和一个只读缓存 readOnlyCacheMap。为什么叫响应缓存?因为客户端在获取服务注册表的时候,就会从 readOnlyCacheMap 缓存中去获取
public class ResponseCacheImpl implements ResponseCache {
...省略...
// Read-only cache. Type parameters are restored (they had been lost, leaving raw
// types) so that keySet() yields Key and get() yields Value without casts.
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// Read-write cache, built in the constructor.
private final LoadingCache<Key, Value> readWriteCacheMap;
/**
 * Builds the response cache: stores the collaborators, creates the read-write
 * loading cache and, when the read-only cache is enabled, schedules the task
 * that refreshes it from the read-write cache.
 */
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    // Whether the read-only cache is used (the original note says the default is true).
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    // Refresh interval of the read-only cache (the original note gives 30s).
    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();

    // Type parameters on the listener, notification and loader are restored:
    // with raw types, notification.getKey() returns Object and load(Key) does
    // not override CacheLoader.load.
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(1000)
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            // Drop the region-specific mapping of a removed key.
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            // Track region-specific keys under their region-less clone.
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            Value value = generatePayload(key);
                            return value;
                        }
                    });

    if (shouldUseReadOnlyResponseCache) {
        // First run is aligned to the next multiple of the interval, then
        // repeats every interval.
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
    }
}
/**
 * Creates the task that refreshes the read-only cache: for every key already
 * in the read-only cache it loads the value from the read-write cache and
 * replaces the read-only entry when the two are different objects. Errors
 * are logged per key and do not stop the loop.
 */
private TimerTask getCacheUpdateTask() {
    final TimerTask refreshTask = new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key cacheKey : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            cacheKey.getEntityType(), cacheKey.getName(), cacheKey.getVersion(), cacheKey.getType());
                }
                try {
                    CurrentRequestVersion.set(cacheKey.getVersion());
                    Value fresh = readWriteCacheMap.get(cacheKey);
                    Value stale = readOnlyCacheMap.get(cacheKey);
                    // Reference comparison, as in the original.
                    final boolean sameObject = (fresh == stale);
                    if (!sameObject) {
                        readOnlyCacheMap.put(cacheKey, fresh);
                    }
                } catch (Throwable problem) {
                    logger.error("Error while updating the client cache from response cache for key {}", cacheKey.toStringCompact(), problem);
                }
            }
        }
    };
    return refreshTask;
}
}
scheduleRenewalThresholdUpdateTask 定时更新续约阈值
定时任务每renewalThresholdUpdateIntervalMs=900秒 更新一次续约阀值
/**
 * Schedule the task that updates renewal threshold periodically.
 * The renewal threshold would be used to determine if the renewals drop
 * dramatically because of network partition and to protect expiring too
 * many instances at a time.
 *
 * The configured update interval is used both as the initial delay and as the
 * period (the original note gives it as 900 seconds).
 */
private void scheduleRenewalThresholdUpdateTask() {
    final TimerTask thresholdRefresh = new TimerTask() {
        @Override
        public void run() {
            updateRenewalThreshold();
        }
    };
    timer.schedule(
            thresholdRefresh,
            serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}
updateRenewalThreshold是具体的更新逻辑
// PeerAwareInstanceRegistryImpl#updateRenewalThreshold()
/**
 * Updates the renewal threshold based on the current number of
 * renewals. The threshold is a percentage as specified in
 * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals
 * received per minute {@link #getNumOfRenewsInLastMin()}.
 *
 * Any failure is logged and swallowed.
 */
private void updateRenewalThreshold() {
    try {
        // Fetch the registry through the Eureka client.
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        // Count the registerable instances.
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if the self preservation is disabled.
            // The comparison uses expectedNumberOfRenewsPerMin: numberOfRenewsPerMinThreshold
            // is already assigned as (count * 2) * percent below, so multiplying
            // it by the percent again would apply the percentage twice.
            if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
                    || (!this.isSelfPreservationModeEnabled())) {
                // Expected renews per minute: instance count * 2 (the original
                // note assumes one renew every 30s per client).
                this.expectedNumberOfRenewsPerMin = count * 2;
                // Threshold: expected renews scaled by the configured percent
                // (0.85 according to the original note).
                this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}
当关闭自我保护,或者当前阈值大于预期阈值,就会更新续约的阈值,那么这是怎么样的一个更新算法呢?
- 每分钟的预期续订次数 = 服务数 * 2 ,因为: 一个服务30s/一次续约
- 每分钟阈值 = 服务数 * 2 * 0.85
EurekaServerInitializerConfiguration 的start方法会在Spring容器刷新的时候调用,因为它实现了SmartLifecycle接口 , start方法中新开线程调用eurekaServerBootstrap.contextInitialized进行初始化
/**
 * Runs the Eureka bootstrap on a new thread: initializes the server context,
 * publishes EurekaRegistryAvailableEvent, sets the running flag and publishes
 * EurekaServerStartedEvent. Exceptions are logged on that thread only.
 */
public void start() {
    final Runnable bootstrapJob = new Runnable() {
        @Override
        public void run() {
            try {
                //TODO: is this class even needed now?
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");

                // Announce the registry, flip the running flag, then announce the server start.
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                EurekaServerInitializerConfiguration.this.running = true;
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception failure) {
                // Help!
                log.error("Could not initialize Eureka servlet context", failure);
            }
        }
    };
    final Thread bootstrapThread = new Thread(bootstrapJob);
    bootstrapThread.start();
}
6.Eureka启动引导
EurekaServerBootstrap.contextInitialized 负责初始化Eureka环境和初始化上下文
/**
 * Initializes the Eureka environment and server context, then stores the
 * server context as a servlet-context attribute keyed by its class name.
 *
 * @throws RuntimeException wrapping any failure, after logging it
 */
public void contextInitialized(ServletContext context) {
    try {
        // Environment first, then the server context.
        initEurekaEnvironment();
        initEurekaServerContext();

        final String attributeName = EurekaServerContext.class.getName();
        context.setAttribute(attributeName, this.serverContext);
    }
    catch (Throwable failure) {
        log.error("Cannot bootstrap eureka server :", failure);
        throw new RuntimeException("Cannot bootstrap eureka server :", failure);
    }
}
在初始化上下文的时候会调用 PeerAwareInstanceRegistryImpl.syncUp(); 从相邻的集群节点同步注册表,通过PeerAwareInstanceRegistryImpl.register注册到当前Eureka节点
// Initializes the Eureka server context (the beginning of the method is elided
// in this excerpt): publishes the context to the holder, syncs the registry
// from peers, opens it for traffic and registers the monitoring statistics.
protected void initEurekaServerContext() throws Exception {
...省略...
// Make the server context available through EurekaServerContextHolder.
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// (the original note says the PeerAwareInstanceRegistryImpl implementation is used).
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
同步相邻节点的注册表PeerAwareInstanceRegistryImpl.syncUp()
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
int count = 0;
//getRegistrySyncRetries重试次数默认5次
for (int i = 0; ((i 0) {
try {
//通信中断,等待下一次切换实例
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
//获取注册表
Applications apps = eurekaClient.getApplications();
//循环服务列表,依次注册
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
//获取InstanceInfo之后注册到当前节点,保存到 ConcurrentHashMap registry 中缓存起来
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
到这里EurekaServer就算初始化完成了
总结