创建索引源码学习
请求入口
接收 HTTP 请求的入口在:
org.elasticsearch.http.netty4.Netty4HttpRequestHandler
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// here
}
}
复制代码
之后请求交给 RestController,对请求进行检验和分发
public class RestController implements HttpServerTransport.Dispatcher {
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
...
try {
//here
tryAllHandlers(request, channel, threadContext);
} catch (Exception e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error(() ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}
}
复制代码
请求对象分为 Action 和 Request。Action 即动作/操作,主要职责是 1.封装 Request,Request 的职责就是携带具体请求参数 2.注册对应的 URI.比如 DeleteAction,CreateIndexAction 等所有操作都有对应的 Action 类。Request 的职责就是携带具体参数。其中有一个比较独特的 Action:TransportAction,它和它的子类的职责是将对应 request 发送到正确的节点上。3.业务操作具体选用哪个 Action 是由分发请求前,根据系统内已注册的 URI 确定的。
创建索引流程
客户端提交创建索引的基本信息(索引名称、分区数、副本数等),提交到服务端,服务端将 CreateIndexRequest 封装成 CreateIndexClusterStateUpdateRequest。根据 actionName 获得具体响应的 action,具体入口是:
org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction
复制代码
拿到响应 action 后调用实例属性 MetadataCreateIndexService#createIndex()进入到 onlyCreateIndex(),该实例属性的作用是:负责提交创建索引请求的服务;所以创建索引大致可以分为以下几个主要流程:
master 节点发起集群状态更新任务
创建索引会改变当前集群状态 ClusterState,集群状态只能在主节点上更新,所以 onlyCreateIndex 方法进来后,就会由 ClusterService 调起 MasterService,并在 master 节点上发起:提交一批集群状态更新任务
MasterService#submitStateUpdateTasks;
复制代码
经过一系列调用,在 master 节点运行创建索引任务
再进入到具体的创建索引逻辑
匹配索引模板启动 IndexService 服务
创建索引需要通过 IndexModule 初始化一个 IndexService,IndexService 主要包含创建、删除、关闭分片等操作,即维护一个索引的基本操作。进入创建索引逻辑
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request, boolean silent,
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) throws Exception {
normalizeRequestSetting(request);
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
//索引相关检验
validate(request, currentState);
...
...
return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates, metadataTransformer);
}
复制代码
创建 IndexModule 和 IndexService
private synchronized IndexService createIndexService(...) throws IOException {
final IndexSettings idxSettings = new IndexSettings(indexMetadata, settings, indexScopedSettings);
...
...
final IndexModule indexModule = new IndexModule(idxSettings, analysisRegistry, getEngineFactory(idxSettings),
directoryFactories, () -> allowExpensiveQueries, indexNameExpressionResolver, recoveryStateFactories);
//添加监听器
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
return indexModule.newIndexService(...);
}
复制代码
创建索引
将索引创建到集群状态中即生成新的 cluster state
static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<ClusterBlock> clusterBlocks, IndexMetadata indexMetadata,
BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable,
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) {
Metadata.Builder builder = Metadata.builder(currentState.metadata())
.put(indexMetadata, false);
if (metadataTransformer != null) {
metadataTransformer.accept(builder, indexMetadata);
}
Metadata newMetadata = builder.build();
String indexName = indexMetadata.getIndex().getName();
ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks);
blocks.updateBlocks(indexMetadata);
ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metadata(newMetadata).build();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metadata().index(indexName));
updatedState = ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build();
// 如果index状态open,执行allocationService.reroute 将分片分配到节点
return rerouteRoutingTable.apply(updatedState, "index [" + indexName + "] created");
}
复制代码
cluster state 是全局性信息,包含了整个群集中所有分片的元信息(规则, 位置, 大小等信息),并保持每个每节的信息同步。除了 RoutingNodes 结构之外,cluster state 对象是不可变的,该结构是根据 RoutingTable 的需求构建的。
上面创建完索引后,也就会生成一个新的 cluster state,集群状态 cluster state 包含更新的路由,接着就会回调到 AllocationService#reroute,该方法会根据集群节点和路由机制分配分片,并在分片分片分配结束后,更新集群状态,再由主节点调用相关服务将集群状态分发到集群的其他节点上,完成集群状态同步
AllocationService 负责分片路由
AllocationService 这个服务负责管理集群的节点分配,将创建成功后的索引分配到分片上,包括选择节点用于 shard allocation——分片分配,管理新加入集群的新节点和分片的重新路由,他的触发条件有:
新增或删除 index 索引
node 节点的新增或删除
执行 reroute 命令
修改 replica 副本数量
集群重启
复制代码
此时场景是新建索引,分片路由的具体实现逻辑如下
//在存活的节点中重新路由 路由表
//如果返回了相同的ClusterState实例,则不会进行任何更改。
public ClusterState reroute(ClusterState clusterState, String reason) {
// 检查是否存在需要调整的具有自动扩展功能的副本。
//如果需要更改,则返回更新的群集状态;如果不需要更改,则返回相同的群集状态。
ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
//创建一个RoutingNodes
RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
//洗牌未分配的节点
routingNodes.unassigned().shuffle();
/**
*哪些分片应该分配到哪些节点上?哪个分片作为主分片, 哪个作为副本分片?
*Elasticsearch 主要通过两个基础组件来完成分片分配这个过程的: allocator 和 deciders;
* allocator 寻找最优的节点来分配分片;
* allocator 负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator 的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;
* deciders 负责判断并决定是否要进行分配;
* deciders 依次遍历 allocator 给出的节点列表, 判断是否要把分片分配给该节点, 比如是否满足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;
*/
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
clusterInfoService.getClusterInfo(), snapshotsInfoService.snapshotShardSizes(), currentNanoTime());
reroute(allocation);
if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
return clusterState;
}
return buildResultAndLogHealthChange(clusterState, allocation, reason);
}
复制代码
分发集群状态
主节点将集群状态分发到集群中的其他节点上
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info("{}, term: {}, version: {}, delta: {}",
summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);
}
}
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
复制代码
监听器监听创建结果
等待集群中 Active shards 恢复到指定数目或者超时返回,将结果返回客户端。默认情况下:只要 Primary Shard 是 Active 的,也就是 wait_for_active_shards 指定的分片数量(默认为 1),就可以创建索引。这里有两个获取结果的方式,即 isAcknowledged()和 shardsAcknowledged(),如果 cluster state 创建成功,isAcknowledged()会返回 true(然后等待 shardsAcknowledged,如果超时,shardsAcknowledged 返回 false),否则返回 false(不会等待已经 started 的分片,isShardsAcknowledged 也会返回 false)。如果 Active shards 未达到指定的数目,则创建索引请求会阻塞,直到集群中 Active shards 恢复到指定数目或者超时返回。
onlyCreateIndex(request, ActionListener.wrap(response -> {
// 检查isAcknowledged
if (response.isAcknowledged()) {
activeShardsObserver.waitForActiveShards(new String[]{request.index()}, request.waitForActiveShards(), request.ackTimeout(),
shardsAcknowledged -> {
// 检查shardsAcknowledged;
if (shardsAcknowledged == false) {
logger.debug("[{}] index created, but the operation timed out while waiting for " +
"enough shards to be started.", request.index());
}
listener.onResponse(new CreateIndexClusterStateUpdateResponse(response.isAcknowledged(), shardsAcknowledged));
}, listener::onFailure);
} else {
listener.onResponse(new CreateIndexClusterStateUpdateResponse(false, false));
}
}, listener::onFailure));
复制代码
评论