ES Client 性能测试初探
- 2023-01-04 北京
本文字数:6597 字
阅读完需:约 22 分钟
最近在工作中协助研发进行了 ES 优化,效果还是非常明显的,几乎翻倍。除了通过各种业务接口测试 ES 性能以外,还可以直接请求 ES 接口,绕过服务,这样应该数据回更加准确。所以,ES Client 学起来。
准备工作
首先,先准备了一个 ES 服务,这里就不多赘述了,大家自己在尝试的时候一定主意好 ES Server 和 ES Client 的版本要一致。其次,新建项目,添加依赖。
学习资料
搜一下,能搜到很多的 ES 学习资料,建议先去看看大厂出品的基础知识了解一下 ES 功能。然后就可以直接看 ES 的 API 了。下面是 ES 官方的文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/java-rest-high-search.html
如果能能查看自己公司项目源码的小伙伴可以多研究研发的代码,能够更好结合业务理解 ES API 的使用。
ES Client
HTTP 请求
这里说一下,很多 ES 查询功能都是通过 HTTP 请求完成的,GET 请求,body 传参,一开始还是比较懵逼的。查了一些资料需要自己实现是个 body 携带数据的 HTTPGET 请求,下面是我的实现代码:
package com.funtester.httpclient
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase
import javax.annotation.concurrent.NotThreadSafe
/** * HttpGet请求携带body参数 */@NotThreadSafeclass HttpGetByBody extends HttpEntityEnclosingRequestBase {
static final String METHOD_NAME = "GET";
/** * 获取方法(必须重载) * * @return */ @Override String getMethod() { return METHOD_NAME; }
/** * PS:不能照抄{@link org.apache.http.client.methods.HttpPost} * @param uri */ HttpGetByBody(final String uri) { this(new URI(uri)) }
HttpGetByBody(final URI uri) { super(); setURI(uri); }
HttpGetByBody() { super(); }}
ES Client
如果使用 HTTP 接口进行 ES 操作,需要组合多层级的参数,这个写起来会比较麻烦、可读性也比较差,而且更加容易出错。所以,还是使用 ES Client 作为操作 ES 的基础框架。
如果翻看 ES Client 源码,最终也是通过 HttpClient 发起 HTTP 请求的,这中间进行了很多的封装。这里分享一下 ES Client 的 HTTP Client 创建代码部分:
private CloseableHttpAsyncClient createHttpClient() { //default timeouts are all infinite RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() .setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS) .setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); if (requestConfigCallback != null) { requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder); }
try { HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build()) //default settings for connection pooling may be too constraining .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL) .setSSLContext(SSLContext.getDefault()) .setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy()); if (httpClientConfigCallback != null) { httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder); }
final HttpAsyncClientBuilder finalBuilder = httpClientBuilder; return AccessController.doPrivileged(new PrivilegedAction<CloseableHttpAsyncClient>() { @Override public CloseableHttpAsyncClient run() { return finalBuilder.build(); } }); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException("could not create the default ssl context", e); } }
可以看出 ES Client 用到了 HttpClient 的异步 Client,我猜是用 future 实现同步返回响应结果,这个没仔细看,有错请指出。这里也回答我的自己的一个疑惑,ES Client 是支持并发的。
ES Client 封装
就我自己的观察,ES Client 的封装程度非常高,完全可以拿来就用。我担心自己过几天之后就不知道改怎么用这些 ES Client 的 API 了,所以又进行了一次封装,权当是一个学习笔记类。
封装代码有点多,放到了文末。
测试用例
添加数据
这个可以用来跑一部分数据到 ES 里。
package com.funtest.groovytest
import com.alibaba.fastjson.JSONObjectimport com.funtester.es.ESClientimport com.funtester.frame.SourceCodeimport com.funtester.frame.execute.FunQpsConcurrent
import java.util.concurrent.atomic.AtomicInteger
class ESC extends SourceCode {
static void main(String[] args) { def client = new ESClient("127.0.0.1", 9200, "http") def data = new JSONObject() data.name = "FunTester" data.age = getRandomInt(100) def index = new AtomicInteger(0) def test = { data.put("time", index.getAndIncrement()) client.index("fun", "tt", data) } new FunQpsConcurrent(test, "ES添加数据").start() }
}
如果想测试添加、删除功能,只需要把test闭包内容修改即可。
def test = { data.put("time", index.getAndIncrement()) client.delete("fun", "tt", client.index("fun", "tt", data)) }
下面是搜索功能的性能测试用例:
package com.funtest.groovytest
import com.alibaba.fastjson.JSONObjectimport com.funtester.es.ESClientimport com.funtester.frame.SourceCodeimport com.funtester.frame.execute.FunQpsConcurrentimport org.elasticsearch.index.query.QueryBuilders
import java.util.concurrent.atomic.AtomicInteger
class ESC extends SourceCode {
static void main(String[] args) { def client = new ESClient("127.0.0.1", 9200, "http") def data = new JSONObject() data.name = "FunTester" data.age = getRandomInt(100) def index = new AtomicInteger(0) def test = { client.search("fun", QueryBuilders.matchQuery("time", getRandomInt(10))) } new FunQpsConcurrent(test, "ES搜索").start() }
}
ES Client API 封装类
package com.funtester.es
import com.funtester.frame.SourceCodeimport groovy.util.logging.Log4j2import org.apache.http.HttpHostimport org.elasticsearch.action.delete.DeleteRequestimport org.elasticsearch.action.get.GetRequestimport org.elasticsearch.action.get.GetResponseimport org.elasticsearch.action.index.IndexRequestimport org.elasticsearch.action.index.IndexResponseimport org.elasticsearch.action.search.SearchRequestimport org.elasticsearch.action.search.SearchResponseimport org.elasticsearch.action.search.SearchScrollRequestimport org.elasticsearch.client.RequestOptionsimport org.elasticsearch.client.RestClientimport org.elasticsearch.client.RestHighLevelClientimport org.elasticsearch.common.unit.TimeValueimport org.elasticsearch.index.query.QueryBuilderimport org.elasticsearch.search.SearchHitsimport org.elasticsearch.search.builder.SearchSourceBuilderimport org.elasticsearch.search.fetch.subphase.FetchSourceContext
import java.util.concurrent.TimeUnit
/** * ES客户端API练习类 */@Log4j2class ESClient extends SourceCode {
String host
int port
String scheme
RestHighLevelClient client
ESClient(String host, int port = 9200, String scheme = "http") { this.host = host this.port = port this.scheme = scheme // 设置验证信息,填写账号及密码 // CredentialsProvider credentialsProvider = new BasicCredentialsProvider() // credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "passwd")) def builder = RestClient.builder(new HttpHost(host, port, scheme)) // 设置认证信息 // builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { // // @Override // public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { // return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) // } // }) builder.setMaxRetryTimeoutMillis(1000) client = new RestHighLevelClient(builder) }
/** * 添加数据 * @param index * @param type * @param data * @return */ def index(String index, type, Map data) { IndexRequest indexRequest = new IndexRequest(index, type).source(data) IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT) indexResponse.getId() }
/** * 获取数据 * @param index * @param type * @param id * @return */ def get(String index, type, id) { // 查询文档 GetRequest getRequest = new GetRequest(index, type, id) GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT) if (getResponse.isExists()) { getResponse.getSourceAsString() } }
/** * 数据是否存在 * @param index * @param type * @param id * @return */ def exists(String index, type, id) { GetRequest getRequest = new GetRequest(index, type, id) getRequest.fetchSourceContext(new FetchSourceContext(false)) getRequest.storedFields("_none_") client.exists(getRequest, RequestOptions.DEFAULT) }
/** * 删除数据 * @param index * @param type * @param id * @return */ def delete(String index, type, id) { DeleteRequest deleteRequest = new DeleteRequest(index, type, id) client.delete(deleteRequest, RequestOptions.DEFAULT) }
/** * 搜索数据 * @param index * @param query * @param size * @return */ def search(String index, QueryBuilder query, int size = 10) { SearchRequest searchRequest = new SearchRequest(index) SearchSourceBuilder sourceBuilder = new SearchSourceBuilder() sourceBuilder.query(query) sourceBuilder.from(0) sourceBuilder.size(size) sourceBuilder.timeout(new TimeValue(1, TimeUnit.SECONDS)) searchRequest.source(sourceBuilder) client.search(searchRequest, RequestOptions.DEFAULT) }
/** * 滚动搜索 * @param index * @param query * @param size */ def searchScroll(String index, QueryBuilder query, int size = 10) { SearchRequest searchRequest = new SearchRequest(index) SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() searchSourceBuilder.query(query) searchSourceBuilder.size(size) searchRequest.source(searchSourceBuilder) searchRequest.scroll(TimeValue.timeValueMinutes(1L)) SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT) String scrollId = searchResponse.getScrollId() SearchHits hits = searchResponse.getHits() def searchHits = hits.getHits() while (searchHits != null && searchHits.length > 0) { SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId) scrollRequest.scroll(TimeValue.timeValueMinutes(1L)) searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT) scrollId = searchResponse.getScrollId() searchHits = searchResponse.getHits().getHits() }
}
def close() { client.close() }}
FunTester 原创专题推荐~~
-- By FunTester
版权声明: 本文为 InfoQ 作者【FunTester】的原创文章。
原文链接:【http://xie.infoq.cn/article/cdf7326bfc58ffead68d1bbf8】。文章转载请联系作者。
FunTester
公众号:FunTester,800篇原创,欢迎关注 2020-10-20 加入
Fun·BUG挖掘机·性能征服者·头顶锅盖·Tester










评论