写点什么

谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理

作者:极限实验室
  • 2025-02-26
    北京
  • 本文字数:8970 字

    阅读完需:约 29 分钟

谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理

这一篇我们继续了解 ES 7.10 相较于 ES 6.8 调优的集群管理和任务管理的方法,主要有断联查询的主动取消、投票节点角色、异步查询和可搜索快照四个功能。

Query 自动取消

对于一个完善的产品来说,当一个任务发起链接主动断联的时候,服务端与之相关的任务应该也都被回收。但是这个特性到了 elasticsearch 7.4 版本才有了明确的声明。


Elasticsearch now automatically terminates queries sent through the _search endpoint when the initiating connection is closed.


相关的 PR 和 issue 在这里,对源码有兴趣的同学可以挖掘一下。


PR:https://github.com/elastic/elasticsearch/pull/43332


issue:https://github.com/elastic/elasticsearch/issues/43105


简单来说,ES 接受在某个查询的 http 链接断掉的时候,与其相关的父子任务的自动取消。原来的场景下可能需要手工一个个关闭。

实际测试

利用 painless 模拟复杂查询,下面这个查询在测试集群上能维持 5s 左右


GET /_search?max_concurrent_shard_requests=1{    "query": {        "bool": {            "must": [                {                    "script": {                        "script": {                            "lang": "painless",                            "source": """                                long sum = 0;                                for (int i = 0; i < 100000; i++) {                                    sum += i;                                }                                return true;                            """                        }                    }                },                {                    "script": {                        "script": {                            "lang": "painless",                            "source": """                                long product = 1;                                for (int i = 1; i < 100000; i++) {                                    product *= i;                                }                                return true;                            """                        }                    }                },                {                    "script": {                        "script": {                            "lang": "painless",                            "source": """                                long factorial = 1;                                for (int i = 1; i < 100000; i++) {                                    factorial *= i;                                }                                long squareSum = 0;                                for (int j = 0; j < 100000; j++) {                                    squareSum += j * j;                                }                                return true;                            """                        }                    }                },                {                    "script": {                        "script": {                            "lang": "painless",                            "source": """                                long fib1 = 0;                                long fib2 = 1;                                long next;                                for (int i = 0; i < 100000; i++) {                                    next = fib1 + fib2;                                    fib1 = fib2;                                    fib2 = next;                                }                                return true;                            """                        }                    }                }            ]        }    }}
复制代码


查看任务被终止的状态


GET /_tasks?detailed=true&actions=*search*
复制代码


测试脚本,判断上面该查询被取消后是否还可以查到任务


import requestsimport multiprocessingimport timefrom requests.exceptions import RequestExceptionfrom datetime import datetime
# Elasticsearch 地址#ES_URL = "http://localhost:9210" # 6.8版本地址ES_URL = "http://localhost:9201"
# 耗时查询的 DSLLONG_RUNNING_QUERY = {"size":0, "query": { "bool": { "must": [ { "script": { "script": { "lang": "painless", "source": """ long sum = 0; for (int i = 0; i < 100000; i++) { sum += i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long product = 1; for (int i = 1; i < 100000; i++) { product *= i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long factorial = 1; for (int i = 1; i < 100000; i++) { factorial *= i; } long squareSum = 0; for (int j = 0; j < 100000; j++) { squareSum += j * j; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long fib1 = 0; long fib2 = 1; long next; for (int i = 0; i < 100000; i++) { next = fib1 + fib2; fib1 = fib2; fib2 = next; } return true; """ } } } ] } }}
# 用于同步的事件对象query_finished = multiprocessing.Event()# 新增:进程终止标志位process_terminated = multiprocessing.Event()
# 定义一个函数用于添加时间戳到日志def log_with_timestamp(message,*message1): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {message}+{message1}")
# 发起查询的函数def run_query(): try: log_with_timestamp("发起查询...") session = requests.Session() response = session.post( f"{ES_URL}/_search", json=LONG_RUNNING_QUERY, stream=True # 启用流式请求,允许后续中断 ) try: # 尝试读取响应内容(如果连接未被中断) if response.status_code == 200: log_with_timestamp("查询完成,结果:", response.json()) else: log_with_timestamp("查询失败,错误信息:", response.text) except RequestException as e: log_with_timestamp("请求被中断:", e) finally: # 标记查询完成 query_finished.set()

# 中断连接的信号函数def interrupt_signal(): time.sleep(1) # 等待 1 秒 log_with_timestamp("发出中断查询信号...") # 标记可以中断查询了 query_finished.set()

# 检测任务是否存在的函数def check_task_exists(): # 等待进程终止标志位 process_terminated.wait() max_retries = 3 retries = 0 time.sleep(1) #1s后检查 while retries < max_retries: log_with_timestamp("检查任务是否存在...") tasks_url = f"{ES_URL}/_tasks?detailed=true&actions=*search*" try: tasks_response = requests.get(tasks_url) if tasks_response.status_code == 200: tasks = tasks_response.json().get("nodes") if tasks: log_with_timestamp("任务仍存在:", tasks) else: log_with_timestamp("任务已消失") break else: log_with_timestamp("获取任务列表失败,错误信息:", tasks_response.text) except RequestException as e: log_with_timestamp(f"检测任务失败(第 {retries + 1} 次重试): {e}") retries += 1 time.sleep(1) # 等待 1 秒后重试 if retries == max_retries: log_with_timestamp("达到最大重试次数,无法检测任务状态。")

# 主函数def main(): # 启动查询进程 query_process = multiprocessing.Process(target=run_query) query_process.start()
# 启动中断信号进程 interrupt_process = multiprocessing.Process(target=interrupt_signal) interrupt_process.start()
# 等待中断信号 query_finished.wait()
# 检查查询进程是否还存活并终止它 if query_process.is_alive(): log_with_timestamp("尝试中断查询进程...") query_process.terminate() log_with_timestamp("查询进程已终止") # 新增:设置进程终止标志位 process_terminated.set()
# 启动任务检测进程 check_process = multiprocessing.Process(target=check_task_exists) check_process.start()
# 等待所有进程完成 query_process.join() interrupt_process.join() check_process.join()

if __name__ == "__main__": main()
复制代码


实际测试结果:


# 6.8 版本[2025-02-08 15:17:21] 发起查询...+()[2025-02-08 15:17:22] 发出中断查询信号...+()[2025-02-08 15:17:22] 尝试中断查询进程...+()[2025-02-08 15:17:22] 查询进程已终止+()[2025-02-08 15:17:23] 检查任务是否存在...+()[2025-02-08 15:17:23] 任务仍存在:+({'fYMNv_KxQGCGzhgfMxPXuA': {......}},)
复制代码


可以看到在查询任务被终止后 1s 再去检查,任务仍然存在


# 7.10 版本[2025-02-08 15:18:16] 发起查询...+()[2025-02-08 15:18:17] 发出中断查询信号...+()[2025-02-08 15:18:17] 尝试中断查询进程...+()[2025-02-08 15:18:17] 查询进程已终止+()[2025-02-08 15:18:18] 检查任务是否存在...+()[2025-02-08 15:18:18] 任务已消失+()
复制代码


这里可以看到任务已经检测不到了。

关于 timeout 配置

这里展开讨论下,timeout 配置。超时回收处理是一个‘best effort’行为。


(Optional, time units) Specifies the period of time to wait for a response. If no response is received before the timeout expires, the request fails and returns an error. Defaults to no timeout.


the search request is more of a best effort and does not guarantee that the request will never last longer than the specified amount of time.

异步搜索

使用方法

可以让用户进行异步的搜索,可以通过相关参数进行检查维护该搜索的状态和结果。比较合适查询量较大但对延迟要求较低的查询,进行精细化的管理控制。


注意:这里的参数基本都是添加到 url 上的,并不是添加到 request body 上的。


POST test_index/_async_search?keep_on_completion=true{  "query": {    "match_all": {}  }}
复制代码


注:这里为了产生查询结果 id 使用了 keep_on_completion 参数,这个参数的使用见下面解释。


返回结果,和一般的查询结果不同的是,添加了结果 id 和查询的一些状态数据。


{  "id": "Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz",//结果id,可以用于后续的复查  "is_partial": false,//是否为部分完成结果  "is_running": false,//是否还在查询  "start_time_in_millis": 1738978637287,//查询产生时间戳  "expiration_time_in_millis": 1739410637287,//查询结果过期时间戳  "response": {    "took": 1,    "timed_out": false,    "_shards": {      "total": 1,      "successful": 1,      "skipped": 0,      "failed": 0    },    "hits": {      "total": {        "value": 3,        "relation": "eq"      },      "max_score": 1,      "hits": [······]    }  }}
复制代码


管理查询结果


//查询结果和第一次返回的内容一致GET /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz
//主动删除查询结果DELETE /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz
复制代码

关键参数

  • wait_for_completion_timeout:参数(默认为 1 秒),这个参数用来设置异步查询的等待时间。当异步搜索在此时间内完成时,响应将不包括 ID,结果也不会存储在集群中

  • keep_on_completion:参数(默认为 false)可以设置为 true,可以强制存储查询结果,即便在 wait_for_completion_timeout 设置时间内完成搜索,该结果也能被查询到。

  • keep_alive:指定异步搜索结果可以被保存多长时间,默认为 5d(5 天)。在此期间之后,正在进行的异步搜索和任何保存的搜索结果将被删除。

  • batched_reduce_size:是 Elasticsearch 中的一个配置参数,默认值为 5。它的作用是控制分片结果的部分归并频率,具体来说,它决定了协调节点(coordinating node)在接收到多少个分片的响应后,会执行一次部分结果归并(partial reduction)。

  • pre_filter_shard_size:是 Elasticsearch 中与查询执行相关的一个参数,它的默认值为 1,并且不可更改。这个参数的作用是强制 Elasticsearch 在执行查询之前,先进行一轮预过滤(pre-filter),以确定哪些分片(shard)可能包含与查询匹配的文档,从而跳过那些肯定不包含匹配文档的分片。

查询结果存储位置

异步查询的结果部分存储在.async-search中,但是进行了程序加密,内容对使用者不可见。


GET .async-search/_search// 返回的结果···"hits": [      {        "_index": ".async-search",        "_type": "_doc",        "_id": "bPNotcTCTV-gSIiZLuK0IA",        "_score": 1,        "_source": {          "result": "i6+xAwFERm1KUVRtOTBZMVJEVkZZdFoxTkphVnBNZFVzd1NVRWJPRmx3UkdVMk9XWlRhMmt4TkVwb1QwUTJiVlpyWnpvek1EWTEAAQEDAD+AAAADP4AAAAAAABR0Sm9yNDVRQlQ3bzBsZTdsYmp0TgAAAARfZG9jAP//////////AwALeyJhIjoxMTExfQoAAAAAAAAAAQEAAAAWOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZwp0ZXN0X2luZGV4Fk5fYmphNXM1UWtpcnU4RXdleVlGSUEAAAA/gAAAAAAAFHRab3I0NVFCVDdvMGxlN2xlVHNrAAAABF9kb2MA//////////8DAAt7ImEiOjExMTJ9CgAAAAAAAAABAQAAABY4WXBEZTY5ZlNraTE0SmhPRDZtVmtnCnRlc3RfaW5kZXgWTl9iamE1czVRa2lydThFd2V5WUZJQQAAAD+AAAAAAAAUdHBvcjQ1UUJUN28wbGU3bGZqc28AAAAEX2RvYwD//////////wMAC3siYSI6MTExM30KAAAAAAAAAAEBAAAAFjhZcERlNjlmU2tpMTRKaE9ENm1Wa2cKdGVzdF9pbmRleBZOX2JqYTVzNVFraXJ1OEV3ZXlZRklBAAAAAAAAAAAAAgABAQEAAAAAAAsAAAAAAAABlOMuvCQAAAGU/O6IJA==",          "headers": {},          "expiration_time": 1739410278436,          "response_headers": {}        }      },···
复制代码

只投票候选节点

这是一个主候选节点角色的优化,能相对固定 master 节点的位置,减少了选举候选节点过多的问题。

作用

Voting - only master - eligible node(仅参与投票的具备主节点资格的节点)在 Elasticsearch 集群中有以下作用:


  1. 参与主节点选举:该节点参与主节点选举过程,但本身不会成为集群选出的主节点,主要作为选举中的决胜因素(打破平局)。

  2. 保障高可用性:在高可用性(HA)集群中,至少需要三个具备主节点资格的节点,其中至少两个不能是仅参与投票的节点,这样即使有一个节点故障,集群仍能选出主节点。

  3. 分担选举及状态发布任务:和普通具备主节点资格的节点一样,在集群状态发布期间承担特定任务。

  4. 灵活承担其他角色:可以同时承担集群中的其他角色,如数据节点;也可以作为专用节点,不承担其他角色。

配置

三个节点的集群:可以配置两个普通主节点资格节点和一个仅参与投票的节点。这样在一个普通主节点故障时,剩下的普通主节点和仅参与投票的节点一起可以完成主节点选举,保证集群的正常运行。理论上,主候选节点数量能满足不同区域间的主备切换要求即可,其余可以都是投票节点。

可搜索快照

注意:这是一个收费功能

实现机制

可搜索快照让你能够通过使用快照来保障数据恢复能力,而非在集群内维护副本分片,从而降低运营成本。


当你将快照中的索引挂载为可搜索快照时,Elasticsearch 会将索引分片复制到集群内的本地存储中。这能确保搜索性能与搜索其他任何索引相当,并尽量减少对访问快照存储库的需求。如果某个节点发生故障,可搜索快照索引的分片会自动从快照存储库中恢复。


搜索可搜索快照索引与搜索其他任何索引的方式相同。搜索性能与常规索引相当,因为在挂载可搜索快照时,分片数据会被复制到集群中的节点上


如果某个节点发生故障,且需要从快照中恢复可搜索快照分片,在 Elasticsearch 将分片分配到其他节点的短暂时间内,集群健康状态将不会显示为绿色。在这些分片重新分配完成之前,对这些分片的搜索将会失败或返回部分结果。


对于搜索频率较低的数据,这能显著节省成本。使用可搜索快照,不再需要额外的索引分片副本以避免数据丢失,这有可能将搜索该数据所需的节点本地存储容量减少一半。同时可搜索快照依赖于备份使用的快照,也不需要额外的空间。

使用建议

  1. 从含多索引的快照挂载单个索引时,建议进行使用分隔,创建仅含目标索引的快照副本并挂载,方便独立管理备份与可搜索快照生命周期。

  2. 挂载为可搜索快照索引前,建议将索引强制合并为每分片一个段,减少从存储库读取数据的操作和成本。

实际测试

基础配置

前提条件:需要一个镜像使用存储,这里使用 minIO 作为测试


  1. 安装 S3 插件,并注册快照库信息


# 在线安装插件
elasticsearch-plugin install repository-s3
# 设置访问minio的信息,elasticsearch的bin目录下,使用minIO中设置的用户名密码
./elasticsearch-keystore add s3.client.default.access_key./elasticsearch-keystore add s3.client.default.secret_key
# 重载安全设置,然后重启节点POST _nodes/reload_secure_settings

# 注册快照库PUT _snapshot/my-minio-repository{ "type": "s3", "settings": { "bucket": "es-bucket", "endpoint": "http://127.0.0.1:9002", "compress": true }}
复制代码


  1. 挂载需要的快照索引


POST /_snapshot/my-minio-repository/snapshot_es_prp_cmain_20240829/_mount?wait_for_completion=true{  "index": "es_prp_cmain_insured_itemkind_detail_formal_20240829",  "renamed_index": "test_searchable_snapshot",//挂载时对索引进行重命名  "index_settings": {    "index.number_of_replicas": 0  },  "ignore_index_settings": [ "index.refresh_interval" ]}
复制代码


  1. 检查空间占用


GET _cat/indices/test_searchable_snapshot?vhealth status index                    uuid                   pri rep docs.count docs.deleted store.size pri.store.sizegreen  open   test_searchable_snapshot qROj2flcRdiGOZaejeAmQQ   1   0      10000            0     21.3mb         21.3mb
复制代码


在系统上也看到了对应 uuid 的文件目录


[root@hcss-ecs 0]# ls_state  snapshot_cache  translog[root@hcss-ecs 0]# pwd/data/elasticsearch-7.10.2/data/nodes/0/indices/qROj2flcRdiGOZaejeAmQQ/0
复制代码

小结

这篇的内容讲解测试的相对较细,对于查询的自动取消和异步查询增加了 ES 查询任务的灵活性;只投票节点也是加强了主节点选举的稳定性;可搜索快照是成本和功能的均衡方法,对于日志场景的使用是一个不错的选择。

推荐阅读

发布于: 刚刚阅读数: 5
用户头像

简单、易用、极致、创新 2021-11-22 加入

极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

评论

发布
暂无评论
谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理_elasticsearch_极限实验室_InfoQ写作社区