这里先要先有自己编译好的es源码环境。本篇文章不涉及编译源码部分。 感兴趣的话,你可以可以尝试一下,自己添加一个api试试。跟着这片文章是完全可以的。
从现象到本质:我们的一个请求通常是这样子的
es的请求是restFull风格的请求,由请求方式和请求的URL组成。如果想要自定义添加一个API应该从哪里开始呢?
1.第一步需要定义一个类来继承BaseRestHandler类。这里为了方便查看,以 DELET index这个为例子
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
public class RestDeleteIndexAction extends BaseRestHandler {
public RestDeleteIndexAction(Settings settings, RestController controller) {
super(settings);
// 在这里定义需要的 url路径,定义了请求方式。
controller.registerHandler(RestRequest.Method.DELETE, "/", this);
controller.registerHandler(RestRequest.Method.DELETE, "/{index}", this);
}
@Override
public String getName() {
return "delete_index_action";
}
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
// 这里定义了请求,将rest请求转义成自己特定功能的请求。所以需要模仿DeleteIndexRequest类来定义一个自己的去请求类。
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(Strings.splitStringByCommaToArray(request.param("index")));
deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout()));
deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions()));
// 在这里去关联了处理该请求的类
return channel -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener(channel));
}
}
RestDeleteIndexAction绑定了 url,和预处理请求,将RestRequest请求构造成 es内部请求。并关联了处理类
channel -> client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener(channel));
我们点进client.admin().indices().delete() 这是一个接口IndicesAdminClient,向下跟进它的实现方法,到了AbstractClient类,看到了实现了delete()方法,在方法中可以看到,实际上这里也只是做关联,它把处理这个url请求关联到了DeleteIndexAction上,指定该请求去找***Action这个类。
@Override
public void delete(final DeleteIndexRequest request, final ActionListener listener) {
execute(DeleteIndexAction.INSTANCE, request, listener);
}
如果想添加方法,可以再添加类似于delete()的方法。
看一下这个DeleteIndexAction类的代码
这个类中关联了相应的
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
public class DeleteIndexAction extends Action {
public static final DeleteIndexAction INSTANCE = new DeleteIndexAction();
public static final String NAME = "indices:admin/delete";
private DeleteIndexAction() {
super(NAME);
}
@Override
public AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}
@Override
public DeleteIndexRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new DeleteIndexRequestBuilder(client, this);
}
}
ActionMode类中,关联了这个actoin和处理这个action 的类 (package org.elasticsearch.action)
static Map setupActions(List actionPlugins) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry {
ActionRegistry() {
super("action");
}
public void register(ActionHandler handler) {
register(handler.getAction().name(), handler);
}
public void register(
GenericAction action, Class transportAction,
Class... supportTransportActions) {
register(new ActionHandler(action, transportAction, supportTransportActions));
}
}
ActionRegistry actions = new ActionRegistry();
// 删减出来的下边的代码。action都是在这里进行绑定的。
actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
return unmodifiableMap(actions.getRegistry());
}
ActionMode类中 注册restAction 到handler里边
public void initRestHandlers(Supplier nodesInCluster) {
List catActions = new ArrayList();
Consumer registerHandler = a -> {
if (a instanceof AbstractCatAction) {
catActions.add((AbstractCatAction) a);
}
};
// 有删减代码,删减部分都和下行一样。都是用来注册restAction的。
registerHandler.accept(new RestDeleteIndexAction(settings, restController);
}
再看一下 restAction是什么(上边有提到)
public class RestDeleteIndexAction extends BaseRestHandler {
public RestDeleteIndexAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.DELETE, "/", this);
controller.registerHandler(RestRequest.Method.DELETE, "/{index}", this);
}
@Override
public String getName() {
return "delete_index_action";
}
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.setAddFreezingIndices(true);
indicesStatsRequest.indices(indices);
final IndicesOptions strictExpandIndicesOptions = IndicesOptions.strictExpand();
indicesStatsRequest.indicesOptions(strictExpandIndicesOptions);
indicesStatsRequest.all();
return channel -> client.admin().indices().stats(indicesStatsRequest, new RestActionListener(channel) {
@Override
public void processResponse(IndicesStatsResponse indicesStatsResponse) throws Exception {
ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder();
for(String index : indices) {
IndexStats indexStats = indicesStatsResponse.getIndices().get(index);
builder.put(index, indexStats);
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indices);
deleteIndexRequest.indicesStats(builder.build());
deleteIndexRequest.timeout(request.paramAsTime("timeout", deleteIndexRequest.timeout()));
deleteIndexRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteIndexRequest.masterNodeTimeout()));
deleteIndexRequest.indicesOptions(IndicesOptions.fromRequest(request, deleteIndexRequest.indicesOptions()));
client.admin().indices().delete(deleteIndexRequest, new RestToXContentListener(channel));
}
});
}
}