答上期问题:
接着上篇文章 Eureka源码深度解析(上) 来看,上篇文章,我们留下了一个问题,就是这些服务注册、续约的方法是什么时候被调用的呢?
我们还是来看下com.netflix.discovery.DiscoveryClient这个类,这个类在应用启动的时候被加载到容器中,肯定会调用其构造方法,构造方法如下:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider backupRegistryProvider) {
// 一系列的参数配置
...
try {
// 下面是几个线程池的创建
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 心跳线程池(维持当前应用续约的线程池,用来持续发送心跳请求)
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
...
// 重要方法在这里
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
...
}
//initScheduledTasks()
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 1.执行刷新任务的定时器
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
//线程任务在这里,主要就是为了执行这个Thread的run()
new CacheRefreshThread()//在1)中继续详细分析
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 2.客户端默认注册到eureka,shouldRegisterWithEureka()默认为true
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// 3.执行心跳任务的定时器
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// 主要就是执行该Thread的run()
new HeartbeatThread()// 在2)中详细分析
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// 获取当前注册服务的基本信息
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
...
// 4.将当前服务注册到注册中心
// 在3)中详细分析
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
1)CacheRefreshThread执行刷新任务的线程
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();//主要定时执行该方法
}
}
@VisibleForTesting
void refreshRegistry() {
try {
...
// 获取注册信息,客户端定时去刷新服务信息
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
...
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
//fetchRegistry()
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
...
// 主要执行该方法(发送HTTP请求到注册中心来获取注册信息,并缓存到本地)
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
...
}
2)HeartbeatThread执行心跳续约任务的线程
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
//续约任务
boolean renew() {
EurekaHttpResponse httpResponse;
try {
// 1.发送HTTP请求到注册中心
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
// 2.如果请求响应失败,则重新调用注册方法
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
3)InstanceInfoReplicator.instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())发送注册请求到注册中心
public void start(int initialDelayMs) {
if (started.compareAndSet(false, true)) {
instanceInfo.setIsDirty(); // for initial register
// 启动一个定时任务,任务指向this,
// class InstanceInfoReplicator implements Runnable ,由该类可知,真正执行的是其run()方法
Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
// run()
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
// 真正的注册实现在这里,参考上篇问章可知,这里也是发送一个HTTP请求到注册中心
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
总结:由以上代码分析可知,在DiscoveryClient被创建的时候,在其构造方法中,启动了三个线程池,
然后分别启动了三个定时器任务:注册当前服务到注册中心;持续发送心跳进行续约任务;定时刷新注册中心注册细信息到本地
所以可以说,在项目启动的时候这些任务就被执行了。
前言:
下面我们接着来看本期的内容。上期看了作为Eureka客户端的一系列行为,那么做为服务端的注册中心,又做了哪些事情呢?是如何接收客户端的一系列请求呢,注册信息又是如何存储的呢?
想了解Eureka的具体使用的可参考笔者的另一篇文章:https://blog.csdn.net/qq_26323323/article/details/78652849
可知,Eureka服务端,maven引入了spring-cloud-starter-eureka-server,在Application类上引入了@EnableEurekaServer注解,那么一切都是从注解开始的,我们就先从该注解开始分析
1.@EnableEurekaServer
@EnableDiscoveryClient
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)// 主要就是注册该类到Spring中
public @interface EnableEurekaServer {
}
// EurekaServerMarkerConfiguration
@Configuration
public class EurekaServerMarkerConfiguration {
@Bean
public Marker eurekaServerMarkerBean() {
return new Marker();
}
class Marker {
}
}
看下该类还是蛮失望的,基本就是个空架子,那么@EnableEurekaServer究竟是如何提供服务的呢?
通过之前的文章,我们实际可以发现SpringBoot相关项目的一些套路了,很多的类并不是被显示的加载到容器中,而是通过配置的方式,最经典的方式就是放到META-INF/spring.factories文件中去加载,那么我们也来看下spring-cloud-netflix-eureka-server-1.3.1.RELEASE-sources.jar这个包下的这个文件,具体内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
通过笔者的另一篇文章https://blog.csdn.net/qq_26323323/article/details/81204284 可知, EnableAutoConfiguration对应的value值列表中的类会在SpringBoot项目启动的时候注册到Spring容器中,那么EurekaServerAutoConfiguration会被默认加载到Spring中,真正的动作应该都在这个类里。
2.EurekaServerAutoConfiguration
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
...
@Configuration
protected static class EurekaServerConfigBeanConfiguration {
@Bean
@ConditionalOnMissingBean
// 1.创建并加载EurekaServerConfig的实现类,主要是Eureka-server的配置信息
public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
EurekaServerConfigBean server = new EurekaServerConfigBean();
if (clientConfig.shouldRegisterWithEureka()) {
// Set a sensible default if we are supposed to replicate
server.setRegistrySyncRetries(5);
}
return server;
}
}
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
// 2.Eureka-server的可视化界面就是通过EurekaController提供的
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
...
@Bean
// 3.接收客户端的注册等请求就是通过InstanceRegistry来处理的
// 是真正处理业务的类,接下来会详细分析
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfRenewsPerMin(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs) {
return new PeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);
}
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
@Bean
// 4.初始化Eureka-server,会同步其他注册中心的数据到当前注册中心
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
...
}
总结:通过以上分析可知,EurekaServer在启动的时候,会加载很多bean到Spring容器中,每个bean都实现了各自的功能,鉴于篇幅,笔者只分析最重要的一个bean,也就是真正处理客户端请求的类InstanceRegistry.java,其他类请读者自行分析
3.org.springframework.cloud.netflix.eureka.server.InstanceRegistry
// 1.接收客户端注册请求
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);// 在1)中详细分析
}
// 2.接收客户端下线请求
@Override
public boolean cancel(String appName, String serverId, boolean isReplication) {
handleCancelation(appName, serverId, isReplication);
return super.cancel(appName, serverId, isReplication);// 在2)中详细分析
}
// 3.接收客户端续约请求
@Override
public boolean renew(final String appName, final String serverId,
boolean isReplication) {
log("renew " + appName + " serverId " + serverId + ", isReplication {}"
+ isReplication);
List applications = getSortedApplications();
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId,
instance, isReplication));
break;
}
}
return super.renew(appName, serverId, isReplication);// 在3)中详细分析
}
疑问:上面的方式是在处理客户端的不同请求,但是,客户端发送的是HTTP请求,这只是一个接口,服务端应该也有一个接收HTTP请求的类,然后将接收到的请求封装后委托给InstanceRegistry来处理具体业务。
我们通过查询这些方法被调用的情况可以看到,确实有一个类接收客户端请求,并将具体业务处理委托给InstanceRegistry,这个类就是com.netflix.eureka.resources包下的ApplicationResource、InstanceResource类
注意:这种接收请求的方式是采用jax-rs的方式,有关于jax-rs的技术细节笔者不再赘述,读者可自行查看相关技术实现。
注意:以上方法中均出现了handleRegistration()方法,实际其主要操作就是publishEvent(),发送不同的事件,SpringCloud中没有实现相应的监听器,应该是设置给用户自定义实现的
1)AbstractInstanceRegistry.register(InstanceInfo registrant, int leaseDuration, boolean isReplication)注册
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// 1.所有的服务信息都添加到registry这个map中,
// registry 格式为:ConcurrentHashMap()
Map gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// 1.如果没有该服务的信息,则新建,并添加到registry中
if (gMap == null) {
final ConcurrentHashMap gNewMap = new ConcurrentHashMap();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 2.existingLease信息即服务的一些注册时间等信息,主要是为了校验该服务是否过期,如果已过期,则剔除
Lease existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
// Since the client wants to cancel it, reduce the threshold
// (1
// for 30 seconds, 2 for a minute)
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease lease = new Lease(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
...
} finally {
read.unlock();
}
}
总结1):通过上述分析可知,服务注册信息最终存放到
// 外层map的key即为应用的服务名,内层map的key为我们设置的eureka.instance.instance-id
// 设置成这种格式,当多个应用提供相同服务时,那么外层map的key都相同,内层map的key不同
private final ConcurrentHashMap registry
= new ConcurrentHashMap();
所有的操作都是针对这个map进行操作
2)AbstractInstanceRegistry.renew(String appName, String id, boolean isReplication)续约
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// 1.获取对应map
Map gMap = registry.get(appName);
Lease leaseToRenew = null;
if (gMap != null) {
// 2.主要是为了获取当前服务的一些过期信息
leaseToRenew = gMap.get(id);
}
...
renewsLastMin.increment();
// 主要操作在这里,将最新更新时间重置,剔除任务检查的也就是这个最新更新时间
// lastUpdateTimestamp = System.currentTimeMillis() + duration;
leaseToRenew.renew();
return true;
}
}
3)AbstractInstanceRegistry.cancel(String appName, String id, boolean isReplication)下线
public boolean cancel(String appName, String id, boolean isReplication) {
return internalCancel(appName, id, isReplication);
}
// internalCancel()
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
// 1.获取gmap
Map gMap = registry.get(appName);
Lease leaseToCancel = null;
if (gMap != null) {
// 2.删除gmap中该服务id
leaseToCancel = gMap.remove(id);
}
...
} else {
// 3.将当前服务的剔除时间置为当前时间
//evictionTimestamp = System.currentTimeMillis();
leaseToCancel.cancel();
// 4.获取服务信息
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
// 5.将服务信息置为已删除
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
总结:
* 服务的注册实际上是将服务信息添加到一个map中,map的key是服务名称,value也是一个map,是提供该服务的所有客户端信息;
* 服务的续约实际上是获取map中该服务的客户端信息,然后修改其最新更新时间
* 服务的下线实际上是删除该map中该服务信息,然后修改服务状态
以上就是Eureka的client和server行为的分析。笔者只分析了最重要的部分,实际Eureka还做了很多事情。
更多行为分析用户可参考:
http://nobodyiam.com/2016/06/25/dive-into-eureka/
https://github.com/xusuy/SpringCloudTutorial/blob/9d6807e5653328b72bf7a44cb50453cb836aa94d/doc/flow-analysis/Eureka_01.md
参考:SpringCloud微服务实战