
Elasticsearch Source Code Analysis (Part 1): REST API

Author: 罗琦 (ByteDance engineer)
Published: May 16, 2020



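The REST API is one of Elasticsearch's distinctive features. Built on RESTful conventions over HTTP (with JSON request and response bodies), it gives clients a convenient, fast, and secure way to access system resources over HTTP. This article follows a REST API request and walks through the code that implements it.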

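REST API example (the title field below is only a placeholder for the real mapping of my_type; since Elasticsearch 6.0 the Content-Type header is required):

$ curl -XPUT 'http://localhost:9200/my_index/my_type/_mapping' -H 'Content-Type: application/json' -d '
{
  "my_type" : {
    "properties" : {
      "title" : { "type" : "text" }
    }
  }
}
'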

We can start reading the source code directly from the rest module under client.

RestClient Code Analysis

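[Figure: RestClient call flow]

RestClientBuilder#build() creates the Apache async HTTP client, wraps it in a RestClient, and starts it: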



public RestClient build() {
    if (failureListener == null) {
        failureListener = new RestClient.FailureListener();
    }
    // create the async HTTP client inside a privileged block (needed when a SecurityManager is installed)
    CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() {
        @Override
        public CloseableHttpAsyncClient run() {
            return createHttpClient();
        }
    });
    RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, hosts, pathPrefix, failureListener);
    // start the I/O reactor before handing the client out
    httpClient.start();
    return restClient;
}



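The synchronous performRequest is built on top of the async path. It creates a SyncResponseListener bounded by maxRetryTimeoutMillis, starts the request through performRequestAsyncNoCatch, and blocks in listener.get() until a response, an exception, or the timeout arrives:

public Response performRequest(String method, String endpoint, Map<String, String> params,
                               HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                               Header... headers) throws IOException {
    // the listener blocks the caller for at most maxRetryTimeoutMillis
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
            listener, headers);
    // wait for the async callback and return (or rethrow) its result
    return listener.get();
}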
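The sync-over-async pattern used by performRequest can be sketched with JDK classes alone. This is a hypothetical model, not the client's code: AsyncListener, BlockingListener and SyncOverAsync are made-up names, the fake performAsync stands in for performRequestAsyncNoCatch, and a CountDownLatch plays the role of the wait inside SyncResponseListener. To stay short, the sketch surfaces timeouts and failures as unchecked exceptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback type standing in for the client's response listener.
interface AsyncListener<T> {
    void onSuccess(T result);

    void onFailure(Exception e);
}

// Blocks the calling thread until the async callback fires or the timeout elapses.
class BlockingListener<T> implements AsyncListener<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private final AtomicReference<T> result = new AtomicReference<>();
    private final AtomicReference<Exception> error = new AtomicReference<>();
    private final long timeoutMillis;

    BlockingListener(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public void onSuccess(T r) {
        result.set(r);
        latch.countDown();
    }

    @Override
    public void onFailure(Exception e) {
        error.set(e);
        latch.countDown();
    }

    // Waits for the callback, then returns the result or rethrows the failure.
    T get() {
        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("listener timeout after " + timeoutMillis + " ms");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for response", ie);
        }
        Exception e = error.get();
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        if (e != null) {
            throw new RuntimeException(e);
        }
        return result.get();
    }
}

class SyncOverAsync {
    // Hypothetical async call that completes the listener on another thread.
    static void performAsync(String endpoint, AsyncListener<String> listener) {
        new Thread(() -> {
            if (endpoint.startsWith("/")) {
                listener.onSuccess("200 " + endpoint);
            } else {
                listener.onFailure(new IllegalArgumentException("bad endpoint: " + endpoint));
            }
        }).start();
    }

    // Same shape as performRequest: create the listener, start the async call, block on get().
    static String perform(String endpoint, long timeoutMillis) {
        BlockingListener<String> listener = new BlockingListener<>(timeoutMillis);
        performAsync(endpoint, listener);
        return listener.get();
    }
}
```

The latch also provides the happens-before edge that makes the value written by the callback thread visible to the waiting caller.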

On the server side, the put-mapping REST action class lives in the following directory tree:

server
  - elasticsearch
    - rest
      - action
        - admin
          - indices
            RestPutMappingAction

[Figure: client request implementation flow]

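As the figure shows, at the bottom layer HTTP request handling relies on the TCP request handler used on the 930x ports, which unifies the handling logic for HTTP and TCP requests.

That is roughly the general server-side flow for HTTP requests in Elasticsearch. The RestAction hierarchy is bound to port 9200 to receive HTTP requests and forwards them to TransportActions served on port 9300, so REST and transport requests share the same processing logic. Netty is the underlying data transport framework. Every request gets an incrementing requestId; all requestIds are stored together with their ResponseHandlers, and when a response carrying a given requestId arrives, the matching ResponseHandler is called back. If a response times out, a TimeoutHandler fires automatically.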
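The requestId bookkeeping described above can be sketched with plain JDK concurrency classes. This is a simplified, hypothetical model (ResponseHandlerRegistry, register and onResponse are made-up names, and payloads are plain strings), not the TransportService code: ids come from an AtomicLong, handlers live in a ConcurrentHashMap, and a scheduled task plays the role of the timeout handler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical registry: incrementing request ids mapped to response callbacks with timeouts.
class ResponseHandlerRegistry {
    private static final class Entry {
        final Consumer<String> onResponse;
        final Runnable onTimeout;
        volatile ScheduledFuture<?> timeoutTask;

        Entry(Consumer<String> onResponse, Runnable onTimeout) {
            this.onResponse = onResponse;
            this.onTimeout = onTimeout;
        }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, Entry> handlers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "response-timeout");
        t.setDaemon(true); // do not keep the JVM alive just for timeouts
        return t;
    });

    // Stores the handler under a fresh id and arms its timeout; the id goes into the request frame.
    long register(Consumer<String> onResponse, Runnable onTimeout, long timeoutMillis) {
        long id = nextId.incrementAndGet();
        Entry entry = new Entry(onResponse, onTimeout);
        handlers.put(id, entry);
        entry.timeoutTask = timer.schedule(() -> {
            // Fire only if no response has claimed the entry yet.
            if (handlers.remove(id) != null) {
                entry.onTimeout.run();
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return id;
    }

    // Called when a response with this id arrives; returns false for unknown or late ids.
    boolean onResponse(long requestId, String payload) {
        Entry entry = handlers.remove(requestId);
        if (entry == null) {
            return false;
        }
        ScheduledFuture<?> task = entry.timeoutTask;
        if (task != null) {
            task.cancel(false);
        }
        entry.onResponse.accept(payload);
        return true;
    }

    int pending() {
        return handlers.size();
    }
}
```

Whichever of the response or the timeout removes the entry first wins, so each handler fires at most once and a late response is simply dropped.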

TcpTransport Code Deep Dive

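TcpTransport is the abstraction that Netty4Transport builds on (Netty4Transport extends it). It defines communication mainly at the granularity of a node: the ping task, connection setup, message sending, and the basic encapsulation of the protocol structure. The core TcpTransport APIs are: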

+connectToNode (connect to a node)
+sendRequest (send a request)
+receivedMessage (receive a message)
-handleRequest (handle a received request)
-handleResponse (handle a received response)
+sendResponse (send a response)
+pingSchedule (ping task)
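receivedMessage has to decide whether an incoming frame goes to handleRequest or handleResponse, and the handleRequest and sendResponse code later in this article uses TransportStatus helpers (isHandshake, setResponse, setCompress) for exactly this kind of flag. A minimal sketch of a single status byte driving that dispatch; StatusFlags and MiniDispatcher are hypothetical, and the bit positions are assumptions rather than values copied from TransportStatus.

```java
// Illustrative status-byte flags; the bit positions are assumptions for this sketch.
final class StatusFlags {
    static final byte RESPONSE = 1 << 0;  // set on responses, clear on requests
    static final byte ERROR = 1 << 1;
    static final byte COMPRESS = 1 << 2;
    static final byte HANDSHAKE = 1 << 3;

    static boolean isRequest(byte s) {
        return (s & RESPONSE) == 0;
    }

    static byte setResponse(byte s) {
        return (byte) (s | RESPONSE);
    }

    static byte setCompress(byte s) {
        return (byte) (s | COMPRESS);
    }

    static boolean isCompress(byte s) {
        return (s & COMPRESS) != 0;
    }

    static boolean isHandshake(byte s) {
        return (s & HANDSHAKE) != 0;
    }
}

// Hypothetical receivedMessage: routes a frame by its status byte.
class MiniDispatcher {
    static String receivedMessage(byte status, long requestId) {
        if (StatusFlags.isRequest(status)) {
            return "handleRequest(" + requestId + ")";
        }
        return "handleResponse(" + requestId + ")";
    }
}
```

Packing several booleans into one header byte keeps the fixed-size frame header small, and a sender only has to OR in the bits it needs.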

#connectToNode:

public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
        throws ConnectTransportException {
    connectionProfile = resolveConnectionProfile(connectionProfile);
    if (node == null) {
        throw new ConnectTransportException(null, "can't connect to a null node");
    }
    closeLock.readLock().lock(); // ensures that no connection can be opened while the transport is closing
    try {
        ensureOpen();
        try (Releasable ignored = connectionLock.acquire(node.getId())) {
            NodeChannels nodeChannels = connectedNodes.get(node);
            if (nodeChannels != null) {
                return; // already connected
            }
            boolean success = false;
            try {
                nodeChannels = openConnection(node, connectionProfile);
                connectionValidator.accept(nodeChannels, connectionProfile);
                // we hold the per-node connection lock, so there cannot be an existing connection
                connectedNodes.put(node, nodeChannels);
                if (logger.isDebugEnabled()) {
                    logger.debug("connected to node [{}]", node);
                }
                try {
                    transportService.onNodeConnected(node);
                } finally {
                    if (nodeChannels.isClosed()) {
                        if (connectedNodes.remove(node, nodeChannels)) {
                            transportService.onNodeDisconnected(node);
                        }
                        throw new NodeNotConnectedException(node, "connection concurrently closed");
                    }
                }
                success = true;
            } catch (ConnectTransportException e) {
                throw e;
            } catch (Exception e) {
                throw new ConnectTransportException(node, "general node connection failure", e);
            } finally {
                if (success == false) { // on failure, close the connection
                    logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
                    IOUtils.closeWhileHandlingException(nodeChannels);
                }
            }
        }
    } finally {
        closeLock.readLock().unlock();
    }
}
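The control flow of connectToNode (reuse an existing connection, otherwise open, validate, register, and clean up on any failure, all under a per-node lock) can be reduced to the hypothetical sketch below. ConnectionCache is a made-up name, a per-node monitor object stands in for connectionLock, and a Predicate stands in for the connection validator. The closeLock/ensureOpen and onNodeConnected parts are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical connection cache that follows the connectToNode steps.
class ConnectionCache<C extends AutoCloseable> {
    private final Map<String, C> connected = new ConcurrentHashMap<>();
    private final Map<String, Object> nodeLocks = new ConcurrentHashMap<>();
    private final Function<String, C> opener;
    private final Predicate<C> validator;

    ConnectionCache(Function<String, C> opener, Predicate<C> validator) {
        this.opener = opener;
        this.validator = validator;
    }

    C connect(String nodeId) {
        if (nodeId == null) {
            throw new IllegalArgumentException("can't connect to a null node");
        }
        // One lock per node, playing the role of connectionLock.acquire(node.getId()).
        Object lock = nodeLocks.computeIfAbsent(nodeId, k -> new Object());
        synchronized (lock) {
            C existing = connected.get(nodeId);
            if (existing != null) {
                return existing; // already connected: reuse
            }
            C conn = null;
            boolean success = false;
            try {
                conn = opener.apply(nodeId);
                if (!validator.test(conn)) {
                    throw new IllegalStateException("validation failed for node " + nodeId);
                }
                connected.put(nodeId, conn);
                success = true;
                return conn;
            } finally {
                if (!success && conn != null) {
                    closeQuietly(conn); // never leak a half-open connection
                }
            }
        }
    }

    int size() {
        return connected.size();
    }

    private static void closeQuietly(AutoCloseable c) {
        try {
            c.close();
        } catch (Exception ignored) {
            // swallow close errors, in the spirit of IOUtils.closeWhileHandlingException
        }
    }
}
```

The success flag plus finally block is the same cleanup idiom as in connectToNode: every exit path that did not register the connection closes it.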

#sendRequest:

public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
        throws IOException, TransportException {
    if (closed.get()) {
        throw new NodeNotConnectedException(node, "connection already closed");
    }
    // pick the channel reserved for this request type
    TcpChannel channel = channel(options.type());
    sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
}

private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
                                  final TransportRequest request, TransportRequestOptions options, Version channelVersion,
                                  byte status) throws IOException,
        TransportException {
    ...
    try {
        final TransportRequestOptions finalOptions = options;
        // this callback may run on a different thread (see the related issue)
        SendListener onRequestSent = new SendListener(channel, stream,
                () -> transportService.onRequestSent(node, requestId, action, request, finalOptions), message.length());
        internalSendMessage(channel, message, onRequestSent);
        addedReleaseListener = true;
    }
    ...
}
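sendRequestToChannel receives the requestId, a status byte ((byte) 0 for a plain request) and the channel version, and these end up in the header of every message. A hypothetical sketch of length-prefixed framing with those fields; the 'E','S' marker and the exact field order are assumptions modeled on the transport header, not copied from the source.

```java
import java.nio.ByteBuffer;

// Hypothetical frame: 'E','S' marker, int length, long requestId, byte status, int version, payload.
final class Frame {
    static final int PREFIX_SIZE = 2 + 4;     // marker bytes + length field
    static final int FIXED_SIZE = 8 + 1 + 4;  // requestId + status + version

    final long requestId;
    final byte status;
    final int version;
    final byte[] payload;

    Frame(long requestId, byte status, int version, byte[] payload) {
        this.requestId = requestId;
        this.status = status;
        this.version = version;
        this.payload = payload;
    }

    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(PREFIX_SIZE + FIXED_SIZE + payload.length);
        buf.put((byte) 'E').put((byte) 'S');
        buf.putInt(FIXED_SIZE + payload.length); // the length counts everything after the prefix
        buf.putLong(requestId).put(status).putInt(version).put(payload);
        return buf.array();
    }

    static Frame decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        if (buf.get() != 'E' || buf.get() != 'S') {
            throw new IllegalArgumentException("invalid frame marker");
        }
        int length = buf.getInt();
        if (length != buf.remaining()) {
            throw new IllegalArgumentException("expected " + length + " bytes, got " + buf.remaining());
        }
        long requestId = buf.getLong();
        byte status = buf.get();
        int version = buf.getInt();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new Frame(requestId, status, version, payload);
    }
}
```

The length prefix lets the receiver cut a TCP byte stream into whole messages before reading the requestId that selects the response handler.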

#handleResponse:

private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream, final TransportResponseHandler handler) {
    final TransportResponse response;
    try {
        response = handler.read(stream);
        response.remoteAddress(new TransportAddress(remoteAddress));
    } catch (Exception e) {
        handleException(handler, new TransportSerializationException(
                "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
        return;
    }
    threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
        @Override
        protected void doRun() throws Exception {
            handler.handleResponse(response);
        }
    });
}
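handleResponse deserializes on the network thread and only then forks to the handler's executor, routing both deserialization errors and handler failures to handleException. A hypothetical sketch of that two-stage structure; ResponseHandler and ResponseDispatch are made-up names, and java.util.concurrent.Executor stands in for threadPool.executor(handler.executor()).

```java
import java.util.concurrent.Executor;

// Hypothetical handler: knows how to read the bytes and what to do with the result.
interface ResponseHandler<T> {
    T read(byte[] bytes);

    void handleResponse(T response);

    void handleException(Exception e);
}

class ResponseDispatch {
    // Stage 1 runs on the caller (network) thread; stage 2 runs on the handler's executor.
    static <T> void handleResponse(byte[] bytes, ResponseHandler<T> handler, Executor executor) {
        final T response;
        try {
            response = handler.read(bytes);
        } catch (Exception e) {
            handler.handleException(new IllegalStateException("Failed to deserialize response", e));
            return;
        }
        executor.execute(() -> {
            try {
                handler.handleResponse(response);
            } catch (Exception e) {
                handler.handleException(new RuntimeException("response handler failed", e));
            }
        });
    }
}
```

Keeping only the cheap deserialization on the network thread frees it quickly, while potentially slow handler logic runs on the pool the handler asked for.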

#handleRequest:

protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId,
                               int messageLengthBytes, Version version, InetSocketAddress remoteAddress, byte status)
        throws IOException {
    final Set<String> features;
    ...
    final String action = stream.readString();
    transportService.onRequestReceived(requestId, action);
    TransportChannel transportChannel = null;
    try {
        if (TransportStatus.isHandshake(status)) {
            ...
        } else {
            ...
            transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName,
                    messageLengthBytes);
            final TransportRequest request = reg.newRequest(stream);
            request.remoteAddress(new TransportAddress(remoteAddress));
            // in case we throw an exception, i.e. when the limit is hit, we don't want to verify
            validateRequest(stream, requestId, action);
            threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
        }
    } catch (Exception e) {
        ...
    }
    return action;
}

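The call threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)) is where the request is handed over to the thread pool. In essence, ES implements its own Netty-style approach to handling network requests: the request is deserialized on the network thread, executed on the executor registered for its action, and the response goes back through the transportChannel.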
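The dispatch in handleRequest boils down to: read the action name, look up the registration for it (the lookup itself is elided in the excerpt above), deserialize the request, and run the handler on that registration's executor. A hypothetical sketch with string payloads; Registration and RequestDispatcher are made-up names standing in for the request handler registry and RequestHandler, and an unknown action is rejected here with an IllegalArgumentException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical registration: how to read a request, where to run it, and what to run.
final class Registration<R> {
    private final Function<String, R> reader;
    private final Executor executor;
    private final Consumer<R> handler;

    Registration(Function<String, R> reader, Executor executor, Consumer<R> handler) {
        this.reader = reader;
        this.executor = executor;
        this.handler = handler;
    }

    // Deserialize on the caller thread, then hand the work to this action's executor.
    void dispatch(String body) {
        R request = reader.apply(body);
        executor.execute(() -> handler.accept(request));
    }
}

class RequestDispatcher {
    private final Map<String, Registration<?>> registry = new ConcurrentHashMap<>();

    <R> void register(String action, Registration<R> registration) {
        registry.put(action, registration);
    }

    // Mirrors handleRequest: look up the action, dispatch, and return the action name.
    String handleRequest(String action, String body) {
        Registration<?> registration = registry.get(action);
        if (registration == null) {
            throw new IllegalArgumentException("No handler for action [" + action + "]");
        }
        registration.dispatch(body);
        return action;
    }
}
```

Binding the executor to the action registration lets each action choose its own thread pool, so slow actions cannot starve unrelated ones.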

#sendResponse:

private void sendResponse(
        final Version nodeVersion,
        final Set<String> features,
        final TcpChannel channel,
        final TransportResponse response,
        final long requestId,
        final String action,
        TransportResponseOptions options,
        byte status) throws IOException {
    if (compress) {
        options = TransportResponseOptions.builder(options).withCompress(true).build();
    }
    status = TransportStatus.setResponse(status); // mark the message as a response
    ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
    CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress());
    boolean addedReleaseListener = false;
    try {
        if (options.compress()) {
            status = TransportStatus.setCompress(status);
        }
        threadPool.getThreadContext().writeTo(stream); // propagate the thread context headers
        stream.setVersion(nodeVersion);
        stream.setFeatures(features);
        BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
        final TransportResponseOptions finalOptions = options;
        // this callback may run on a different thread (see the related issue)
        SendListener listener = new SendListener(channel, stream,
                () -> transportService.onResponseSent(requestId, action, response, finalOptions), message.length());
        internalSendMessage(channel, message, listener);
        addedReleaseListener = true; // from now on the SendListener releases the stream
    } finally {
        if (!addedReleaseListener) {
            IOUtils.close(stream); // sending never started, so release the stream here
        }
    }
}

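As you can see, apart from writing to the stream and exchanging data between the network connection and the buffer, there is nothing special here.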
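One detail worth noticing in sendResponse (and in sendRequestToChannel, which sets the same flag) is addedReleaseListener: once the SendListener has been handed to internalSendMessage, it becomes responsible for closing the stream; if anything fails before that hand-off, the finally block closes it. A minimal sketch of this ownership-transfer pattern; PooledBuffer and SendPath are hypothetical, and the transport is modeled as a Consumer that receives the "on sent" callback.

```java
import java.util.function.Consumer;

// Hypothetical pooled buffer; closeCount lets callers check it is released exactly once.
class PooledBuffer implements AutoCloseable {
    int closeCount;

    @Override
    public void close() {
        closeCount++;
    }
}

class SendPath {
    // The transport receives the "on sent" callback, which takes over releasing the buffer.
    static void sendMessage(PooledBuffer buf, boolean failBeforeHandoff, Consumer<Runnable> transport) {
        boolean addedReleaseListener = false;
        try {
            if (failBeforeHandoff) {
                throw new IllegalStateException("failed to build message");
            }
            transport.accept(buf::close); // ownership moves to the send-completion callback
            addedReleaseListener = true;
        } finally {
            if (!addedReleaseListener) {
                buf.close(); // nobody else will release it
            }
        }
    }
}
```

The flag makes ownership explicit: exactly one party, either the finally block or the completion callback, releases the pooled bytes.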

Next, taking the Put Mapping process as an example, let's walk through how the related source code is implemented:

TransportPutMappingAction Analysis

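From the analysis above, HTTP requests are ultimately carried out by the transport layer, so RestPutMappingAction ends up calling TransportPutMappingAction. Looking into that class, we find a method called masterOperation, so let's see what the master-side operation does.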

The code is as follows:

protected void masterOperation(final PutMappingRequest request, final ClusterState state, final ActionListener<PutMappingResponse> listener) {
    try {
        final Index[] concreteIndices = request.getConcreteIndex() == null ? indexNameExpressionResolver.concreteIndices(state, request) : new Index[] {request.getConcreteIndex()};
        PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                .indices(concreteIndices).type(request.type())
                .updateAllTypes(request.updateAllTypes())
                .source(request.source());
        metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new PutMappingResponse(response.isAcknowledged()));
            }
            @Override
            public void onFailure(Exception t) {
                logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]", concreteIndices, request.type()), t);
                listener.onFailure(t);
            }
        });
    } catch (IndexNotFoundException ex) {
        logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]", request.indices(), request.type()), ex);
        throw ex;
    }
}

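First, the concrete indices of the request are determined: if request.getConcreteIndex() is null, they are resolved through indexNameExpressionResolver.concreteIndices(state, request); otherwise the single concrete index from the request is used. The code then builds a PutMappingClusterStateUpdateRequest describing the cluster-state change, with ackTimeout, masterNodeTimeout (the maximum time to wait for the request to be processed on the master), indices, type, source, and the updateAllTypes flag. Finally, metaDataMappingService.putMapping makes the actual and final putMapping call, and its listener turns the acknowledged flag of the ClusterStateUpdateResponse into a PutMappingResponse. The putMapping code: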

public void putMapping(final PutMappingClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
    clusterService.submitStateUpdateTask("put-mapping",
            request,
            ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
            putMappingExecutor,
            new AckedClusterStateTaskListener() {
                @Override
                public void onFailure(String source, Exception e) {
                    listener.onFailure(e);
                }
                @Override
                public boolean mustAck(DiscoveryNode discoveryNode) {
                    return true;
                }
                @Override
                public void onAllNodesAcked(@Nullable Exception e) {
                    listener.onResponse(new ClusterStateUpdateResponse(true));
                }
                @Override
                public void onAckTimeout() {
                    listener.onResponse(new ClusterStateUpdateResponse(false));
                }
                @Override
                public TimeValue ackTimeout() {
                    return request.ackTimeout();
                }
            });
}
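putMapping does not answer the caller directly. It translates cluster-state acknowledgement callbacks into a ClusterStateUpdateResponse (true from onAllNodesAcked, false from onAckTimeout), and masterOperation then converts that into a PutMappingResponse. An ack timeout is therefore still delivered through onResponse, only with acknowledged=false. A minimal sketch of the listener-mapping step, using a hypothetical ResultListener interface that only mirrors the onResponse/onFailure shape of ActionListener:

```java
import java.util.function.Function;

// Hypothetical stand-in for the onResponse/onFailure shape of ActionListener.
interface ResultListener<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    // Wraps a listener of R so it can receive T, converting successful results with mapper.
    static <T, R> ResultListener<T> map(ResultListener<R> delegate, Function<T, R> mapper) {
        return new ResultListener<T>() {
            @Override
            public void onResponse(T response) {
                final R mapped;
                try {
                    mapped = mapper.apply(response);
                } catch (Exception e) {
                    delegate.onFailure(e); // a failing conversion is reported as a failure
                    return;
                }
                delegate.onResponse(mapped);
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(e); // failures pass through unchanged
            }
        };
    }
}
```

For example, mapping a Boolean "acknowledged" listener onto a String listener with ack -> "acknowledged=" + ack forwards true and false as two different successful responses, while exceptions still arrive through onFailure.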

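The Priority.HIGH passed to ClusterStateTaskConfig.build shows that this task runs with HIGH priority. The Javadoc on submitStateUpdateTask deserves a closer look: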

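Roughly, it says that submitted update tasks are batched per executor instance: if the executor already has pending update tasks, a newly submitted task joins the same batch on that executor. Because putMapping always passes the same putMappingExecutor, put-mapping tasks, like shard-failed tasks, can be merged into one batch before they are executed (see MasterService#runTasks).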

The rest of the flow is roughly:

runTasks -> calculateTaskOutputs -> executeTasks -> PutMappingExecutor#execute
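The batching behavior described above can be modeled with a small, hypothetical TaskBatcher: tasks submitted with the same executor instance are grouped, and runTasks applies each group with a single execute call, the way PutMappingExecutor#execute receives a List of requests. This single-threaded sketch ignores priorities, timeouts, and publishing the new state; BatchExecutor and TaskBatcher are made-up names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical batch executor: applies a whole batch of tasks to the current state at once.
interface BatchExecutor<S, T> {
    S execute(S currentState, List<T> tasks);
}

// Hypothetical batcher keyed by executor identity (lambdas do not override equals).
class TaskBatcher<S, T> {
    private final Map<BatchExecutor<S, T>, List<T>> pending = new LinkedHashMap<>();
    private S state;
    private int batchesRun;

    TaskBatcher(S initialState) {
        this.state = initialState;
    }

    // Joins the existing batch if this executor already has pending tasks.
    void submit(T task, BatchExecutor<S, T> executor) {
        pending.computeIfAbsent(executor, k -> new ArrayList<>()).add(task);
    }

    // Drains each executor's batch and applies it with one execute call.
    void runTasks() {
        for (Map.Entry<BatchExecutor<S, T>, List<T>> e : pending.entrySet()) {
            state = e.getKey().execute(state, e.getValue());
            batchesRun++;
        }
        pending.clear();
    }

    S state() {
        return state;
    }

    int batchesRun() {
        return batchesRun;
    }
}
```

Batching lets many concurrent mapping updates produce a single new cluster state instead of one state change per request.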

PutMappingExecutor:

class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
    @Override
    public ClusterTasksResult<PutMappingClusterStateUpdateRequest> execute(ClusterState currentState,
                                                                           List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
        Map<Index, MapperService> indexMapperServices = new HashMap<>();
        ClusterTasksResult.Builder<PutMappingClusterStateUpdateRequest> builder = ClusterTasksResult.builder();
        try {
            for (PutMappingClusterStateUpdateRequest request : tasks) {
                try {
                    for (Index index : request.indices()) {
                        final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
                        if (indexMapperServices.containsKey(indexMetaData.getIndex()) == false) {
                            MapperService mapperService = indicesService.createIndexMapperService(indexMetaData);
                            indexMapperServices.put(index, mapperService);
                            // add mappings for all types, we need them for cross-type validation
                            mapperService.merge(indexMetaData, MergeReason.MAPPING_RECOVERY, request.updateAllTypes());
                        }
                    }
                    currentState = applyRequest(currentState, request, indexMapperServices);
                    builder.success(request);
                } catch (Exception e) {
                    builder.failure(request, e); // record the failure and keep processing the other tasks
                }
            }
            return builder.build(currentState);
        } finally {
            IOUtils.close(indexMapperServices.values()); // close the temporary MapperServices
        }
    }
}
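The key property of PutMappingExecutor#execute is failure isolation: each request is applied to the evolving cluster state in turn, and a request that throws is recorded via builder.failure without undoing the others. A stripped-down generic sketch; TasksResult and PerTaskExecutor are hypothetical names, and the per-index MapperService cache with its cleanup in finally is left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical result: the final state plus the tasks that failed (absent tasks succeeded).
final class TasksResult<S, T> {
    final S resultingState;
    final Map<T, Exception> failures;

    TasksResult(S resultingState, Map<T, Exception> failures) {
        this.resultingState = resultingState;
        this.failures = failures;
    }
}

class PerTaskExecutor {
    // Applies tasks in order; a failing task is recorded and skipped, later tasks still apply.
    static <S, T> TasksResult<S, T> execute(S currentState, List<T> tasks, BiFunction<S, T, S> applyRequest) {
        Map<T, Exception> failures = new LinkedHashMap<>();
        S state = currentState;
        for (T task : tasks) {
            try {
                state = applyRequest.apply(state, task); // only successful tasks advance the state
            } catch (Exception e) {
                failures.put(task, e);
            }
        }
        return new TasksResult<>(state, failures);
    }
}
```

Because the state is only reassigned when applyRequest returns normally, a failed request leaves no partial change behind, and the rest of the batch still produces one consistent new state.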


