这里先要先有自己编译好的es源码环境。本篇文章不涉及编译源码部分。 感兴趣的话,你可以可以尝试一下,自己添加一个api试试。跟着这片文章是完全可以的。
从现象到本质:我们的一个请求通常是这样子的
es的请求是restFull风格的请求,由请求方式和请求的URL组成。如果想要自定义添加一个API应该从哪里开始呢?
1.第一步需要定义一个类来继承BaseRestHandler类。这里为了方便查看,以 DELET index这个为例子
package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; public class RestDeleteIndexAction extends BaseRestHandler { public RestDeleteIndexAction(Settings settings, RestController controller) { super(settings); // 在这里定义需要的 url路径,定义了请求方式。 controller.registerHandler(RestRequest.Method.DELETE, "/", this); controller.registerHandler(RestRequest.Method.DELETE, "/{index}", this); } @Override public String getName() { return "delete_index_action"; } @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { // 这里定义了请求,将rest请求转义成自己特定功能的请求。所以需要模仿DeleteIndexRequest类来定义一个自己的去请求类。 DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(Strings.splitStringByCommaToArray(request.param("index"))); deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout())); deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout())); deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions())); // 在这里去关联了处理该请求的类 return channel -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel)); } }
RestDeleteIndexAction绑定了 url,和预处理请求,将RestRequest请求构造成 es内部请求。并关联了处理类
channel -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel));
我们点进client.admin().indices().delete() 这是一个接口IndicesAdminClient,向下跟进它的实现方法,到了AbstractClient类,看到了实现了delete()方法,在方法中可以看到,实际上这里也只是做关联,它把处理这个url请求关联到了DeleteIndexAction上,指定该请求去找***Action这个类。
@Override public void delete(final DeleteIndexRequest request, final ActionListener<AcknowledgedResponse> listener) { execute(DeleteIndexAction.INSTANCE, request, listener); }
如果想添加方法,可以再添加类似于delete()的方法。
看一下这个DeleteIndexAction类的代码
这个类中关联了相应的
import org.elasticsearch.action.Action; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.action.support.master.AcknowledgedResponse; public class DeleteIndexAction extends Action<DeleteIndexRequest, AcknowledgedResponse, DeleteIndexRequestBuilder> { public static final DeleteIndexAction INSTANCE = new DeleteIndexAction(); public static final String NAME = "indices:admin/delete"; private DeleteIndexAction() { super(NAME); } @Override public AcknowledgedResponse newResponse() { return new AcknowledgedResponse(); } @Override public DeleteIndexRequestBuilder newRequestBuilder(ElasticsearchClient client) { return new DeleteIndexRequestBuilder(client, this); } }
ActionMode类中,关联了这个actoin和处理这个action 的类 (package org.elasticsearch.action)
static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) { // Subclass NamedRegistry for easy registration class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> { ActionRegistry() { super("action"); } public void register(ActionHandler<?, ?> handler) { register(handler.getAction().name(), handler); } public <Request extends ActionRequest, Response extends ActionResponse> void register( GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction, Class<?>... supportTransportActions) { register(new ActionHandler<>(action, transportAction, supportTransportActions)); } } ActionRegistry actions = new ActionRegistry(); // 删减出来的下边的代码。action都是在这里进行绑定的。 actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class); return unmodifiableMap(actions.getRegistry()); }
ActionMode类中 注册restAction 到handler里边
public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) { List<AbstractCatAction> catActions = new ArrayList<>(); Consumer<RestHandler> registerHandler = a -> { if (a instanceof AbstractCatAction) { catActions.add((AbstractCatAction) a); } }; // 有删减代码,删减部分都和下行一样。都是用来注册restAction的。 registerHandler.accept(new RestDeleteIndexAction(settings, restController); }
再看一下 restAction是什么(上边有提到)
public class RestDeleteIndexAction extends BaseRestHandler { public RestDeleteIndexAction(Settings settings, RestController controller) { super(settings); controller.registerHandler(RestRequest.Method.DELETE, "/", this); controller.registerHandler(RestRequest.Method.DELETE, "/{index}", this); } @Override public String getName() { return "delete_index_action"; } @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { String[] indices = Strings.splitStringByCommaToArray(request.param("index")); IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.setAddFreezingIndices(true); indicesStatsRequest.indices(indices); final IndicesOptions strictExpandIndicesOptions = IndicesOptions.strictExpand(); indicesStatsRequest.indicesOptions(strictExpandIndicesOptions); indicesStatsRequest.all(); return channel -> client.admin().indices().stats(indicesStatsRequest, new RestActionListener<IndicesStatsResponse>(channel) { @Override public void processResponse(IndicesStatsResponse indicesStatsResponse) throws Exception { ImmutableOpenMap.Builder<String, IndexStats> builder = ImmutableOpenMap.builder(); for(String index : indices) { IndexStats indexStats = indicesStatsResponse.getIndices().get(index); builder.put(index, indexStats); } DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indices); deleteIndexRequest.indicesStats(builder.build()); deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout())); deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout())); deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions())); client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener<>(channel)); } }); } }