创建索引源码学习
请求入口
接收 HTTP 请求的入口在:
org.elasticsearch.http.netty4.Netty4HttpRequestHandler
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // here }}
复制代码
之后请求交给 RestController,对请求进行检验和分发
public class RestController implements HttpServerTransport.Dispatcher { @Override public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {... try { //here tryAllHandlers(request, channel, threadContext); } catch (Exception e) { try { channel.sendResponse(new BytesRestResponse(channel, e)); } catch (Exception inner) { inner.addSuppressed(e); logger.error(() -> new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner); } } }}
复制代码
请求对象分为 Action 和 Request。Action 即动作/操作,主要职责是 1.封装 Request,Request 的职责就是携带具体请求参数 2.注册对应的 URI.比如 DeleteAction,CreateIndexAction 等所有操作都有对应的 Action 类。Request 的职责就是携带具体参数。其中有一个比较独特的 Action:TransportAction,它和它的子类的职责是将对应 request 发送到正确的节点上。3.业务操作具体选用哪个 Action 是由分发请求前,根据系统内已注册的 URI 确定的。
创建索引流程
客户端提交创建索引的基本信息(索引名称、分区数、副本数等),提交到服务端,服务端将 CreateIndexRequest 封装成 CreateIndexClusterStateUpdateRequest。根据 actionName 获得具体响应的 action,具体入口是:
org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction
复制代码
拿到响应 action 后调用实例属性 MetadataCreateIndexService#createIndex()进入到 onlyCreateIndex(),该实例属性的作用是:负责提交创建索引请求的服务;所以创建索引大致可以分为以下几个主要流程:
master 节点发起集群状态更新任务
创建索引会改变当前集群状态 ClusterState,集群状态只能在主节点上更新,所以 onlyCreateIndex 方法进来后,就会由 ClusterService 调起 MasterService,并在 master 节点上发起:提交一批集群状态更新任务
MasterService#submitStateUpdateTasks;
复制代码
经过一系列调用,在 master 节点运行创建索引任务
再进入到具体的创建索引逻辑
匹配索引模板启动 IndexService 服务
创建索引需要通过 IndexModule 初始化一个 IndexService,IndexService 主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。进入创建索引逻辑
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request, boolean silent, BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) throws Exception { normalizeRequestSetting(request); logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version()); //索引相关检验 validate(request, currentState); ... ... return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates, metadataTransformer); }
复制代码
创建 IndexModule 和 IndexService
private synchronized IndexService createIndexService(...) throws IOException { final IndexSettings idxSettings = new IndexSettings(indexMetadata, settings, indexScopedSettings); ... ... final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings), directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories); //添加监听器 for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); } pluginsService.onIndexModule(indexModule); for (IndexEventListener listener : builtInListeners) { indexModule.addIndexEventListener(listener); } return indexModule.newIndexService(...); }
复制代码
创建索引
将索引创建到集群状态中即生成新的 cluster state
static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<ClusterBlock> clusterBlocks, IndexMetadata indexMetadata, BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable, BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) { Metadata.Builder builder = Metadata.builder(currentState.metadata()) .put(indexMetadata, false); if (metadataTransformer != null) { metadataTransformer.accept(builder, indexMetadata); } Metadata newMetadata = builder.build(); String indexName = indexMetadata.getIndex().getName(); ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks); blocks.updateBlocks(indexMetadata); ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metadata(newMetadata).build(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) .addAsNew(updatedState.metadata().index(indexName)); updatedState = ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(); // 如果index状态open,执行allocationService.reroute 将分片分配到节点 return rerouteRoutingTable.apply(updatedState, "index [" + indexName + "] created");}
复制代码
cluster state 是全局性信息,包含了整个群集中所有分片的元信息(规则, 位置, 大小等信息),并保持每个每节的信息同步。除了 RoutingNodes 结构之外,cluster state 对象是不可变的,该结构是根据 RoutingTable 的需求构建的。
上面创建完索引后,也就会生成一个新的 cluster state,集群状态 cluster state 包含更新的路由,接着就会回调到 AllocationService#reroute,该方法会根据集群节点和路由机制分配分片,并在分片分片分配结束后,更新集群状态,再由主节点调用相关服务将集群状态分发到集群的其他节点上,完成集群状态同步
AllocationService 负责分片路由
AllocationService 这个服务负责管理集群的节点分配,将创建成功后的索引分配到分片上,包括选择节点用于 shard allocation——分片分配,管理新加入集群的新节点和分片的重新路由,他的触发条件有:
新增或删除 index 索引node 节点的新增或删除执行 reroute 命令修改 replica 副本数量集群重启
复制代码
此时场景是新建索引,分片路由的具体实现逻辑如下
//在存活的节点中重新路由 路由表//如果返回了相同的ClusterState实例,则不会进行任何更改。public ClusterState reroute(ClusterState clusterState, String reason) { // 检查是否存在需要调整的具有自动扩展功能的副本。 //如果需要更改,则返回更新的群集状态;如果不需要更改,则返回相同的群集状态。 ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState); //创建一个RoutingNodes RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState); //洗牌未分配的节点 routingNodes.unassigned().shuffle(); /** *哪些分片应该分配到哪些节点上?哪个分片作为主分片, 哪个作为副本分片? *Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders; * allocator 寻找最优的节点来分配分片; * allocator 负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator 的目标是以更为均衡的方式把新索引的分片分配到集群的节点中; * deciders 负责判断并决定是否要进行分配; * deciders 依次遍历 allocator 给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等; */ RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState, clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), currentNanoTime()); reroute(allocation); if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) { return clusterState; } return buildResultAndLogHealthChange(clusterState, allocation, reason); }
复制代码
分发集群状态
主节点将集群状态分发到集群中的其他节点上
try { ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState); // new cluster state, notify all listeners final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta(); if (nodesDelta.hasChanges() && logger.isInfoEnabled()) { String nodesDeltaSummary = nodesDelta.shortSummary(); if (nodesDeltaSummary.length() > 0) { logger.info("{}, term: {}, version: {}, delta: {}", summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary); } }
logger.debug("publishing cluster state version [{}]", newClusterState.version()); publish(clusterChangedEvent, taskOutputs, publicationStartTime);
复制代码
监听器监听创建结果
等待集群中 Active shards 恢复到指定数目或者超时返回,将结果返回客户端。默认情况下:只要 Primary Shard 是 Active 的,也就是 wait_for_active_shards 指定的分片数量(默认为 1),就可以创建索引。这里有两个获取结果的方式,即 isAcknowledged()和 shardsAcknowledged(),如果 cluster state 创建成功,isAcknowledged()会返回 true(然后等待 shardsAcknowledged,如果超时,shardsAcknowledged 返回 false),否则返回 false(不会等待已经 started 的分片,isShardsAcknowledged 也会返回 false)。如果 Active shards 未达到指定的数目,则创建索引请求会阻塞,直到集群中 Active shards 恢复到指定数目或者超时返回。
onlyCreateIndex(request, ActionListener.wrap(response -> { // 检查isAcknowledged if (response.isAcknowledged()) { activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(), shardsAcknowledged -> { // 检查shardsAcknowledged; if (shardsAcknowledged == false) { logger.debug("[{}] index created, but the operation timed out while waiting for " + "enough shards to be started.", request.index()); } listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged)); }, listener::onFailure); } else { listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false)); }}, listener::onFailure));
复制代码
评论