写点什么

创建索引源码学习

作者:liang1993
  • 2022 年 3 月 14 日
  • 本文字数:5080 字

    阅读完需:约 17 分钟

创建索引源码学习

请求入口

接收 HTTP 请求的入口在:


org.elasticsearch.http.netty4.Netty4HttpRequestHandler
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // here }}
复制代码


之后请求交给 RestController,对请求进行检验和分发


public class RestController implements HttpServerTransport.Dispatcher {    @Override    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {...        try {            //here            tryAllHandlers(request, channel, threadContext);        } catch (Exception e) {            try {                channel.sendResponse(new BytesRestResponse(channel, e));            } catch (Exception inner) {                inner.addSuppressed(e);                logger.error(() ->                    new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);            }        }    }}
复制代码


请求对象分为 Action 和 Request。Action 即动作/操作,主要职责是 1.封装 Request,Request 的职责就是携带具体请求参数 2.注册对应的 URI.比如 DeleteAction,CreateIndexAction 等所有操作都有对应的 Action 类。Request 的职责就是携带具体参数。其中有一个比较独特的 Action:TransportAction,它和它的子类的职责是将对应 request 发送到正确的节点上。3.业务操作具体选用哪个 Action 是由分发请求前,根据系统内已注册的 URI 确定的。

创建索引流程


客户端提交创建索引的基本信息(索引名称、分区数、副本数等),提交到服务端,服务端将 CreateIndexRequest 封装成 CreateIndexClusterStateUpdateRequest。根据 actionName 获得具体响应的 action,具体入口是:


org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction
复制代码


拿到响应 action 后调用实例属性 MetadataCreateIndexService#createIndex()进入到 onlyCreateIndex(),该实例属性的作用是:负责提交创建索引请求的服务;所以创建索引大致可以分为以下几个主要流程:

master 节点发起集群状态更新任务

创建索引会改变当前集群状态 ClusterState,集群状态只能在主节点上更新,所以 onlyCreateIndex 方法进来后,就会由 ClusterService 调起 MasterService,并在 master 节点上发起:提交一批集群状态更新任务


MasterService#submitStateUpdateTasks;
复制代码


经过一系列调用,在 master 节点运行创建索引任务


MasterService#runTasks
复制代码


再进入到具体的创建索引逻辑

匹配索引模板启动 IndexService 服务

创建索引需要通过 IndexModule 初始化一个 IndexService,IndexService 主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。进入创建索引逻辑


public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request, boolean silent,                                                BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) throws Exception {    normalizeRequestSetting(request);    logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());    //索引相关检验    validate(request, currentState);    ...    ...    return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates, metadataTransformer);    }
复制代码


创建 IndexModule 和 IndexService


private synchronized IndexService createIndexService(...) throws IOException {    final IndexSettings idxSettings = new IndexSettings(indexMetadata, settings, indexScopedSettings);    ...    ...    final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),            directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);        //添加监听器        for (IndexingOperationListener operationListener : indexingOperationListeners) {            indexModule.addIndexOperationListener(operationListener);        }        pluginsService.onIndexModule(indexModule);        for (IndexEventListener listener : builtInListeners) {            indexModule.addIndexEventListener(listener);        }        return indexModule.newIndexService(...);    }
复制代码

创建索引

将索引创建到集群状态中即生成新的 cluster state



static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<ClusterBlock> clusterBlocks, IndexMetadata indexMetadata, BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable, BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) { Metadata.Builder builder = Metadata.builder(currentState.metadata()) .put(indexMetadata, false); if (metadataTransformer != null) { metadataTransformer.accept(builder, indexMetadata); } Metadata newMetadata = builder.build(); String indexName = indexMetadata.getIndex().getName(); ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks); blocks.updateBlocks(indexMetadata); ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metadata(newMetadata).build(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable()) .addAsNew(updatedState.metadata().index(indexName)); updatedState = ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(); // 如果index状态open,执行allocationService.reroute 将分片分配到节点 return rerouteRoutingTable.apply(updatedState, "index [" + indexName + "] created");}
复制代码


cluster state 是全局性信息,包含了整个群集中所有分片的元信息(规则, 位置, 大小等信息),并保持每个每节的信息同步。除了 RoutingNodes 结构之外,cluster state 对象是不可变的,该结构是根据 RoutingTable 的需求构建的。


上面创建完索引后,也就会生成一个新的 cluster state,集群状态 cluster state 包含更新的路由,接着就会回调到 AllocationService#reroute,该方法会根据集群节点和路由机制分配分片,并在分片分片分配结束后,更新集群状态,再由主节点调用相关服务将集群状态分发到集群的其他节点上,完成集群状态同步

AllocationService 负责分片路由

AllocationService 这个服务负责管理集群的节点分配,将创建成功后的索引分配到分片上,包括选择节点用于 shard allocation——分片分配,管理新加入集群的新节点和分片的重新路由,他的触发条件有:


新增或删除 index 索引node 节点的新增或删除执行 reroute 命令修改 replica 副本数量集群重启
复制代码


此时场景是新建索引,分片路由的具体实现逻辑如下


//在存活的节点中重新路由  路由表//如果返回了相同的ClusterState实例,则不会进行任何更改。public ClusterState reroute(ClusterState clusterState, String reason) {        // 检查是否存在需要调整的具有自动扩展功能的副本。        //如果需要更改,则返回更新的群集状态;如果不需要更改,则返回相同的群集状态。        ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);        //创建一个RoutingNodes        RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);        //洗牌未分配的节点        routingNodes.unassigned().shuffle();        /**         *哪些分片应该分配到哪些节点上?哪个分片作为主分片, 哪个作为副本分片?          *Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders;         *  allocator 寻找最优的节点来分配分片;         *      allocator 负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator 的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;         *  deciders 负责判断并决定是否要进行分配;         *      deciders 依次遍历 allocator 给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;         */        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,            clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), currentNanoTime());        reroute(allocation);        if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {            return clusterState;        }        return buildResultAndLogHealthChange(clusterState, allocation, reason);    }
复制代码

分发集群状态

主节点将集群状态分发到集群中的其他节点上


try {      ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);        // new cluster state, notify all listeners      final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();        if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {            String nodesDeltaSummary = nodesDelta.shortSummary();            if (nodesDeltaSummary.length() > 0) {                logger.info("{}, term: {}, version: {}, delta: {}",                    summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);            }        }
logger.debug("publishing cluster state version [{}]", newClusterState.version()); publish(clusterChangedEvent, taskOutputs, publicationStartTime);
复制代码

监听器监听创建结果

等待集群中 Active shards 恢复到指定数目或者超时返回,将结果返回客户端。默认情况下:只要 Primary Shard 是 Active 的,也就是 wait_for_active_shards 指定的分片数量(默认为 1),就可以创建索引。这里有两个获取结果的方式,即 isAcknowledged()和 shardsAcknowledged(),如果 cluster state 创建成功,isAcknowledged()会返回 true(然后等待 shardsAcknowledged,如果超时,shardsAcknowledged 返回 false),否则返回 false(不会等待已经 started 的分片,isShardsAcknowledged 也会返回 false)。如果 Active shards 未达到指定的数目,则创建索引请求会阻塞,直到集群中 Active shards 恢复到指定数目或者超时返回。


onlyCreateIndex(request, ActionListener.wrap(response -> {        // 检查isAcknowledged    if (response.isAcknowledged()) {        activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),            shardsAcknowledged -> {                // 检查shardsAcknowledged;                if (shardsAcknowledged == false) {                    logger.debug("[{}] index created, but the operation timed out while waiting for " +                                     "enough shards to be started.", request.index());                }                listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));            }, listener::onFailure);    } else {        listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));    }}, listener::onFailure));
复制代码


发布于: 刚刚阅读数: 2
用户头像

liang1993

关注

还未添加个人签名 2022.03.11 加入

还未添加个人简介

评论

发布
暂无评论
创建索引源码学习_elasticsearch_liang1993_InfoQ写作平台