- 简介
- 配置信息
- 传输模块配置
- 通用网络配置
- Transport 总体架构
- 网络层
- 1. 网络模块初始化
- 2. Netty4Transport
- 3. Netty4HttpServerTransport
- 服务层
- 1. 连接到节点
- 2.发送请求
- 3. 定义对Response的处理
- 4. 定义对请求的处理
- REST解析和处理
- RPC实现
- RPC的注册和映射
- 1. ActionModule类中的注册
- 2. TransportService类中的注册
- 根据Action获取处理类
- 1. REST请求触发
- 2. TcpTransport收到RPC请求
- 思考与总结
- 关注我的公众号【宝哥大数据】,更多干货
传输模块用于集群内节点之间的内部通信。从一个节点到另一个节点的每个调用都使用传输模块。例如,当一个节点处理HTTP GET请求时,实际上是由持有该数据的另一个节点处理的,这就需要处理HTTP GET请求的节点将请求通过传输模块转发给另一个节点。
传输机制是完全异步的,这意味着没有阻塞线程等待响应。使用异步通信的好处是解决了C10k问题,也是广播请求/收集结果(例如,ES中的搜索)的理想解决方案。
配置信息 传输模块配置TCP transport是传输模块基于TCP的实现,有以下配置,如下表所示。
默认情况下,传输模块使用9300端口通信。该端口承载了三种不同的业务:客户端的JavaAPI通信,节点间的内部通信,以及自动发现的通信。使用transport profiles, 可以将三者配置到不同的地址和端口。例如:
transport.profiles.default.port: 9300-9400
transport.profiles.default.bind_host: 10.0.0.1
transport.profiles.client.port: 9500-9600
transport.profiles.client.bind_host: 192.168.0.1
transport.profiles.dmz.port: 9700-9800
transport.profiles.dmz.bind_host: 172.16.1.2
这在部分场景下很有用,例如,想保护集群内部通信的9300端口,只对外开放9200端口,但又不想影响Java API的使用,那么可以为Java API单独配置一个服务端口。
传输模块有一个专用的tracer 日志,当它被激活时,日志系统会记录传入和传出的请求。可以通过动态设置org.elasticsearch.transport.TransportService.tracer
为TRACE级别来开启:
curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
"transient" : {
"logger.org.elasticsearch.transport.TransportService.tracer" : "TRACE"
}
}
还可以使用一组include和exclude通配符模式来控制tracer 的内容。默认情况下,除了ping的故障检测请求,所有请求都将被跟踪:
curl -X PUT "localhost:9200/_ cluster/settings" -H 'Content- -Type: application/json' -d'
{
"transient" : {
"transport.tracer.include" : "*",
"transport.tracer.exclude" : "internal:discovery/zen/ fd*"
}
}
通用网络配置
除了传输模块配置,还有一些其他的网络配置信息。
network.host
节点将绑定到此主机名或IP地址,并将此主机公布到集群中的其他节点。接受一个IP地址、主机名、一个特殊值,或者是这些内容任意组合的数组。默认为_local_
。
discovery.zen.ping.unicast.hosts
为了加入集群,节点需要知道集群中一些其他节点的主机名或IP地址。此设置提供节点将尝试联系其他节点的初始列表,接受IP地址或主机名。如果主机名被解析为多个IP地址,那么每个IP地址都将用于discovery。轮询DNS也可以在discovery中使用,每次返回一个不同的IP地址,如果IP地址不存在则抛出异常,并在下一轮ping时重新解析(取决于JVM DNS缓存)。默认值为[“127.0.0.1”, “[:1]”]。
http.port
用于接受HTTP请求的绑定端口。接受单个值或范围。如果指定了一个范围,则该节点将绑定到该范围内的第一个可用端口。默认为9200~9300。
transport.tcp.port
为节点之间的通信绑定端口。接受单个值或范围。如果指定了一个范围,则该节点将绑定到该范围内的第一个可用端口。默认为9300~9400。 network.host
允许设置以下特殊值,如下表所示。
默认情况下,这些特殊的值在IPv4和IPv6上都可以正常工作,但是也可以使用:ipv4
或:pv6
说明符来明确指定。例如,en0:ipv4_
只绑定到en0的IPv4地址。
network.host
是常用的设置监听地址的方式,同时设置绑定主机和发布主机。在一些高级用例中,可能需要为它们设置不同值。
network.bind_host
指定节点应该绑定到哪个网络接口,以便监听传入的请求。一个节点可以绑定到多个接口,例如,两个网卡,或者一个站点本地地址和一个回环地址。默认为network.host
。
network.publish_host
发布主机是节点向集群中发布的单个网口,以便其他节点可以连接它。目前,ES可以绑定到多个地址,但只发布一个地址。如果没有指定,则默认为来自network..host
的“最佳”地址。按IPv4/IPv6栈优先排序,然后是可达性。如果为network.host
设置多个地址,但在节点到节点的通信中依赖于特定的地址,那么应该显式地设network.publish_host
。
以上两个配置都可以像network.host
一样配置。它们接收IP地址、主机名和特殊值。
基于TCP的所有组件(如HTTP和传输模块)共享以下高级设置,如下表所示。
ES的传输模块和HTTP传输模块都是基于Netty实现的。Netty是一个Java实现的高性能异步网络通信库,基于epoll/kqueue实现事件处理。
我们说的传输模块,目前只有一种实现,就是TCP传输模块。如上节所述,TCP传输模块有三类用处:内部节点间通信(我们称为RPC)、JavaAPI 客户端,以及节点发现。HTTP模块负责服务用户的REST请求。
网络层网络层是对内部各种传输模块的抽象,使得上层发送/接收数据时不必关心底层的实现,使用Netty 还是其他类库,上层并不关心。 在内部实现中,传输模块和HTTP模块统一封装到NetworkModule类中。顾名思义,该类是在TCP传输模块和HTTP传输模块之上封装的,实现了对各种传输模块的初始化,上层的发送和接收依赖对网络模块的引用。
该类的几个重要数据成员如下表所示。
上述三个成员在NetworkModule的构造函数(节点启动时调用)中通过插件方式加载。
主要对外接口如下表所示。
初始化NetworkModule传输模块和HTTP传输模块之后,上层就可以通过该类对外提供的接口获取某个传输模块了。该类返回的各种传输模块和拦截器都是虚类型,因此本质上就是对网络层的一个抽象。
NetworkModule内部组件的初始化是通过插件方式加载的。在其构造函数中传入NetworkPlugin列表,NetworkPlugin 是一个接口类, Netty4Plugin 从这个接口实现,如下图所示。
在Netty4Plugin中,分别构建了Netty4Transport 和Netty4HttpServerTransport,用于传输模块和HTTP传输模块:
public class Netty4Plugin extends Plugin implements NetworkPlugin {
//构建Netty4Transport作为Transport
public Map getTransports(...) {
return Collections.singletonMap (NETTY_TRANSPORT_NAME, () -> new Netty4Transport (settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
}
// 构建Netty4HttpServerTransport MF h HttpServerTransport
public Map getHttpTransports(...) {
return Collections.singletonMap (NETTY_HTTP_TRANSPORT_NAME, () -> new Netty4HttpServerTransport (settings, networkService,bigArrays, threadPool, xContentRegistry, dispatcher));
}
}
根据加载的NetworkPlugin 插件和定义好的REST处理器初始化NetworkModule:
//已注册好的REST请求处理器
final RestController restController = actionModule.getRestController() ;
//初始化Networ kModule
final NetworkModule networkModule = new NetworkModule (settings, false, pluginsService.filterPlugins (NetworkPlugin.class), threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);
2. Netty4Transport
Netty4Transport用于RPC等内部通信,其继承自TcpTransport, 类结构如下图所示。
在初始化时构建了Client 和Server:
protected void doStart () {
//初始化Client
bootstrap = createBootstrap() ;
if (NetworkService . NETWORK_ SERVER.get (settings)) {
for(ProfileSettings profileSettings : profileSettings) {
//初始化Serve
createServerBootstrap (profileSettings) ;
bindServer (profileSettings) ;
}
}
}
3. Netty4HttpServerTransport
Netty4HttpServerTransport 用于响应REST请求,其继承自HttpServerTransport,如下图所示:
同样在Netty4HttpServerTransport#doStart中创建一个 HTTP Server 监听端口,当收到用户请求时,调用dispatchRequest 对不同的请求执行相应的处理。哪种请求被哪个类处理这种信息注册在ActionModule类中。
服务层服务层指网络模块的.上层应用层,基于网络模块提供的Transport 来收/发数据。本节重点分析节点间通信,该通信使用TransportService类实现,在网络模块提供的Transport 基础上,该类提供连接到节点、发送数据、注册事件响应函数等方法。其初始化过程如下:
//通过网络模块获取已初始化的Transport
final Transport transport = networkModule.getTransportSupplier().get() ;
//基于网络模块的Transport构建TransportService
final TransportService transportService = newTransportService(settings, transport,threadPool, networkModule.getTransportInterceptor(),localNodeFactory, settingsModule.getClusterSettings());
在节点内部任何通信前,首先需要连接到集群的其他节点。
1. 连接到节点在默认配置下,ES的每个节点与其他节点都会保持13个长连接。每个连接有各自的用途。可以通过配置调节某种业务使用的连接数。
当本节点连接到某个特定的节点时, TransportService通过网络层提供的transport.connectToNode完成连接。在完成连接后,内部维护一个NodeChannels类对象,表示节点到节点的连接。其中包括多个TCP连接(默认为13个),并记录了每个连接的用途。目前,这些连接有以下几类用途,定义在TransportRequestOptions.Type中。
public enum Type {
RECOVERY,//用于恢复.
BULK,//用于批量写入
REG, //其他用途,例如,加入集群等
STATE, //传输集群状态
PING //用 作nodeFD或masterFD的ping请求
}
这些连接被ConnectionProfile统一管理:
static ConnectionProfile buildDe faul tConnect ionProfile (Settings settings) {
//默认为 2个
int connectionsPerNodeRecovery = CONNECTIONS_PER_NODE_RECOVERY.get(settings);
//默认为3个
int connections PerNodeBulk = CONNECTIONS_PER_NODE_BULK.get(settings) ;
//默认为6个
int connect ionsPerNodeReg = CONNECTIONS_PER_NODE_REG.get(settings) ;
//默认为1个
int connectionsPerNodeState = CONNECTIONS_PER_NODE_STATE.ge (settings) ;
//默认为1个
int connectionsPerNodePing = CONNECTIONS_PER_NODE_PING.get(settings) ;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
builder.addConnections (connectionsPerNodeBulk, TransportRequestoptions.Type.BULK);
// .....
return builder.build();
}
节点间每种用途的连接数量可以通过以下配置进行调整:
transport.connections_per_node.recovery
transport.connections_per_node.bulk
transport.connections_per_node.reg
transport.connections_per_node.state
transport.connections_per_node.ping
NodeChannels类保存了这些连接,当发送数据时,根据请求类型找到对应的连接来发送数据。
public final classNodeChannelsimplementsConnection{
//保存每个请求对应的连接
private final Map typeMapping;
//保存已建立的TCP连接
private final List channels;
//目的节点是哪个
private final DiscoveryNode node;
}
建立连接过程如下,如果13个连接中有一个连接失败,则整体认为失败,关闭已建立的连接。
public final NodeChannels openConnection (DiscoveryNode node, ConnectionProfile connectionProfile) throws IOException {
//获取总连接数
int numConnections = connectionProfile.getNumConnections();
List channels = new ArrayList (numConnections);
for (int i = 0; i client.admin().cluster().getSnapshots(getSnapshotsRequest, new RestToXContentListener (channel));
}
当Netty收到HTTP请求后,调用Netty4HttpServerTransport#dispatchRequest, 该方法根据 定义好的Action调用对应的Rest*Action处理类。
RPC实现RPC是远程过程调用的简称,当一个节点需要另一个节点执行某些操作时,例如,创建、删除索引等,向这个节点发送一个RPC请求,ES的RPC基于TCP实现,底层是Netty 的Netty4Transport。每个RPC在内部称为Action,有唯一的名称, 例如,cluster:monitor/main。 当传输模块收到一个RPC请求时,会根据这个Action名称获取对应的处理类。
TransportService类是在网络层之.上对RPC的发送与接收的服务层封装,虽然从模块设计角度来说,网络层的设计对内部是否使用Netty框架是解耦的,除Netty外,也可以使用其他通信框架,但是为了让读者更容易理解,我们看一下从TransportService到Netty4Transport的联系,
如下图所示。
在上图中, Node2调用sendRequest发送请求,发送时传入定义好的TransportResponseHandler,TransportService调用Netty4Transport 的sendRequest 发送数据。当远程节点处理完毕,Netty4Transport的handleResponse方法最终回调发送请求时定义的TransportResponseHandler。
Node1接收请求,通过registerRequestHandler注册Action和对应的处理类TransportRequest-Handler。TransportService 类中维护了Action 与处理类的对应关系。当Netty4Transport 收到请求后,handleRequest方法中调用TransportService 类的getRequestHandler(action)通过(客户端请求中的) Action 获取相应的处理类,最终调用TransportRequestHandler执行对RPC的处理逻辑。
RPC的注册和映射一个RPC请求与处理模块的对应关系在两方面维护:
- 在ActionModule类中注册Action与处理类的映射;
- 通过TransportService#registerRequestHandler方法注册Action名称与对应处理器的映射。
这两种映射关系同时存在。RPC先在ActionModule 类中注册,再调用TransportService#registerRequestHandler在TransportService 类中注册。在大部分情况下,网络层收到请求后根据TransportService注册的Action信息获取对应的处理模块。
1. ActionModule类中的注册与REST Action 的注册类似,内部RPC也注册在ActionModule类中,描述某个Action应该被哪个类处理。一个Action可以理解为RPC调用的名称。Action 与处理类的映射关系如下:
static Map setupActions (List actionPlugins) {
ActionRegistry actions = new ActionRegistry() ;
actions.register (MainAction.INSTANCE, TransportMainAction.class) ;
actions.register (NodesInfoAction.INSTANCE,
TransportNodesInfoAction.class) ;
actions.register (RemoteIn foAction.INSTANCE,
TransportRemoteInfoAction.class) ;
//省略大部分 Action的注册
}
register函数的第-一个参数是名称规则为*Action的类,以比较简单的MainAction为例,其 类结构如下图所示。
这个类主要定义了Action的名称及返回响应,同样以MainAction为例,其实现如下:
public class MainAction extends Action {
//定义Action名称,后续会根据Action名称找到对应的处理类
public static final String NAME = "cluster:monitor/main";
public static final MainAction INSTANCE = new MainAction() ;
public MainRequestBuilder newRequestBuilder (ElasticsearchClient client) {
return new MainRequestBuilder (client,INSTANCE) ;
}
public MainResponse newResponse() {
return new MainResponse() ;
}
}
第二个参数是名称规则为Transport*Action的类,在这个类中定义对此Action的具体处理。 以TransportMainAction类为例,其类结构如下图所示。
注意,许多Transport* Action类都会继承自HandledTransportAction。 而在HandledTransport-Action类的构造函数中,会调用TransportServce#registerRequestHandler 在TransportService类中注册处理器。因此,许多在ActionModule类中注册的RPC信息会自动在TransportService中添加映射关系。
以TransportMainAction类为例,其实现如下:
public class TransportMainAction extends HandledTransportAction {
public Transpor tMainAction() {
}
//定义对此RPC的处理.
protected void doExecute (MainRequest request, ActionListener listener) {
}
}
doExcute函数汇总需要实现最重要的对RPC请求的具体处理逻辑。
2. TransportService类中的注册在TransportService中注册RPC信息是为了在收到传输层的请求后,通过Action字符串找到对应的处理类。注册过程需要提供两个关键信息: Action 名称与请求处理类(TransportRequestHandler对象)。
在TransportService类中通过registerRequestHandler注册RPC信息的来源可以分为两种,一种 是来自ActionModule的注册,Transport* Action类的父类HandledTransportAction在其构造函数中自动调用registerRequestHandler;另一种 是来自其他模块的直接调用registerRequestHandler,例如,TransportReplicationAction 和MasterFaultDetection。
以HandledTransportAction类在TransportService 中的注册为例,其注册过程如下:
transportService.registerRequestHandler (actionName,request, ThreadPool.Name.SAME,false canTripCircuitBreaker, new TransportHandler());
TransportService将映射维护在一个Map中:
Map
Map的key为Action名称,RequestHandlerRegistry 中封装了与RPC相关的Action名称、处理器等信息。通过它可以找到对应的处理模块。为registerRequestHandler 传入的最后一个参数就是定义的处理器。这个处理器需要从TransportRequestHandler 类继承。如前所述,在TransportService类中注册RPC的时机来源于ActionModule和HandledTransportAction的构造函数,我们以HandledTransportAction构造函数中注册时为例,其处理器定义如下:
class TransportHandler implements Transpor tReques tHandler {
//当收到RPC请求
public final void messageReceived (final Request request, final TransportChannel channel, Task task) throws Exception {
//执行这个RPC请求
execute(task, request, new ActionListener() {
//执行成功, 将产生的响应回复客户端
public void onResponse (Response response) {
channel.sendResponse (response) ;
}
//执行失败,向客户端回复失败原因
public void onFailure (Exception e) {
channel.sendResponse (e) ;
}
}) ;
}
}
对请求的处理方法execute定义在TransportAction 类中,它先检测请求的合法性,然后调用Transport*Action中定义的doExecute函数执行真正的RPC处理逻辑。
public final void execute (Task task, Request request, ActionListener listener) {
//验证请求
ActionRequestValidationException validationException = request.validate() ;
RequestFilterChain requestFilterChain = new RequestFilterChain(this, logger) ;
//调用Action定义的doExecute函数执行用户定义的处理
requestFilterChain.proceed(task, actionName,request, listener) ;
}
根据Action获取处理类
当收到一个RPC请求进行处理时,由于触发点的不同,有多种途径找到这个RPC对应的处理模块是哪个。
1. REST请求触发某些REST请求会触发内部的RPC请求,在这种情况下,在NodeClient#executeLocally方法中通过Action获取TransportAction,Actions 是ActionModule类中注册的RPC列表。
TransportAction transportAction (Gener icAction action) {
Transpor tAction transportAction = actions.get(action) ;
return transportAction;
}
获取TransportAction 后,调用execute执行处理。处理过程与,上一节所述相同,在requestFilterChain.proceed方法中调用此Action的doExecute函数进行处理。
2. TcpTransport收到RPC请求从TCP传输层(也就是9300端口)收到一个RPC请求是最常见的方式。当收到一个请求时,首先在TcpTransport#messageReceived 中进行基本的处理,然后到handleRequest方法中处理请求,在这个方法中,调用TransportService通过Action获取处理类。
//通过Action获取处理模块
final RequestHandlerRegistry reg = transportService.getRequestHandler(action);
//调用处理模块执行对RPC的处理逻辑
threadPool.executor(reg.getExecutor()).execute (new RequestHandler(reg, request, transportChannel));
思考与总结
- 本章主要分析了REST API和内部RPC的解析与调用,以及网络层与服务层的关系。
- 默认情况下,ES的每个节点与其他节点都保持13个长连接,这在集群规模较大时,
例如,达到1000节点时,会维护非常多的连接。在这种情况下,如果重新启动集群,由于需要在短时间内建立大量连接,则新建连接的请求有可能被操作系统认为是SYN攻击。
关注我的公众号【宝哥大数据】,更多干货