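Elasticsearch Source Code Analysis (Part 1): REST API
The REST API is one of Elasticsearch's distinctive features. Following RESTful conventions, it gives clients a convenient, fast, and safe way to access system resources over HTTP. This article follows a REST API request through the code that implements it.
REST API example: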
$ curl -XPUT 'http://localhost:9200/my_index/my_type/_mapping' -d '{ "my_type" : { # mapping for my_type }}'
We can start reading the source directly from the client's rest module.
RestClient code analysis
Figure: RestClient call flow diagram
public RestClient build() {
    if (failureListener == null) {
        failureListener = new RestClient.FailureListener();
    }
    CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
        new PrivilegedAction<CloseableHttpAsyncClient>() {
            @Override
            public CloseableHttpAsyncClient run() {
                return createHttpClient();
            }
        });
    RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, hosts, pathPrefix, failureListener);
    httpClient.start();
    return restClient;
}
public Response performRequest(String method, String endpoint, Map<String, String> params,
                               HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                               Header... headers) throws IOException {
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory, listener, headers);
    return listener.get();
}
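The synchronous performRequest shown above is a thin wrapper: it creates a listener, starts the asynchronous request, and blocks on listener.get(). A minimal JDK-only sketch of that pattern follows. BlockingListener, SyncOverAsyncDemo, and their methods are hypothetical names chosen for illustration; this is not the actual SyncResponseListener implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a listener that turns an async callback into a blocking call.
class BlockingListener<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicReference<T> result = new AtomicReference<>();
    private final AtomicReference<Exception> failure = new AtomicReference<>();
    private final long timeoutMillis;

    BlockingListener(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onSuccess(T value) {
        result.set(value);
        done.countDown();
    }

    void onFailure(Exception error) {
        failure.set(error);
        done.countDown();
    }

    // Blocks the caller until a callback fires or the timeout elapses.
    T get() {
        try {
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no response within " + timeoutMillis + " ms");
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", interrupted);
        }
        Exception error = failure.get();
        if (error != null) {
            throw new RuntimeException(error);
        }
        return result.get();
    }
}

class SyncOverAsyncDemo {
    // Hypothetical async call: completes the listener from another thread.
    static void performAsync(String endpoint, BlockingListener<String> listener) {
        new Thread(() -> listener.onSuccess("200 OK for " + endpoint)).start();
    }

    // Synchronous facade: start the async call, then wait for the listener.
    static String performSync(String endpoint) {
        BlockingListener<String> listener = new BlockingListener<>(1000);
        performAsync(endpoint, listener);
        return listener.get();
    }
}
```

The point of the design is that only one code path (the asynchronous one) does real work, while the synchronous API just waits on it.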
On the server side, the put-mapping REST action class lives in the following directory tree:
server - elasticsearch - rest - action - admin - indices RestPutMappingAction
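Figure: request handling flow diagram
As the diagram shows, HTTP request handling is ultimately backed by the handlers for TCP requests on the 930x ports, which unifies the processing logic for HTTP and TCP requests. That is the general flow by which the Elasticsearch server handles HTTP requests. The RestAction hierarchy is bound to port 9200 to receive HTTP requests and forwards them to the TransportActions that serve port 9300, so REST requests and transport requests share one processing path. Netty is the underlying data transport framework. Each request is assigned an incrementing RequestId, and every RequestId is stored together with its ResponseHandler. When a response carrying that RequestId arrives, the ResponseHandler is called back; if the response times out, the TimeOutHandler is triggered automatically.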
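The RequestId bookkeeping described above can be sketched with JDK classes only. PendingResponses and its methods are hypothetical names, and the String payload is a placeholder; the real logic lives in the transport layer and is more involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical registry mapping request ids to response handlers.
class PendingResponses {
    private final AtomicLong nextRequestId = new AtomicLong();
    private final Map<Long, Consumer<String>> handlers = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "timeout-timer");
            thread.setDaemon(true); // do not keep the JVM alive for pending timeouts
            return thread;
        });

    // Registers a handler under a fresh, incrementing id and arms its timeout.
    long register(Consumer<String> onResponse, Runnable onTimeout, long timeoutMillis) {
        long id = nextRequestId.incrementAndGet();
        handlers.put(id, onResponse);
        timer.schedule(() -> {
            // remove() succeeds for exactly one of "response" and "timeout"
            if (handlers.remove(id) != null) {
                onTimeout.run();
            }
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        return id;
    }

    // Called when a response arrives; returns false if the id is unknown or timed out.
    boolean onResponse(long requestId, String payload) {
        Consumer<String> handler = handlers.remove(requestId);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    void close() {
        timer.shutdownNow();
    }
}
```

Because both the response path and the timeout path go through handlers.remove, each request is completed at most once.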
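A closer look at the TcpTransport code
TcpTransport is the abstract base class of Netty4Transport. It defines the communication process at node granularity, including ping tasks, connection establishment, message sending, and the basic wrapping of the protocol structure. The core TcpTransport API is as follows:
+ connectToNode (connect to a node)
+ sendRequest (send a request)
+ receivedMessage (receive a message)
  - handleRequest (handle an incoming request)
  - handleResponse (handle an incoming response)
+ sendResponse (send a response)
+ pingSchedule (ping task)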
#connectToNode:
public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile,
                          CheckedBiConsumer<Connection, ConnectionProfile, IOException> connectionValidator)
        throws ConnectTransportException {
    connectionProfile = resolveConnectionProfile(connectionProfile);
    if (node == null) {
        throw new ConnectTransportException(null, "can't connect to a null node");
    }
    closeLock.readLock().lock(); // ensure no connection can be opened while we are closing
    try {
        ensureOpen();
        try (Releasable ignored = connectionLock.acquire(node.getId())) {
            NodeChannels nodeChannels = connectedNodes.get(node);
            if (nodeChannels != null) {
                return;
            }
            boolean success = false;
            try {
                nodeChannels = openConnection(node, connectionProfile);
                connectionValidator.accept(nodeChannels, connectionProfile);
                // we hold the connection lock, so there cannot be an existing connection
                connectedNodes.put(node, nodeChannels);
                if (logger.isDebugEnabled()) {
                    logger.debug("connected to node [{}]", node);
                }
                try {
                    transportService.onNodeConnected(node);
                } finally {
                    if (nodeChannels.isClosed()) {
                        if (connectedNodes.remove(node, nodeChannels)) {
                            transportService.onNodeDisconnected(node);
                        }
                        throw new NodeNotConnectedException(node, "connection concurrently closed");
                    }
                }
                success = true;
            } catch (ConnectTransportException e) {
                throw e;
            } catch (Exception e) {
                throw new ConnectTransportException(node, "general node connection failure", e);
            } finally {
                if (success == false) { // close the connection on failure
                    logger.trace(() -> new ParameterizedMessage("failed to connect to [{}], cleaning dangling connections", node));
                    IOUtils.closeWhileHandlingException(nodeChannels);
                }
            }
        }
    } finally {
        closeLock.readLock().unlock();
    }
}
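Two locking ideas stand out in connectToNode: a read lock that keeps new connections from being opened while the transport is closing, and a per-node lock that makes "check, then open" atomic for each node. The sketch below isolates those two ideas using JDK classes. NodeConnections, its String channels, and the opened counter are hypothetical and exist only for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified connection table keyed by node id.
class NodeConnections {
    private final Map<String, String> connectedNodes = new ConcurrentHashMap<>();
    private final Map<String, Object> nodeLocks = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private volatile boolean closed = false;
    final AtomicInteger opened = new AtomicInteger(); // counts real connection attempts

    void connectToNode(String nodeId) {
        closeLock.readLock().lock(); // many connects may run at once, but never during close()
        try {
            if (closed) {
                throw new IllegalStateException("transport is closed");
            }
            // One lock object per node id: concurrent connects to the same node serialize here.
            synchronized (nodeLocks.computeIfAbsent(nodeId, id -> new Object())) {
                if (connectedNodes.containsKey(nodeId)) {
                    return; // already connected, nothing to do
                }
                connectedNodes.put(nodeId, openConnection(nodeId));
            }
        } finally {
            closeLock.readLock().unlock();
        }
    }

    private String openConnection(String nodeId) {
        opened.incrementAndGet();
        return "channel-to-" + nodeId;
    }

    void close() {
        closeLock.writeLock().lock(); // waits for in-flight connects to finish
        try {
            closed = true;
            connectedNodes.clear();
        } finally {
            closeLock.writeLock().unlock();
        }
    }
}
```

Calling connectToNode twice for the same node opens only one connection, and any call after close() fails fast.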
#sendRequest:
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
        throws IOException, TransportException {
    if (closed.get()) {
        throw new NodeNotConnectedException(node, "connection already closed");
    }
    TcpChannel channel = channel(options.type());
    sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
}
private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId,
                                  final String action, final TransportRequest request,
                                  TransportRequestOptions options, Version channelVersion, byte status)
        throws IOException, TransportException {
    ...
    try {
        final TransportRequestOptions finalOptions = options;
        // this callback may be invoked on a different thread; see the related issue
        SendListener onRequestSent = new SendListener(channel, stream,
            () -> transportService.onRequestSent(node, requestId, action, request, finalOptions), message.length());
        internalSendMessage(channel, message, onRequestSent);
        addedReleaseListener = true;
    }
    ...
}
#handleResponse:
private void handleResponse(InetSocketAddress remoteAddress, final StreamInput stream,
                            final TransportResponseHandler handler) {
    final TransportResponse response;
    try {
        response = handler.read(stream);
        response.remoteAddress(new TransportAddress(remoteAddress));
    } catch (Exception e) {
        handleException(handler, new TransportSerializationException(
            "Failed to deserialize response from handler [" + handler.getClass().getName() + "]", e));
        return;
    }
    threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
        @Override
        public void onFailure(Exception e) {
            handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
        @Override
        protected void doRun() throws Exception {
            handler.handleResponse(response);
        }
    });
}
#handleRequest:
protected String handleRequest(TcpChannel channel, String profileName, final StreamInput stream, long requestId,
                               int messageLengthBytes, Version version, InetSocketAddress remoteAddress,
                               byte status) throws IOException {
    final Set<String> features;
    ...
    final String action = stream.readString();
    transportService.onRequestReceived(requestId, action);
    TransportChannel transportChannel = null;
    try {
        if (TransportStatus.isHandshake(status)) {
            ...
        } else {
            ...
            transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version,
                features, profileName, messageLengthBytes);
            final TransportRequest request = reg.newRequest(stream);
            request.remoteAddress(new TransportAddress(remoteAddress));
            // in case we throw an exception, i.e. when the limit is hit, we don't want to verify
            validateRequest(stream, requestId, action);
            threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
        }
    } catch (Exception e) {
        ...
    }
    return action;
}
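The threadPool.executor(reg.getExecutor()).execute(...) call near the end of handleRequest is where the request is handed to a thread pool. In essence, ES implements its own take on Netty's approach to handling network requests, built around transportChannel.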
#sendResponse:
private void sendResponse(final Version nodeVersion, final Set<String> features, final TcpChannel channel,
                          final TransportResponse response, final long requestId, final String action,
                          TransportResponseOptions options, byte status) throws IOException {
    if (compress) {
        options = TransportResponseOptions.builder(options).withCompress(true).build();
    }
    status = TransportStatus.setResponse(status);
    ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
    CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress());
    boolean addedReleaseListener = false;
    try {
        if (options.compress()) {
            status = TransportStatus.setCompress(status);
        }
        threadPool.getThreadContext().writeTo(stream);
        stream.setVersion(nodeVersion);
        stream.setFeatures(features);
        BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
        final TransportResponseOptions finalOptions = options;
        // this callback may be invoked on a different thread; see the related issue
        SendListener listener = new SendListener(channel, stream,
            () -> transportService.onResponseSent(requestId, action, response, finalOptions), message.length());
        internalSendMessage(channel, message, listener);
        addedReleaseListener = true;
    } finally {
        if (!addedReleaseListener) {
            IOUtils.close(stream);
        }
    }
}
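As you can see, apart from writing to the stream and exchanging data between the network connection and the buffers, there is nothing special here.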
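One small detail in the request and response code is the status byte, which is manipulated through helpers such as TransportStatus.setResponse, TransportStatus.setCompress, and TransportStatus.isHandshake. A status byte of this kind is typically a set of bit flags. The sketch below shows the idea; the class name StatusBits and the specific bit positions are assumptions made for illustration and should not be read as the actual TransportStatus constants.

```java
// Hypothetical bit-flag helper; the bit positions are assumed, not taken from Elasticsearch.
final class StatusBits {
    private static final byte RESPONSE = 1 << 0;  // 0 = request, 1 = response
    private static final byte COMPRESS = 1 << 2;  // payload is compressed
    private static final byte HANDSHAKE = 1 << 3; // message is part of a handshake

    private StatusBits() {}

    static byte setResponse(byte status) {
        return (byte) (status | RESPONSE);
    }

    static byte setCompress(byte status) {
        return (byte) (status | COMPRESS);
    }

    static byte setHandshake(byte status) {
        return (byte) (status | HANDSHAKE);
    }

    static boolean isRequest(byte status) {
        return (status & RESPONSE) == 0;
    }

    static boolean isCompress(byte status) {
        return (status & COMPRESS) != 0;
    }

    static boolean isHandshake(byte status) {
        return (status & HANDSHAKE) != 0;
    }
}
```

Packing these flags into one byte keeps the message header small while still letting the receiver decide how to decode the rest of the message.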
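Next, using the put mapping process as an example, we walk through how the related source code is implemented:
TransportPutMappingAction analysis
From the analysis above, HTTP requests are ultimately implemented on top of the transport layer, so RestPutMappingAction ends up calling TransportPutMappingAction. Looking at that class, we find a method named masterOperation, so let's see what the master's operation looks like.
The code is as follows: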
protected void masterOperation(final PutMappingRequest request, final ClusterState state,
                               final ActionListener<PutMappingResponse> listener) {
    try {
        final Index[] concreteIndices = request.getConcreteIndex() == null
            ? indexNameExpressionResolver.concreteIndices(state, request)
            : new Index[] {request.getConcreteIndex()};
        PutMappingClusterStateUpdateRequest updateRequest = new PutMappingClusterStateUpdateRequest()
            .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
            .indices(concreteIndices).type(request.type())
            .updateAllTypes(request.updateAllTypes())
            .source(request.source());
        metaDataMappingService.putMapping(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new PutMappingResponse(response.isAcknowledged()));
            }
            @Override
            public void onFailure(Exception t) {
                logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]",
                    concreteIndices, request.type()), t);
                listener.onFailure(t);
            }
        });
    } catch (IndexNotFoundException ex) {
        logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]",
            request.indices(), request.type()), ex);
        throw ex;
    }
}
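The method first determines the concrete indices: if the request's concrete index is null, they are resolved through indexNameExpressionResolver; otherwise the request's own concrete index is used. It then builds a PutMappingClusterStateUpdateRequest to describe the change, with the parameters ackTimeout, masterNodeTimeout (the maximum time to wait for the request to finish executing on the master), indices, type, source, and updateAllTypes. Finally, the metaDataMappingService.putMapping call performs the actual, and final, putMapping invocation. The putMapping code: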
public void putMapping(final PutMappingClusterStateUpdateRequest request,
                       final ActionListener<ClusterStateUpdateResponse> listener) {
    clusterService.submitStateUpdateTask("put-mapping", request,
        ClusterStateTaskConfig.build(Priority.HIGH, request.masterNodeTimeout()),
        putMappingExecutor,
        new AckedClusterStateTaskListener() {
            @Override
            public void onFailure(String source, Exception e) {
                listener.onFailure(e);
            }
            @Override
            public boolean mustAck(DiscoveryNode discoveryNode) {
                return true;
            }
            @Override
            public void onAllNodesAcked(@Nullable Exception e) {
                listener.onResponse(new ClusterStateUpdateResponse(true));
            }
            @Override
            public void onAckTimeout() {
                listener.onResponse(new ClusterStateUpdateResponse(false));
            }
            @Override
            public TimeValue ackTimeout() {
                return request.ackTimeout();
            }
        });
}
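The ClusterStateTaskConfig.build(Priority.HIGH, ...) argument shows that this operation runs with priority HIGH. The comment on submitStateUpdateTask in the source deserves a closer look:
Roughly, it says that submitted update tasks are executed in batches per executor instance. If that executor already has pending update tasks, a newly submitted task joins the same batch on that executor. From this comment we can see that put-mapping tasks are not merged together before being submitted the way shard-failed tasks are. (MasterService#runTasks)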
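The batching rule paraphrased from that comment (tasks that share an executor instance are executed together) can be sketched as follows. BatchingQueue, its String tasks, and the use of plain Object instances as executors are hypothetical simplifications, not the MasterService implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical queue that groups pending tasks by executor identity.
class BatchingQueue {
    // A pending task remembers which executor instance should run it.
    private static final class Pending {
        final Object executor;
        final String task;

        Pending(Object executor, String task) {
            this.executor = executor;
            this.task = task;
        }
    }

    private final List<Pending> pending = new ArrayList<>();

    void submit(Object executor, String task) {
        pending.add(new Pending(executor, task));
    }

    // Removes and returns the next batch: the oldest pending task plus every
    // other pending task that was submitted with the same executor instance.
    List<String> nextBatch() {
        List<String> batch = new ArrayList<>();
        if (pending.isEmpty()) {
            return batch;
        }
        Object executor = pending.get(0).executor;
        for (Iterator<Pending> it = pending.iterator(); it.hasNext();) {
            Pending candidate = it.next();
            if (candidate.executor == executor) { // identity, not equals()
                batch.add(candidate.task);
                it.remove();
            }
        }
        return batch;
    }
}
```

Comparing executors by identity is what makes a single shared executor instance, such as putMappingExecutor in the code above, act as the batching key.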
The rest of the flow is roughly:
runTasks -> calculateTaskOutputs -> executeTasks -> PutMappingExecutor#execute
PutMappingExecutor:
class PutMappingExecutor implements ClusterStateTaskExecutor<PutMappingClusterStateUpdateRequest> {
    @Override
    public ClusterTasksResult<PutMappingClusterStateUpdateRequest> execute(
            ClusterState currentState, List<PutMappingClusterStateUpdateRequest> tasks) throws Exception {
        Map<Index, MapperService> indexMapperServices = new HashMap<>();
        ClusterTasksResult.Builder<PutMappingClusterStateUpdateRequest> builder = ClusterTasksResult.builder();
        try {
            for (PutMappingClusterStateUpdateRequest request : tasks) {
                try {
                    for (Index index : request.indices()) {
                        final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
                        if (indexMapperServices.containsKey(indexMetaData.getIndex()) == false) {
                            MapperService mapperService = indicesService.createIndexMapperService(indexMetaData);
                            indexMapperServices.put(index, mapperService);
                            // add mappings for all types; they are needed for cross-type validation
                            mapperService.merge(indexMetaData, MergeReason.MAPPING_RECOVERY, request.updateAllTypes());
                        }
                    }
                    currentState = applyRequest(currentState, request, indexMapperServices);
                    builder.success(request);
                } catch (Exception e) {
                    builder.failure(request, e);
                }
            }
            return builder.build(currentState);
        } finally {
            IOUtils.close(indexMapperServices.values());
        }
    }
    ...
}
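The shape of PutMappingExecutor#execute is worth isolating: each task in the batch is applied to the state produced by the previous successful task, and a failing task is recorded without aborting the rest of the batch. The sketch below reproduces only that control flow. BatchResult, BatchExecutorSketch, and the use of a list of strings as "state" are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical result type: the final state plus the failure of each rejected task.
class BatchResult {
    final List<String> state;
    final Map<String, Exception> failures;

    BatchResult(List<String> state, Map<String, Exception> failures) {
        this.state = state;
        this.failures = failures;
    }
}

class BatchExecutorSketch {
    // Applies every task in order; a failure is recorded and the loop continues.
    static BatchResult execute(List<String> currentState, List<String> tasks) {
        List<String> state = new ArrayList<>(currentState);
        Map<String, Exception> failures = new LinkedHashMap<>();
        for (String task : tasks) {
            try {
                state = apply(state, task); // each success becomes the input of the next task
            } catch (Exception e) {
                failures.put(task, e); // one bad task does not abort the batch
            }
        }
        return new BatchResult(state, failures);
    }

    // Stand-in for applying one mapping update; rejects an empty mapping.
    static List<String> apply(List<String> state, String mapping) {
        if (mapping.isEmpty()) {
            throw new IllegalArgumentException("empty mapping");
        }
        List<String> next = new ArrayList<>(state); // never mutate the previous state
        next.add(mapping);
        return next;
    }
}
```

Only one new state comes out of the whole batch, which matches the single builder.build(currentState) call at the end of the original method.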
Copyright notice: this is an original article by InfoQ author 罗琦.
Original link: http://xie.infoq.cn/article/8a9529242fa5b368f9d115032. Please contact the author before reposting.