您当前的位置: 首页 >  spring

墨家巨子@俏如来

暂无认证

  • 2浏览

    0关注

    188博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

十五.SpringCloud源码剖析-Ribbon工作流程

墨家巨子@俏如来 发布时间:2020-10-08 11:33:03 ,浏览量:2

前言

Ribbon是由Netflix公司开源的一个客户端负载均衡器,主要功能是实现服务之间的负载均衡调用,内置丰富的负载均衡算法,本章意在探讨Ribbon的核心工作流程,Ribbon基本使用请看《SpringCloud极简入门-客户端负载均衡Ribbon》

Ribbon的工作流程

我们知道,微服务在启动成功之后,默认30s/次会从注册中心拉取服务注册表到本地缓存起来,而我们使用Ribbon时是通过RestTemplate发起请求,URL以:http://user-server/user/... 服务名方式去调用服务,其实Ribbon干的事情就是根据URL中的服务名去本地的服务注册表中查找服务名对应的服务实例(一个或多个),然后通过负载均衡算法选择其中一个服务后,发起Http请求,那接下来我们就来看一下Ribbon的底层到底是怎么工作的

Ribbon配合RestTemplate使用

在《SpringCloud极简入门-客户端负载均衡Ribbon》中我们有讲解到Ribbon的基本使用,首选需要定义RestTemplate


@SpringBootApplication
@EnableEurekaClient
public class OrderServerApplication1030
{

    //配置一个RestTemplate ,http客户端,支持Rest风格
    //@LoadBalanced :负载均衡注册,让RestTmplate可以实现负载均衡请求
    //这个标签标记RestTemplate可以使用LoadBalancerClient进行负载均衡
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate(){
        return new RestTemplate();
    }
    //省略...

调用服务的是否使用服务名调用

@RestController
public class OrderController {

    //需要配置成Bean
    @Autowired
    private RestTemplate  restTemplate ;

    //浏览器调用该方法
    @RequestMapping(value = "/order/{id}",method = RequestMethod.GET)
    public User getById(@PathVariable("id")Long id){
        //发送http请求调用 user的服务,获取user对象 : RestTemplate
        //user的ip,user的端口,user的Controller路径
        //String url = "http://localhost:1020/user/"+id;
        String url = "http://user-server/user/"+id;

        //发送http请求
        return restTemplate.getForObject(url, User.class);

    }
}

被 @LoadBalanced标记的RestTemplate 可以使用LoadBalancerClient负载均衡客户端实现负载均衡,我们在使用RestTemplate 发起请求的时候需要跟上服务名的方式http://user-server/user/

LoadBalancerClient负载均衡客户端

我们从 @LoadBalanced入手,先看一下LoadBalancerClient它的源码

/**
注解标记RestTemplate 可以使用LoadBalancerClient 负载均衡客户端
 * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient
 * @author Spencer Gibb
 */
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
}

这个注解@LoadBalanced 的作用在注释上说的非常清楚,就是标记RestTemplate课程可以使用使用LoadBalancerClient来实现负载均衡,LoadBalancerClient就是Ribbon实现负载均衡的一个客户端,它在spring-cloud-commons包下,我们可以直接看LoadBalancerClient的源码

/**
客户端负载均衡
 * Represents a client side load balancer
 * @author Spencer Gibb
 */
public interface LoadBalancerClient extends ServiceInstanceChooser {
	//执行请求,会根据serviceId使用负载均衡查找服务
	/**
	使用负载均衡器执行指定服务
	 * execute request using a ServiceInstance from the LoadBalancer for the specified service
	 * 服务Id - 服务ID来查找负载平衡器
	 * @param serviceId the service id to look up the LoadBalancer
	 * 
	 * @param request allows implementations to execute pre and post actions such as
	 * incrementing metrics
	 * 返回选择的服务
	 * @return the result of the LoadBalancerRequest callback on the selected
	 * ServiceInstance
	 */
	 T execute(String serviceId, LoadBalancerRequest request) throws IOException;

	//执行请求,这个方法多了一个参数ServiceInstance ,即请求指定的服务
	/**
	 * execute request using a ServiceInstance from the LoadBalancer for the specified
	 * service
	 * @param serviceId the service id to look up the LoadBalancer
	 * @param serviceInstance the service to execute the request to
	 * @param request allows implementations to execute pre and post actions such as
	 * incrementing metrics
	 * @return the result of the LoadBalancerRequest callback on the selected
	 * ServiceInstance
	 */
	 T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException;

	//重构URL,把http://myservice/path/to/service重构成http://ip:端口/path/to/service
	/**
	 * Create a proper URI with a real host and port for systems to utilize.
	 * Some systems use a URI with the logical serivce name as the host,
	 * such as http://myservice/path/to/service.  This will replace the
	 * service name with the host:port from the ServiceInstance.
	 * @param instance
	 * @param original a URI with the host as a logical service name
	 * @return a reconstructed URI
	 */
	URI reconstructURI(ServiceInstance instance, URI original);
}

LoadBalancerClient接口三个方法,excute()为执行请求,reconstructURI()用来重构url,它实现了ServiceInstanceChooser 接口,这个接口的作用是用来选择服务的,看下源码

/**
	通过使用负载平衡器,选择一个服务器发送请求。
 * Implemented by classes which use a load balancer to choose a server to
 * send a request to.
 *
 * @author Ryan Baxter
 */
public interface ServiceInstanceChooser {

    /**
      从LoadBalancer中为指定服务选择一个ServiceInstance
     * Choose a ServiceInstance from the LoadBalancer for the specified service
     * 
     * //根据服务id去LoadBalancer查找服务
     * @param serviceId the service id to look up the LoadBalancer
     * 
     * 返回查找到的服务实例ServiceInstance 
     * @return a ServiceInstance that matches the serviceId
     */
    ServiceInstance choose(String serviceId);
}

提供了一个choose方法,根据服务ID serviceId 查找一个ServiceInstance 服务实例,这里的serviceId其实就是http://user-server/… url中带的服务名。

LoadBalancerClient 还有一个默认实现类RibbonLoadBalancerClient,这个实现是针对Ribbon的客户端负载均衡,继承关系如下: 在这里插入图片描述

RibbonLoadBalancerClient是一个非常核心的类,最终的负载均衡的请求处理由它来执行,源码如下:

public class RibbonLoadBalancerClient implements LoadBalancerClient {

	private SpringClientFactory clientFactory;

	public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {
		this.clientFactory = clientFactory;
	}
	//重构URL,找到服务之后,把http://服务名/  格式 重构成 http://ip:port/ 格式
	@Override
	public URI reconstructURI(ServiceInstance instance, URI original) {
		Assert.notNull(instance, "instance can not be null");
		String serviceId = instance.getServiceId();
		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);

		URI uri;
		Server server;
		if (instance instanceof RibbonServer) {
			RibbonServer ribbonServer = (RibbonServer) instance;
			server = ribbonServer.getServer();
			uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
		} else {
			server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
			IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
			ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
			uri = updateToSecureConnectionIfNeeded(original, clientConfig,
					serverIntrospector, server);
		}
		return context.reconstructURIWithServer(server, uri);
	}
	//根据服务名,查找服务实例,选择一个返回ServiceInstance 
	@Override
	public ServiceInstance choose(String serviceId) {
		//查找服务
		Server server = getServer(serviceId);
		if (server == null) {
			return null;
		}
		return new RibbonServer(serviceId, server, isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));
	}
	//执行请求
	@Override
	public  T execute(String serviceId, LoadBalancerRequest request) throws IOException {
		//获取负载均衡器[重要]
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		//选择服务,使用负载均衡器,根据服务的ID,选择一个服务
		Server server = getServer(loadBalancer);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		//选择的服务封装成一个RibbonServer:RibbonServer implements ServiceInstance
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));
		//执行请求调用服务
		return execute(serviceId, ribbonServer, request);
	}
	//执行请求调用服务
	@Override
	public  T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException {
		Server server = null;
		if(serviceInstance instanceof RibbonServer) {
			server = ((RibbonServer)serviceInstance).getServer();
		}
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}

		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);
		RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

		try {
			//使用 LoadBalancerRequest 向服务发请求
			T returnVal = request.apply(serviceInstance);
			statsRecorder.recordStats(returnVal);
			return returnVal;
		}
		// catch IOException and rethrow so RestTemplate behaves correctly
		catch (IOException ex) {
			statsRecorder.recordStats(ex);
			throw ex;
		}
		catch (Exception ex) {
			statsRecorder.recordStats(ex);
			ReflectionUtils.rethrowRuntimeException(ex);
		}
		return null;
	}

	private ServerIntrospector serverIntrospector(String serviceId) {
		ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId,
				ServerIntrospector.class);
		if (serverIntrospector == null) {
			serverIntrospector = new DefaultServerIntrospector();
		}
		return serverIntrospector;
	}
	//是否是https请求
	private boolean isSecure(Server server, String serviceId) {
		IClientConfig config = this.clientFactory.getClientConfig(serviceId);
		ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
		return RibbonUtils.isSecure(config, serverIntrospector, server);
	}
	//根据服务ID选择服务
	protected Server getServer(String serviceId) {
		return getServer(getLoadBalancer(serviceId));
	}
	
	//负载均衡器选择服务
	protected Server getServer(ILoadBalancer loadBalancer) {
		if (loadBalancer == null) {
			return null;
		}
		return loadBalancer.chooseServer("default"); // TODO: better handling of key
	}
	//根据服务id得到负载均衡器
	protected ILoadBalancer getLoadBalancer(String serviceId) {
		return this.clientFactory.getLoadBalancer(serviceId);
	}
...省略...

解释一下:

  • 这里的ServiceInstance choose(String serviceId)方法的作用是根据ServideId选择一个服务,底层实现是通过LoadBalancer.chooseServer 负载均衡器LoadBalancer来完成的服务的选择的
  • 选择到服务之后调用execute向选择到的服务发起请求,通过LoadBalancerRequest来完成其请求。
RestTemplate的执行流程

RestTmplate发请求时地址 "http://user-server/user/"+id; 中 user-server是当前服务需要调用的目标服务的服务名,那么Ribbon到底是如何实现负载均衡调用的呢?我们可以从这里跟踪一下RestTemplate的执行流程

public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
...省略...
 @Nullable
    protected  T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor responseExtractor) throws RestClientException {
        Assert.notNull(url, "URI is required");
        Assert.notNull(method, "HttpMethod is required");
        ClientHttpResponse response = null;

        Object var14;
        try {
        	//创建请求对象,使用SimpleClientHttpRequestFactory创建ClientHttpRequest 
            ClientHttpRequest request = this.createRequest(url, method);
            if (requestCallback != null) {
            	//设置header和body
                requestCallback.doWithRequest(request);
            }

            response = request.execute();
            this.handleResponse(url, method, response);
            var14 = responseExtractor != null ? responseExtractor.extractData(response) : null;
        } catch (IOException var12) {
            String resource = url.toString();
            String query = url.getRawQuery();
            resource = query != null ? resource.substring(0, resource.indexOf(63)) : resource;
            throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + var12.getMessage(), var12);
        } finally {
            if (response != null) {
                response.close();
            }

        }

        return var14;
    }

请求来到RestTemplate#doExecute方法,首选是通过使用SimpleClientHttpRequestFactory根据url和method创建ClientHttpRequest 请求对象,使用的实现是InterceptingClientHttpRequestFactory,然后使用response = request.execute();去执行请求,一路跟踪,请求来到InterceptingClientHttpRequest#executeInternal

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
	//headers请求头 , bufferedOutput输出内容
    protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
    	//创建拦截器执行器
        InterceptingClientHttpRequest.InterceptingRequestExecution requestExecution = new InterceptingClientHttpRequest.InterceptingRequestExecution();
        return requestExecution.execute(this, bufferedOutput);
    }

这里通过InterceptingClientHttpRequest.InterceptingRequestExecution() 拦截器执行器去执行请求,请求来到InterceptingClientHttpRequest.InterceptingRequestExecution#execute

 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
        private final Iterator iterator;

        public InterceptingRequestExecution() {
            this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();
        }

        public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
            if (this.iterator.hasNext()) {
            	//[重要]这里取到的正是  LoadBalancerInterceptor
                ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();
                return nextInterceptor.intercept(request, body, this);
            } else {
                HttpMethod method = request.getMethod();
                Assert.state(method != null, "No standard HTTP method");
                //如果iterator中没有拦截器了,就创建一个ClientHttpRequest去执行请求
                ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);
                request.getHeaders().forEach((key, value) -> {
                    delegate.getHeaders().addAll(key, value);
                });
                if (body.length > 0) {
                    if (delegate instanceof StreamingHttpOutputMessage) {
                        StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;
                        streamingOutputMessage.setBody((outputStream) -> {
                            StreamUtils.copy(body, outputStream);
                        });
                    } else {
                        StreamUtils.copy(body, delegate.getBody());
                    }
                }
				//执行请求
                return delegate.execute();
            }
        }
    }

InterceptingRequestExecution 中维护了一个Iterator iterator;其中LoadBalancerInterceptor 就在该集合中,所以请求来到LoadBalancerInterceptor #intercept(request, body, this); 方法

//负载均衡拦截器
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
	//负载均衡客户端[重要]
	private LoadBalancerClient loadBalancer;
	//负载均衡请求创建工厂
	private LoadBalancerRequestFactory requestFactory;
	//初始化
	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {
		this.loadBalancer = loadBalancer;
		this.requestFactory = requestFactory;
	}
	//初始化
	public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
		// for backwards compatibility
		this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
	}
	//拦截器核心方法【重要】
	//request请求对象
	//body 内容
	@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
		//请求的URL,格式如:http://user-server/user/1 ,user-server是服务名
		final URI originalUri = request.getURI();
		//URL中的服务名
		String serviceName = originalUri.getHost();
		
		Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
		//通过requestFactory.createRequest(request, body, execution)创建LoadBalancerRequest
		//然后调用负载均衡器执行请求,参数:服务名,LoadBalancerRequest
		return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));
	}
}

这里蛮重要的,请求调用了LoadBalancerInterceptor #intercept负载均衡拦截器的拦截方法,获取到URL,从中获取到主机名即调用的服务名(Ribbon客户端服务名),然后使用LoadBalancerRequestFactory 创建了LoadBalancerRequest请求对象,调用loadBalancer#execute 负载均衡器执行请求

ILoadBalancer 选择服务(负载均衡)

请求来到RibbonLoadBalancerClient#execute

	@Override
	public  T execute(String serviceId, LoadBalancerRequest request) throws IOException {
		//获取负载均衡器
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		//loadBalancer选择服务
		Server server = getServer(loadBalancer);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		//选择的服务封装成RibbonServer 
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));
		//LoadBalancerRequest对服务执行请求
		return execute(serviceId, ribbonServer, request);
	}

这里就蛮关键了

  • 首选是通过服务名调用getLoadBalancer方法得到负载均衡器
  • 然后getServer(loadBalancer)是通过负载均衡器选择一个服务,底层会使用IRule的算法
  • 然后将服务封装成RibbonServer 对象,交给LoadBalancerRequest去执行请求

这里的负载均衡器默认会走ZoneAwareLoadBalancer,它是通过SpringClientFactory 从Ribbon上下文对象中获取到的负载均衡器对象,关于这个我们在上一章讨论过

public class RibbonLoadBalancerClient implements LoadBalancerClient {
...省略...
	private SpringClientFactory clientFactory;
	protected ILoadBalancer getLoadBalancer(String serviceId) {
		return this.clientFactory.getLoadBalancer(serviceId);
	}

而得到ILoadBalancer之后,调用getServer(loadBalancer)方法选择服务,我们跟踪一下

public class RibbonLoadBalancerClient implements LoadBalancerClient {
...省略...
	protected Server getServer(ILoadBalancer loadBalancer) {
		if (loadBalancer == null) {
			return null;
		}
		//ZoneAwareLoadBalancer#chooseServer
		return loadBalancer.chooseServer("default"); // TODO: better handling of key
	}

这里loadBalancer.chooseServer("default");请求来到ZoneAwareLoadBalancer#chooseServer,源码如下:

public class ZoneAwareLoadBalancer extends DynamicServerListLoadBalancer {
...省略...
@Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size()             
关注
打赏
1651329177
查看更多评论
0.0385s