写点什么

商品中心—商品 B 端搜索系统的实现文档(二)

  • 2025-06-26
    福建
  • 本文字数:20032 字

    阅读完需:约 66 分钟

8.步骤四:基于索引实现搜索功能


(1)基于 suggest 索引的自动补全实现


实现自动补全的代码比较简单,其原理是:把搜索词汇和倒排索引里的所有前缀匹配的词条进行 score 比较,然后把分数最高的那些返回,其中会涉及到 suggest 索引的 word1(IK 分词器 + pinyin 分词器)。

 

具体步骤如下:


步骤一:构建CompletionSuggestion条件步骤二:封装搜索请求步骤三:通过restHighLevelClient查询ElasticSearch步骤四:获取响应中的补全的词的列表
复制代码


@RestController@RequestMapping("/api/common")public class CommonSearchController {    ...    //通用服务组件    @Autowired    private CommonSearchService commonSearchService;
//输入内容自动补全接口 @GetMapping("/autoComplete") public JsonResult autoComplete(@RequestBody AutoCompleteRequest request) throws IOException { List<String> completedWords = commonSearchService.autoComplete(request); return JsonResult.buildSuccess(completedWords); } ...}
@Datapublic class AutoCompleteRequest { //索引名称 private String indexName; //字段名称 private String fieldName; //需要补全的词(用户输入的内容) private String text; //返回多少个补全后的词 private int count;}
//通用查询服务实现类@Servicepublic class CommonSearchServiceImpl implements CommonSearchService { private static final String MY_SUGGEST = "my_suggest";
@Autowired private RestHighLevelClient restHighLevelClient;
@Override public List<String> autoComplete(AutoCompleteRequest request) throws IOException { //1.构建CompletionSuggestion条件 CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(request.getFieldName()); completionSuggestionBuilder.prefix(request.getText()); completionSuggestionBuilder.skipDuplicates(true); completionSuggestionBuilder.size(request.getCount());
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, completionSuggestionBuilder));
//2.封装搜索请求 SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(request.getIndexName()); searchRequest.source(searchSourceBuilder);
//3.查询ElasticSearch SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//4.获取响应中的补全的词的列表 CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST); List<CompletionSuggestion.Entry.Option> options = completionSuggestion.getEntries().get(0).getOptions();
List<String> result = new ArrayList<>(); for (CompletionSuggestion.Entry.Option option : options) { result.add(option.getText().string()); }
return result; } ...}
复制代码


(2)输入框中的拼写纠错实现


实现拼写纠错的代码也比较简单,其原理是:把输入的有拼写错误的搜索词汇,先自动进行纠错。然后再和倒排索引里的所有匹配的词条进行 score 比较,最后把分数最高的那一条返回,其中会涉及到 suggest 索引的 word2。

 

具体步骤如下:


步骤一:构建PhraseSuggestion条件步骤二:封装搜索请求步骤三:通过restHighLevelClient查询ElasticSearch步骤四:获取响应中纠错后的词
复制代码


@RestController@RequestMapping("/api/common")public class CommonSearchController {    //通用服务组件    @Autowired    private CommonSearchService commonSearchService;    ...
//输入内容拼写纠错接口 @GetMapping("/spellingCorrection") public JsonResult spellingCorrection(@RequestBody SpellingCorrectionRequest request) throws IOException { String correctedWord = commonSearchService.spellingCorrection(request); return JsonResult.buildSuccess(correctedWord); }}
@Datapublic class SpellingCorrectionRequest { //索引名称 private String indexName; //字段名称 private String fieldName; //用户输入的内容 private String text;}
//通用查询服务实现类@Servicepublic class CommonSearchServiceImpl implements CommonSearchService { private static final String MY_SUGGEST = "my_suggest";
@Autowired private RestHighLevelClient restHighLevelClient; ...
@Override public String spellingCorrection(SpellingCorrectionRequest request) throws IOException { //1.构建PhraseSuggestion条件 PhraseSuggestionBuilder phraseSuggestionBuilder = new PhraseSuggestionBuilder(request.getFieldName()); phraseSuggestionBuilder.text(request.getText()); phraseSuggestionBuilder.size(1);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, phraseSuggestionBuilder));
//2.封装搜索请求 SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(request.getIndexName()); searchRequest.source(searchSourceBuilder);
//3.查询ElasticSearch SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//4.获取响应中纠错后的词 PhraseSuggestion phraseSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST); List<PhraseSuggestion.Entry.Option> options = phraseSuggestion.getEntries().get(0).getOptions();
return Optional.ofNullable(options).filter(e -> !e.isEmpty()).map(e -> e.get(0)).map(e -> e.getText().string()).orElse(""); }}
复制代码


(3)商品 B 端的商品搜索代码实现


搜索流程应为:输入搜索词 -> 拼写纠错 -> 自动补全 -> 全文检索。

 

具体步骤如下:


步骤一:构建match条件步骤二:设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要)步骤三:设置搜索分页参数步骤四:封装搜索请求步骤五:调用restHighLevelClient查询ElasticSearch步骤六:对结果进行高亮处理
复制代码


@RestController@RequestMapping("/api/product")public class ProductSearchController {    //商品服务组件    @Autowired    private ProductService productService;
//商品全文检索接口 @GetMapping("/fullTextSearch") public JsonResult fullTextSearch(@RequestBody FullTextSearchRequest request) throws IOException { SearchResponse searchResponse = productService.fullTextSearch(request); Map<String, Object> resultMap = new HashMap<>(); SearchHit[] hits = searchResponse.getHits().getHits(); long totalCount = searchResponse.getHits().getTotalHits().value; resultMap.put("hits", hits); resultMap.put("totalCount", totalCount); resultMap.put("pageNum", request.getPageNum()); resultMap.put("pageSize", request.getPageSize()); return JsonResult.buildSuccess(resultMap); } ...}
@Datapublic class FullTextSearchRequest { //索引名字 private String indexName; //查询参数:key为字段的名字,value为字段的关键词,可以指定从哪些字段里检索 private Map<String, String> queryTexts; //高亮字段 private String highLightField; //当前页 private int pageNum; //每页条数 private int pageSize;}
//商品查询服务实现类@Servicepublic class ProductServiceImpl implements ProductService { @Autowired private RestHighLevelClient restHighLevelClient;
@Override public SearchResponse fullTextSearch(FullTextSearchRequest request) throws IOException { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.trackTotalHits(true);
//1.构建match条件 request.getQueryTexts().forEach((field, text) -> { searchSourceBuilder.query(QueryBuilders.matchQuery(field, text)); });
//2.设置搜索高亮配置(现在基本面向移动端,所以高亮处理也没太必要) HighlightBuilder highlightBuilder = new HighlightBuilder(); highlightBuilder.field(request.getHighLightField()); highlightBuilder.preTags("<span stype=color:red>"); //搜索结果里,商品标题和搜索词匹配的部分会显示为红色 highlightBuilder.postTags("</span>"); highlightBuilder.numOfFragments(0); searchSourceBuilder.highlighter(highlightBuilder);
//3.设置搜索分页参数 int from = (request.getPageNum() - 1) * request.getPageSize(); searchSourceBuilder.from(from); searchSourceBuilder.size(request.getPageSize());
//4.封装搜索请求 SearchRequest searchRequest = new SearchRequest(request.getIndexName()); searchRequest.source(searchSourceBuilder);
//5.查询ElasticSearch SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//6.对结果进行高亮处理 SearchHits hits = searchResponse.getHits(); for (SearchHit hit : hits) { HighlightField highlightField = hit.getHighlightFields().get(request.getHighLightField()); Map<String, Object> sourceAsMap = hit.getSourceAsMap(); Text[] fragments = highlightField.fragments(); StringBuilder builder = new StringBuilder(); for (Text fragment : fragments) { builder.append(fragment.string()); } sourceAsMap.put(request.getHighLightField(), builder.toString()); } return searchResponse; } ...}
复制代码


(4)搜索结果为空时的自动推荐代码实现


如果全文检索的结果为空,那么可以继续调用自动推荐进行相似搜索。

 

搜索流程应为:输入搜索词 -> 拼写纠错(completion) -> 自动补全(phrase) -> 全文检索(match) -> 自动推荐(term)。

 

具体步骤如下:


步骤1:构建TermSuggestion条件步骤2:封装搜索请求步骤3:调用restHighLevelClient查询ElasticSearch步骤4:获取响应中推荐给用户的词
复制代码


@GetMapping("/recomendWhenMissing")public JsonResult recommendWhenMissing(@RequestBody RecommendWhenMissingRequest request) throws IOException {    String recommendWord = commonSearchService.recommendWhenMissing(request);    return JsonResult.buildSuccess(recommendWord);}
@Overridepublic String recommendWhenMissing(RecommendWhenMissingRequest request) throws IOException { //1.构建TermSuggestion条件 TermSuggestionBuilder termSuggestionBuilder = new TermSuggestionBuilder(request.getFieldName()); termSuggestionBuilder.text(request.getText()); termSuggestionBuilder.analyzer(IK_SMART); termSuggestionBuilder.minWordLength(2); termSuggestionBuilder.stringDistance(TermSuggestionBuilder.StringDistanceImpl.NGRAM);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, termSuggestionBuilder));
//2.封装搜索请求 SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(request.getIndexName()); searchRequest.source(searchSourceBuilder);
//3.查询ElasticSearch SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
//4.获取响应中推荐给用户的词 TermSuggestion termSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST); List<TermSuggestion.Entry.Option> options = termSuggestion.getEntries().get(0).getOptions();
return Optional.ofNullable(options).map(e -> e.get(0)).map(e -> e.getText().string()).orElse("");}
复制代码


(5)基于多条件对商品进行结构化搜索


具体步骤如下:


步骤1:解析queryDSL步骤2:设置搜索分页参数步骤3:封装搜索请求步骤4:调用restHighLevelClient查询ElasticSearch
复制代码


@RestController@RequestMapping("/api/product")public class ProductSearchController {    //商品服务组件    @Autowired    private ProductService productService;    ...
//商品结构化搜索接口 @GetMapping("/structuredSearch") public JsonResult structuredSearch(@RequestBody StructuredSearchRequest request) throws IOException { SearchResponse searchResponse = productService.structuredSearch(request); Map<String, Object> resultMap = new HashMap<>(); SearchHit[] hits = searchResponse.getHits().getHits(); long totalCount = searchResponse.getHits().getTotalHits().value; resultMap.put("hits", hits); resultMap.put("totalCount", totalCount); resultMap.put("pageNum", request.getPageNum()); resultMap.put("pageSize", request.getPageSize()); return JsonResult.buildSuccess(resultMap); }}
@Datapublic class StructuredSearchRequest { //索引名字 private String indexName; //Query DSL:ES查询语法,是按照JSON来组织 //按照ElasticSearch的规范写的Query DSL,是一个JSON对象 //解析的时候转成JSON字符串,客户端API可以直接解析字符串 private Map<String, Object> queryDsl; //当前页 private int pageNum; //每页条数 private int pageSize;}
@Servicepublic class ProductServiceImpl implements ProductService { @Autowired private RestHighLevelClient restHighLevelClient; ...
@Override public SearchResponse structuredSearch(StructuredSearchRequest request) throws IOException { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.trackTotalHits(true);
//1.解析queryDSL String queryDsl = JSON.toJSONString(request.getQueryDsl()); SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList()); NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents()); XContent xContent = XContentFactory.xContent(XContentType.JSON); XContentParser xContentParser = xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, queryDsl); searchSourceBuilder.parseXContent(xContentParser);
//2.设置搜索分页参数 int from = (request.getPageNum() - 1) * request.getPageSize(); searchSourceBuilder.from(from); searchSourceBuilder.size(request.getPageSize());
//3.封装搜索请求 SearchRequest searchRequest = new SearchRequest(request.getIndexName()); searchRequest.source(searchSourceBuilder);
//4.查询ElasticSearch return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); }}
复制代码


9.步骤五:大数据量写入 ES 和搜索性能的调优


(1)单线程将百万商品数据写入 ES


一.创建索引


PUT /demo_plan_sku_index_01{     "settings": {        "number_of_shards": 3,        "number_of_replicas": 1    },    "mappings": {        "properties": {            "skuId": {                "type": "keyword"            },            "skuName": {                "type": "text",                "analyzer": "ik_max_word",                "search_analyzer": "ik_smart"            },            "category": {                "type": "keyword"            },            "basePrice": {                "type": "integer"            },            "vipPrice": {                "type": "integer"            },            "saleCount": {                "type": "integer"            },            "commentCount": {                "type": "integer"            },            "skuImgUrl": {                "type": "keyword",                "index": false            },            "createTime": {                "type": "date",                "format": "yyyy-MM-dd HH:mm:ss"            },            "updateTime": {                "type": "date",                "format": "yyyy-MM-dd HH:mm:ss"            }        }    } }
复制代码


二.请求接口


/api/mockData/mockData1
复制代码


三.请求参数


写入 demo_plan_sku_index_01 索引,每次批量插入 1000 条商品数据,一共执行 1000 次批量插入。


{    "indexName":"demo_plan_sku_index_01",    "batchSize":1000,    "batchTimes":1000}
复制代码


四.请求响应


该次测试耗时 62s,写入了 100 万条数据。每个线程每秒可以写入 1.6 万条数据,所以单线程每秒差不多执行了 16 个 BulkRequest 批量写入。60ms 可以执行一次 BulkRequest 批量写入,每个 BulkRequest 会包含 1000 条数据。100 万条数据大概会占用几百 MB,所以很多数据都可以驻留在 ES 机器的 OS Cache 里,有利搜索。



{     "success": true,    "data": {        "totalCount": 1000000,        "elapsedSeconds": 62,        "perSecond": 16130    },    "errorCode": null,    "errorMessage": null}
复制代码


(2)多线程将百万商品数据写入 ES


一.创建索引


//demo_plan_sku_index_02和demo_plan_sku_index_03一样的PUT /demo_plan_sku_index_02{    "settings": {        "number_of_shards": 3,        "number_of_replicas": 1    },    "mappings": {        "properties": {            "skuId": {                "type": "keyword"            },            "skuName": {                 "type": "text",                "analyzer": "ik_max_word",                "search_analyzer": "ik_smart"            },            "category": {                "type":"keyword"            },            "basePrice": {                "type": "integer"            },            "vipPrice": {                "type": "integer"            },            "saleCount": {                "type": "integer"            },            "commentCount": {                "type": "integer"            },            "skuImgUrl": {                "type": "keyword",                "index": false            },            "createTime": {                "type": "date",                "format": "yyyy-MM-dd HH:mm:ss"            },            "updateTime": {                "type": "date",                "format": "yyyy-MM-dd HH:mm:ss"            }        }    } }
复制代码


二.请求接口


/api/mockData/mockData2
复制代码


三.请求参数


操作 demo_plan_sku_index_02 索引,每次批量插⼊1000 条商品数据,⼀共执⾏1000 次批量插⼊,使⽤30 个线程同时执⾏。


{    "indexName": "demo_plan_sku_index_02",    "batchSize": 1000,    "batchTimes": 1000,    "threadCount": 30}
复制代码


操作 demo_plan_sku_index_03 索引,每次批量插⼊1000 条商品数据,⼀共执⾏1000 次批量插⼊,使⽤60 个线程同时执⾏。


{    "indexName": "demo_plan_sku_index_03",    "batchSize": 1000,    "batchTimes": 1000,    "threadCount": 60}
复制代码


四.请求响应


该次测试耗时 11 秒,每秒写入 9 万条数据,总共使用 11 秒完成 100 万条数据的写入。由于有 30 个线程在并发地一起跑,所以每个线程每秒可以写入 3000 条数据。即每个线程每秒能将 3 个 BulkRequest 批量写入到 ES,每个 BulkRequest 的写入需要 300ms 左右。

 

对比单线程写入百万数据到 ES 时,每个线程每秒可以写入 1.6 万条数据。而这里多线程写入百万数据到 ES 时,每个线程每秒才写入 3000 天数据。

 

可见,并不是线程数量越多越好。线程数量越多会导致对 CPU 负载和消耗越大,要耗费更多时间进行线程上下文切换。CPU 负载高了之后,线程处理同样的任务,吞吐量和速率会下降。CPU 只要不超过 80%,其实都可以接受。



//下面是30个线程时的响应{    "success": true,    "data": {        "totalCount": 1000000,        "elapsedSeconds": 11,        "perSecond": 90909    },    "errorCode": null,    "errorMessage": null}//下面是60个线程时的响应{    "success": true,    "data": {        "totalCount": 1000000,        "elapsedSeconds": 10,        "perSecond": 100000    },    "errorCode": null,    "errorMessage": null}
复制代码


总结:多线程 + Bulk 批量写入,10 秒就可以完成百万级数据的写入。会有一个最佳的线程数,超过这个临界点,线程数越多反而效果会下降。

 

(3)数据写入到 ES 的存储层原理简析


首先 ES 会将收到的写入请求,将数据写到一个叫 index buffer 的 JVM 缓冲区中。然后会有一个线程,每隔 1 秒定时将这个 JVM 缓冲区的数据 refresh 刷新到 OS Page Cache。当数据刷到 OS Page Cache 时,就可以被 ES 搜索到了。过一段时间后,OS Page Cache 的数据会被 flush 到 ES 的磁盘文件里。

 

为了保证数据不丢失,会把数据也写入到内存 translog 里面,默认内存 translog 会每隔 5 秒进行刷盘到 translog 磁盘文件。

 

写入到单节点的数据还会进行副本复制到其他节点。

 

(4)将数据写入到 ES 的性能影响因素


因素一:refresh 间隔,默认会每隔 1 秒刷新 JVM 缓冲的数据到 OS Page Cache。这会影响数据写入的速度,在写入全量数据的场景,可以将间隔调大一点。比如 120 秒,通过减少频繁的 refresh 来提升性能。

 

因素二:副本复制会影响写入的速度。在写入全量数据的场景,同样没必要进行副本的复制。可以先将数据都写入到一个节点,之后再慢慢进行副本的复制。

 

因素三:index buffer 的大小。在写入全量数据的场景,可以调大 index buffer 的大小。

 

因素四:translog 的刷盘策略。在写入全量数据的场景,可以调整 translog 为异步刷盘,并且刷盘间隔调大一些。存放 translog 的内存大小也调大一些,让其存放更多的数据才去进行刷盘。

 

(5)全量数据写入 ES 的性能调优方案


下面这些参数的调整是针对写入全量数据的场景,全量写入完毕后应恢复原来的值。

 

一.调整 refresh_interval 参数(可以动态配置)。在全量写⼊数据的场景下,对"写⼊后 1s 就要能搜索到"的要求没有那么⾼。所以可以把这个值设置为 120s,来减少频繁的 refresh 和 lucene 段合并⾏为。



二.调整 number_of_replicas 参数(可以动态配置)。ElasticSearch 的副本数是可以动态调整的,写⼊时可以先把副本数设置为 0,缩短数据写⼊的流程。批量导⼊完成之后,重新设置回副本数。



三.调整 index_buffer_size 参数。把 JVM 缓冲区的大小调大,可以让数据先写入到内存。避免 JVM 缓存区内存太小,很快写满而需要频繁刷盘。



四.调整 translog 参数(可以动态配置)。把 translog 的相关参数调大,避免尽量触发 translog 刷盘策略。



综上可知:首先在 elasticsearch.yml 中修改 ES 的配置,然后重启 ES 集群的三个节点。


$ vi /app/elasticsearch/elasticsearch-7.9.3/config/elasticsearch.yml# 写⼊优化参数indices.memory.index_buffer_size: 30%indices.memory.min_index_buffer_size: 128m
复制代码


然后在创建索引时对索引进行如下配置:


{    "settings": {        "number_of_shards": 3,        "number_of_replicas": 0,        "index.refresh_interval": "120s",        "index.translog.durability": "async",        "index.translog.sync_interval": "120s",        "index.translog.flush_threshold_size": "2048mb"    }}
复制代码


(6)百万商品数据写入 ES 的调优性能



可见,调优后的写入性能提升了一倍多。完成全量数据写入 ES 后,就可以动态调整索引的 settings 来恢复默认的配置。


(7)亿级商品数据的搜索性能测试


一.全文搜索测试


请求接口:


/api/product/fullTextSearch
复制代码


请求参数:


{    "pageNum": 1,    "pageSize": 100,    "indexName": "demo_plan_sku_index",    "highLightField": "skuName",    "queryTexts": {        "skuName": "华为⼿机"    }}
复制代码


比如搜索"华为手机",那么首先会对搜索词使用 ik_smart 进行分词,分成"华为"和"手机",之后再去倒排索引里对"华为"和"手机"这两分词进行搜索。

 

在上亿的商品数据里进行全文检索,耗时几百 ms 算是很快了,符合标准。查询多次的耗时详情如下,其中匹配的文档数有 35 万。



二.结构化搜索测试


请求接口:


/api/product/structuredSearch
复制代码


请求参数:


{    "pageNum": 1,    "pageSize": 100,    "indexName": "career_plan_sku_index",    "queryDsl": {        "query": {            "bool": {                "must": [{                    "term": {                        "category": {                            "value": "⼿机"                        }                    }                }],                "filter": [{                    "range": {                        "basePrice": {                            "gte": 1000,                            "lte": 3000                        }                    }                }]            }        },        "sort": [{            "basePrice": {                "order":"desc"            }        }]    } }
复制代码


比如搜索手机分类下的商品按某价格区间倒序排列,刚开始需要花几秒。因为首先根据分类和价格区间去索引里查找数据,之后还需要按照价格排序。排序的过程可能会导致大量数据从磁盘读入内存,再写入临时磁盘文件进行排序,排序之后还需要分页提取。所以第一次整个过程比较慢。

 

后续再次搜索时,大量数据已经读入内存,不用再去进行磁盘 IO 了,所以会变快。查询多次的耗时详情如下,其中匹配的文档数有 35 万。



(8)ES 搜索性能优化的方案分析


ES 的性能是特别棒的,在合理的机器配置下,其实是不怎么需要做优化的。当我们的业务遇到查询瓶颈时再根据业务场景的特点从以下⼏点看看哪个能再去优化。而且 ES 比较适合全文检索,根据分词进行匹配打分排序,在上亿数据量之下也有非常好的搜索效果。但是 ES 面对结构化搜索则不稳定,使用多个条件来进行查询、按照指定条件进行排序,可能性能很差。因为其中可能会命中大量数据,然后产生大量的临时磁盘 IO。

 

一.ES 的查询缓存会保存在 OS 内存中。所以需要给操作系统的内存保留足够空间,不过一般都会把机器内存的一半给 JVM,另一半给 OS Cache。

 

二.磁盘 IO 性能和 CPU 性能。对于普通的搜索,磁盘 IO 的性能最影响搜索性能。对与计算⽐较多的搜索,CPU 的性能会是⼀个瓶颈。

 

三.建立预索引 Pre-Index。适⽤于数字类型的字段和经常做范围搜索的场景,比如可以把数字类型的字段转换成 keyword 类型的字段,把 range 查询转换为 terms 查询。

 

四.把 long 类型的 skuID 设置为 keyword 类型

 

五.强制合并一些只读的索引,避免从多个磁盘文件去搜索。

 

总结:最关键的其实是给 OS Cache 多预留一些内存,尽量让节点的数据都能加载到内存里。比如节点是 32G 内存的,16G 给 JVM,16G 给 OS Cache,然后节点的数据也控制好在 16G 内。否则如果 OS Cache 才 16G,但节点的数据已经几百 G 了,那搜索时必然会进行大量的磁盘 IO。也就是要想让 ES 提升搜索性能,主要靠将数据驻留在 OS Cache 里。所以要用大内存机器部署 ES 节点,尽量让每个节点上的主 shard 的数据量和 OS Cache 的内存量差不多。这样在搜索时,尽可能去 OS Cache 里查询数据,从而避免进行磁盘 IO。

 

10.elasticsearch-analysis-ik⼯程的表结构


⼀共有两张表:extension_word 扩展词库表,stop_word 停⽤词库表。


CREATE TABLE `extension_word` (    `id` int(11) NOT NULL AUTO_INCREMENT,    `word` varchar(64) NOT NULL,    `create_time` datetime NOT NULL,    `update_time` datetime NOT NULL,    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
CREATE TABLE `stop_word` ( `id` int(11) NOT NULL AUTO_INCREMENT, `word` varchar(64) NOT NULL, `create_time` datetime NOT NULL, `update_time` datetime NOT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
复制代码

 

11.elasticsearch-analysis-ik⼯程的执行步骤


步骤一:读取数据库连接配置⽂件

步骤二:连接数据库

步骤三:查询扩展词库表和停⽤词库表

步骤四:添加到字典中

步骤五:使⽤⼀个线程周期性执⾏上⾯2-4 步

 

12.elasticsearch-analysis-ik⼯程的代码


(1)添加的 DictLoader 类


代码位置:


org.wltea.analyzer.dic.DictLoader
复制代码


//加载MySQL中的词库内容,单例public class DictLoader {    private static final Logger LOGGER = ESPluginLoggerFactory.getLogger(DictLoader.class.getName());    private static final DictLoader INSTANCE = new DictLoader();    private final String url;    private final String username;    private final String password;    private final AtomicBoolean extensionWordFistLoad = new AtomicBoolean(false);    private final AtomicReference<String> extensionWordLastLoadTimeRef = new AtomicReference<>(null);    private final AtomicBoolean stopWordFistLoad = new AtomicBoolean(false);    private final AtomicReference<String> stopWordLastLoadTimeRef = new AtomicReference<>(null);
//单例类,构造函数是私有的 private DictLoader() { //创建一个Properties配置数据对象,用来获取MySQL JDBC连接的配置 Properties mysqlConfig = new Properties(); //PathUtils会从指定目录下,对指定的文件名进行拼接,然后返回全路径名 //所以这里会把"IK分词器配置目录 + jdbc.properties"拼接成"jdbc.properties的成全路径名" Path configPath = PathUtils.get(Dictionary.getSingleton().getDictRoot(), "jdbc.properties"); try { //根据全路径名构建输入流,然后加载到mysqlConfig对象中,这样就可以从mysqlConfig对象读取配置值了 mysqlConfig.load(new FileInputStream(configPath.toFile())); this.url = mysqlConfig.getProperty("jdbc.url"); this.username = mysqlConfig.getProperty("jdbc.username"); this.password = mysqlConfig.getProperty("jdbc.password"); } catch (IOException e) { throw new IllegalStateException("加载jdbc.properties配置文件发生异常"); }
try { //加载MySQL驱动的类 Class.forName("com.mysql.cj.jdbc.Driver"); } catch (ClassNotFoundException e) { throw new IllegalStateException("加载数据库驱动时发生异常"); } }
public static DictLoader getInstance() { return INSTANCE; }
public void loadMysqlExtensionWords() { //每次从MySQL里加载词库时会执行一条SQL语句 //这时就必须要有一个和MySQL之间建立的网络连接,才能发送SQL语句出去
//由于这里会每分钟执行一次SQL语句 //所以每次执行SQL语句的时候就创建一个数据库的网络连接Connection,执行完SQL后再把该Connection释放即可
Connection connection = null; Statement statement = null; ResultSet resultSet = null;
String sql;
//第一次执行时会通过CAS操作把extensionWordFistLoad变量由false改成true,并且查全量词汇 //之后的执行,extensionWordFistLoad变量已经变为true,所以CAS操作会不成功,于是只查增量词汇 if (extensionWordFistLoad.compareAndSet(false, true)) { //首次加载会从数据库查全量的词汇 sql = "SELECT word FROM extension_word"; } else { //后面按照最近的修改时间来加载增量的词 sql = "SELECT word FROM extension_word WHERE update_time >= '" + extensionWordLastLoadTimeRef.get() + "'"; }
//每次生成了加载词库的SQL后,都会去设置一个本次加载的时间 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowString = dateFormat.format(new Date()); //设置最近一次加载词库的时间,extensionWordLastLoadTimeRef也是Atomic变量,线程安全的 extensionWordLastLoadTimeRef.set(nowString);
//加载扩展词词库内容 try { //使用传统的JDBC编程获取连接 connection = DriverManager.getConnection(url, username, password); //创建statement statement = connection.createStatement(); //执行SQL语句获取结果集 resultSet = statement.executeQuery(sql); LOGGER.info("从MySQL加载extensionWord, sql={}", sql);
Set<String> extensionWords = new HashSet<>(); while (resultSet.next()) { String word = resultSet.getString("word"); if (word != null) { extensionWords.add(word); //为了方便看日志,可以把加载到的扩展词全都打印出来了 LOGGER.info("从MySQL加载extensionWord, word={}", word); } }
//放到字典里 Dictionary.getSingleton().addWords(extensionWords); } catch (Exception e) { LOGGER.error("从MySQL加载extensionWord发生异常", e); } finally { //把结果集resultSet、statement、连接connection都进行释放 if (resultSet != null) { try { resultSet.close(); } catch (SQLException e) { LOGGER.error(e); } }
if (statement != null) { try { statement.close(); } catch (SQLException e) { LOGGER.error(e); } }
if (connection != null) { try { connection.close(); } catch (SQLException e) { LOGGER.error(e); } } } }
public void loadMysqlStopWords() { Connection connection = null; Statement statement = null; ResultSet resultSet = null;
String sql; if (stopWordFistLoad.compareAndSet(false, true)) { sql = "SELECT word FROM stop_word"; } else { sql = "SELECT word FROM stop_word WHERE update_time >= '" + stopWordLastLoadTimeRef.get() + "'"; }
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowString = dateFormat.format(new Date()); stopWordLastLoadTimeRef.set(nowString);
//加载词库内容 try { connection = DriverManager.getConnection(url, username, password); statement = connection.createStatement(); resultSet = statement.executeQuery(sql); LOGGER.info("从MySQL加载stopWord, sql={}", sql);
Set<String> stopWords = new HashSet<>(); while (resultSet.next()) { String word = resultSet.getString("word"); if (word != null) { stopWords.add(word); LOGGER.info("从MySQL加载stopWord,word={}", word); } } // 放到字典里 Dictionary.getSingleton().addStopWords(stopWords); } catch (Exception e) { LOGGER.error("从MySQL加载extensionWord发生异常", e); } finally { if (resultSet != null) { try { resultSet.close(); } catch (SQLException e) { LOGGER.error(e); } }
if (statement != null) { try { statement.close(); } catch (SQLException e) { LOGGER.error(e); } }
if (connection != null) { try { connection.close(); } catch (SQLException e) { LOGGER.error(e); } } } }}
复制代码


(2)修改自带的 Dictionary 类


代码位置:


org.wltea.analyzer.dic.Dictionary#initial
复制代码


public class Dictionary {    ...    //词典单例实例    private static Dictionary singleton;    private static ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);    ...
//词典初始化 //由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化 //只有当Dictionary类被实际调用时才会开始载入词典,这将延长首次分词操作的时间 //该方法提供了一个在应用加载阶段就初始化字典的手段 public static synchronized void initial(Configuration cfg) { if (singleton == null) { synchronized (Dictionary.class) { if (singleton == null) { singleton = new Dictionary(cfg); singleton.loadMainDict(); singleton.loadSurnameDict(); singleton.loadQuantifierDict(); singleton.loadSuffixDict(); singleton.loadPrepDict(); singleton.loadStopWordDict();
//在这里开启一个线程,每隔一段时间去mysql里面加载一下词库里的内容 new Thread(() -> { while (true) { try { DictLoader.getInstance().loadMysqlExtensionWords(); DictLoader.getInstance().loadMysqlStopWords(); TimeUnit.SECONDS.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); } } }).start();
if (cfg.isEnableRemoteDict()) { //建立监控线程 for (String location : singleton.getRemoteExtDictionarys()) { //10秒是初始延迟可以修改的,60是间隔时间,单位秒 pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS); } for (String location : singleton.getRemoteExtStopWordDictionarys()) { pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS); } } } } } } ...}
复制代码


13.demo-product-es⼯程的介绍


(1)该⼯程⾥⾯有两个搜索相关的接⼝


一.全⽂搜索接⼝

二.结构化查询接⼝

 

(2)该工程有两个对⽤户输⼊进⾏处理的接⼝


一.输⼊内容⾃动补全接⼝

二.输⼊内容拼写纠错接⼝

 

(3)该工程有三个初始化数据的接⼝


一.单线程批量插⼊商品数据接⼝

二.多线程批量插⼊商品数据接⼝

三.单线程批量插⼊suggest 数据接⼝

 

该⼯程依赖了 ElasticSearch 的 rest⾼级客户端库:elasticsearch-rest-high-level-client,所有对 ElasticSearch 的操作都是通过 rest⾼级客户端库来完成的。

 

14.demo-product-es⼯程的商品索引


商品索引⽤来存储所有的商品信息。

 

(1)索引结构


商品模型的字段以满⾜测试需要为主不复杂,⼀共有 10 个字段。商品的索引名为:demo_plan_sku_index_序号。因为需要做多次不同的测试,有的测试是使⽤不同的索引,⽽且在实现接⼝时并没有把接⼝写死,可以指定操作那个索引,所以索引后⾯加了⼀个序号。

 

索引的 mappings 如下:


PUT /demo_plan_sku_index_15 {     "settings": {        "number_of_shards": 3,        "number_of_replicas": 1    },    "mappings": {        "properties": {            "skuId": {                "type": "keyword"            },            "skuName": {                "type": "text",                "analyzer": "ik_max_word",                "search_analyzer": "ik_smart"            },            "category": {                "type": "keyword"            },            "basePrice": {                "type": "integer"            },            "vipPrice": {                "type": "integer"            },            "saleCount": {                "type": "integer"            },            "commentCount": {                "type": "integer"            },            "skuImgUrl": {                "type": "keyword",                "index": false            },            "createTime": {                "type": "date",                "format": "yyyy-MM-dd HH:mm:ss"            },            "updateTime": {                "type": "date",                "format": "yyyy-MM-dd HH:mm:ss"            }        }    } }
复制代码


(2)数据类型说明


elasticsearch 相关⽂档链接:


数据类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html
text数据类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/text.html
keyword数据类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html
数字类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html
时间类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html
复制代码


(3)使⽤的数据类型说明


一.skuName 商品名称

商品名称是⼀个字符串。我们要对商品名称进⾏全⽂检索,所以 skuName 字段使⽤了 text 类型。⽤analyzer 指定使⽤ik_max_word 分词器,这样在保存数据时商品名称会被尽可能多的分为多个词。⽤search_analyzer 指定搜索时使⽤ik_smart 分词器,这样尽可能做更符合⽤户期望的分词。

 

二.skuId 商品 id

商品 id⼀般是⼀个 long 类型的数字。我们可以使⽤ElasticSearch 的数字类型,但是我们使⽤的是 keyword 类型。因为⽂档⾥建议:如果没有要范围查询场景,且期望查询速度更快,数字类型的字段应使⽤keyword 类型。对于商品 id 来说,正好是⽂档中所说的情况。


⽂档链接:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html
复制代码


三.category 商品分类

商品分类是⼀个字符串。我们不会对商品分类做全⽂检索,⽽是对商品分类做 term 精准匹配的操作,所以使⽤keyword 类型。

 

四.basePrice 商品价 | vipPrice 商品会员价 | saleCount 商品销量 | commentCount 商品评论数

这⼏个字段都是数字。对于数字类型字段,⽂档中提到应在满⾜使⽤场景要求的情况下使⽤占⽤空间更⼩的类型,这⾥我们都使⽤Integer 类型。


⽂档链接:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html
复制代码


五.skuImgUrl 商品图⽚

商品图⽚是⼀个图⽚的 url 地址。我们不会对这个字段做任何搜索操作,也不需要索引这个字段,所以使⽤了 index:false 指定了不要索引这个字段。


⽂档链接:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html
复制代码


六.createTime 创建时间和 updateTime 修改时间

这两个字段是时间类型的字段,对应的 ElasticSearch 类型为 date,然后使⽤了 format 指定了时间的格式。


15.demo-product-es⼯程的 suggest 索引


suggest 索引⽤来存储和⽤户输⼊⾃动补全、拼写纠错、搜索推荐相关的数据的索引。这里的搜索推荐指的是:当没有⽤户要搜索的商品时推荐其他的商品。

 

(1)索引结构


⼀共有两个字段:word1 是⽤来做⾃动补全的,word2 是⽤来做拼写纠错和搜索推荐的。

 

索引的 mapping 如下:


PUT /demo_plan_sku_suggest_15{    "settings": {        "number_of_shards": 3,        "number_of_replicas": 1,        "analysis": {            "analyzer": {                "ik_and_pinyin_analyzer": {                    "type": "custom",                    "tokenizer": "ik_smart",                    "filter": "my_pinyin"                }            },            "filter": {                "my_pinyin": {                    "type": "pinyin",                    "keep_first_letter": true,                    "keep_full_pinyin": true,                    "keep_original": true,                    "remove_duplicated_term": true                }            }        }    },    "mappings": {        "properties": {            "word1": {                "type": "completion",                "analyzer": "ik_and_pinyin_analyzer"            },            "word2": {                "type": "text"            }        }    } }
复制代码


(2)数据类型说明


word1⽤来做⾃动补全的,ElasticSearch 中有专⻔对应的 completion 数据类型。


https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html
复制代码


在上⾯创建索引时我们⾃⼰定义了⼀个 analyzer:ik_and_pinyin_analyzer,这个 analyzer 同时使⽤了 ik 分词器和 pinyin 分词器,这样⽤户输⼊汉字或者拼⾳的时候都能做⾃动补全。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18945187

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—商品B端搜索系统的实现文档(二)_架构_不在线第一只蜗牛_InfoQ写作社区