
Product Center: Documentation for the B-Side Product Search System

  • 2025-06-23
    Fujian

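1. Workflow, cache design, and index design of the B-side product search system


(1) Workflow



(2) Redis cache design


Redis caches each user's search history, for example the 10 most recent search terms, in a list data structure.

Key format: history_search_words:{userId}
Value format: ["牛奶", "鸡蛋", "虎皮凤爪", "正大蒸饺"]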
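To make the list semantics concrete, here is a minimal in-memory sketch of the push-then-trim pattern that this cache design implies. It is not a Redis client: the class name HistoryWordsSketch and its methods are hypothetical, and an ArrayDeque stands in for the Redis list so the behavior can be run without a server.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the Redis list operations (assumption: not the real cache client).
class HistoryWordsSketch {
    static final String KEY_PREFIX = "history_search_words:";

    private final int maxWords;
    private final Map<String, Deque<String>> store = new HashMap<>();

    HistoryWordsSketch(int maxWords) {
        this.maxWords = maxWords;
    }

    // Builds the key in the documented format history_search_words:{userId}.
    static String key(long userId) {
        return KEY_PREFIX + userId;
    }

    // Same effect as LPUSH key word followed by LTRIM key 0 maxWords-1.
    void save(long userId, String word) {
        Deque<String> list = store.computeIfAbsent(key(userId), k -> new ArrayDeque<>());
        list.addFirst(word);
        while (list.size() > maxWords) {
            list.removeLast();
        }
    }

    // Same effect as LRANGE key 0 maxWords-1: newest term first.
    List<String> list(long userId) {
        Deque<String> list = store.get(key(userId));
        return list == null ? new ArrayList<>() : new ArrayList<>(list);
    }
}
```

With a limit of 3, saving a fourth term drops the oldest one, so the list always holds the most recent terms in newest-first order.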


(3) Index design


A. Product index


PUT /sku_info_index
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "basePrice": { "type": "integer" },
            "vipPrice": { "type": "integer" },
            "brandId": { "type": "keyword" },
            "brandName": { "type": "keyword" },
            "saleCount": { "type": "integer" },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    }
}


B. Index field descriptions



C. Sample data


{
    "_index": "sku_info_index",
    "_type": "_doc",
    "_id": "8000177337",
    "_score": 1.0,
    "_source": {
        "skuName": "Apple iPhone 13 Pro Max 256GB 苍岭绿色 支持移动联通电信5G 双卡双待手机",
        "brandName": "苹果",
        "createTime": "2022-03-12 08:24:57",
        "brandId": 4,
        "vipPrice": 9799,
        "updateTime": "2022-03-12 08:24:57",
        "basePrice": 9999
    }
}


D. Autocomplete (search completion) index


PUT /completion_word_index
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "analysis": {
            "analyzer": {
                "ik_and_pinyin_analyzer": {
                    "type": "custom",
                    "tokenizer": "ik_smart",
                    "filter": ["my_pinyin"]
                }
            },
            "filter": {
                "my_pinyin": {
                    "type": "pinyin",
                    "keep_first_letter": true,
                    "keep_full_pinyin": true,
                    "keep_original": true,
                    "remove_duplicated_term": true
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "completion_word": {
                "type": "completion",
                "analyzer": "ik_and_pinyin_analyzer"
            }
        }
    }
}


2. Listening for data changes and writing them to the ES index


(1) Adding data-listening configuration to the message processing system


A. The data_change_listen_config table


INSERT INTO data_change_listen_config (id, table_name, key_column, filter_flag, del_flag, create_user, create_time, update_user, update_time)
VALUES (1, 'sku_info', 'sku_id', 1, 1, 0, '2022-02-25 13:42:28', 0, '2022-02-25 13:42:28');

INSERT INTO data_change_listen_config (id, table_name, key_column, filter_flag, del_flag, create_user, create_time, update_user, update_time)
VALUES (2, 'item_info', 'item_id', 1, 1, 0, '2022-02-25 13:42:28', 0, '2022-02-25 13:42:28');


B. The data_change_column_config table


INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
VALUES (1, 1, 'sku_name', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');

INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
VALUES (2, 1, 'channel', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');

INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
VALUES (3, 1, 'features', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');

INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
VALUES (4, 1, 'vip_price', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');

INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
VALUES (5, 1, 'base_price', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');

INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
VALUES (6, 2, 'brand_id', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');


C. The data_change_message_config table


INSERT INTO data_change_message_config (id, listen_id, notify_column, message_topic, delay_level, message_type, del_flag, create_user, create_time, update_user, update_time)
VALUES (1, 1, 'id,sku_id', 'product_update_topic', 3, 1, 1, 0, '2022-02-25 13:45:24', 0, '2022-02-25 13:45:24');

INSERT INTO data_change_message_config (id, listen_id, notify_column, message_topic, delay_level, message_type, del_flag, create_user, create_time, update_user, update_time)
VALUES (3, 2, 'id,item_id', 'product_update_topic', 3, 1, 1, 0, '2022-02-25 13:45:24', 0, '2022-02-25 13:45:24');
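The document does not show how the message processing system evaluates these rows, so the following is only a sketch under an assumption: a change is forwarded to the configured topic when at least one listened column of a listened table was modified. The class ListenConfigSketch and the method topicFor are hypothetical; the table names, column names, and topic are copied from the INSERT statements above.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

class ListenConfigSketch {
    // Listened columns per table, mirroring the data_change_column_config rows.
    static final Map<String, Set<String>> LISTENED = new HashMap<>();
    static {
        LISTENED.put("sku_info", new HashSet<>(Arrays.asList(
                "sku_name", "channel", "features", "vip_price", "base_price")));
        LISTENED.put("item_info", new HashSet<>(Collections.singletonList("brand_id")));
    }

    // Both listened tables notify the same topic in data_change_message_config.
    static final String TOPIC = "product_update_topic";

    // Assumed rule: notify only when a listened column of a listened table changed.
    static Optional<String> topicFor(String table, Collection<String> changedColumns) {
        Set<String> listened = LISTENED.get(table);
        if (listened == null) {
            return Optional.empty();
        }
        for (String column : changedColumns) {
            if (listened.contains(column)) {
                return Optional.of(TOPIC);
            }
        }
        return Optional.empty();
    }
}
```

Under this assumption, a change to sku_info.vip_price produces a message, while a change to a column that is not listed produces none.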


(2) Data-change message consumer in the B-side product search system


@Configuration
public class ConsumerBeanConfig {
    // RocketMQ configuration properties
    @Autowired
    private RocketMQProperties rocketMQProperties;

    // Consumer that listens for product-change MQ messages
    @Bean("productUpdateTopic")
    public DefaultMQPushConsumer productUpdateTopic(ProductUpdateListener productUpdateListener) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_UPDATE_CONSUMER_GROUP);
        consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
        consumer.subscribe(RocketMqConstant.PRODUCT_UPDATE_TOPIC, "*");
        consumer.registerMessageListener(productUpdateListener);
        consumer.start();
        return consumer;
    }
}

// The search module updates the product index whenever a product changes.
// The log field is assumed to come from Lombok's @Slf4j; the original omits its declaration.
@Slf4j
@Component
public class ProductUpdateListener implements MessageListenerConcurrently {
    @Autowired
    private MessageHandlerManager messageHandlerManager;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        try {
            for (MessageExt messageExt : list) {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset
                String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                log.info("Updating product index data, message body: {}", msg);
                TableDataChangeDTO tableDataChangeDTO = JsonUtil.json2Object(msg, TableDataChangeDTO.class);
                // Handle the message
                messageHandlerManager.handleMessage(tableDataChangeDTO);
            }
        } catch (Exception e) {
            log.error("consume error, failed to update product index data", e);
            // Consumption failed; the batch will be redelivered later
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

@Component
public class MessageHandlerManager {
    // ItemInfoTableMessageHandler and SkuInfoTableMessageHandler, which implement MessageHandler, are both injected here
    @Autowired
    private List<MessageHandler> messageHandlers;

    public void handleMessage(TableDataChangeDTO tableDataChangeDTO) throws IOException {
        MessageHandler messageHandlerToUse = messageHandlers.stream()
            .filter(e -> StringUtils.equals(e.tableName(), tableDataChangeDTO.getTableName()))
            .findFirst()
            .orElse(null);
        if (messageHandlerToUse == null) {
            return;
        }
        messageHandlerToUse.handleMessage(tableDataChangeDTO);
    }
}
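The manager above picks a handler by comparing table names on every message. As an illustration of the same dispatch idea, the sketch below indexes handlers by table name once, at construction time. Everything in it is hypothetical and framework-free: TableHandler stands in for MessageHandler, RecordingHandler is a test double, and HandlerRegistrySketch is not part of the original system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the MessageHandler interface.
interface TableHandler {
    String tableName();
    void handle(String keyId);
}

// Test double that records the keys it was asked to handle.
class RecordingHandler implements TableHandler {
    final String table;
    final List<String> seen = new ArrayList<>();

    RecordingHandler(String table) {
        this.table = table;
    }

    public String tableName() {
        return table;
    }

    public void handle(String keyId) {
        seen.add(keyId);
    }
}

class HandlerRegistrySketch {
    private final Map<String, TableHandler> byTable = new HashMap<>();

    // Index handlers by table name and reject ambiguous registrations early.
    HandlerRegistrySketch(List<? extends TableHandler> handlers) {
        for (TableHandler handler : handlers) {
            if (byTable.put(handler.tableName(), handler) != null) {
                throw new IllegalStateException("duplicate handler for table " + handler.tableName());
            }
        }
    }

    // Returns false when no handler is registered, matching the silent return in the manager.
    boolean dispatch(String tableName, String keyId) {
        TableHandler handler = byTable.get(tableName);
        if (handler == null) {
            return false;
        }
        handler.handle(keyId);
        return true;
    }
}
```

The map lookup also surfaces a configuration mistake that findFirst() would hide: two handlers claiming the same table.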


(3) Message handler for changes to the sku table


@Component
public class SkuInfoTableMessageHandler implements MessageHandler {
    @Autowired
    private ProductSearchRepository productSearchRepository;

    @Override
    public String tableName() {
        return "sku_info";
    }

    @Override
    public void handleMessage(TableDataChangeDTO tableDataChangeDTO) throws IOException {
        String skuId = String.valueOf(tableDataChangeDTO.getKeyId());
        // Query the database for the data that goes into the index
        ProductSearchDO productSearchDO = productSearchRepository.queryProductSearchInfo(skuId);
        if (productSearchDO == null) {
            // Nothing to index, for example when the row no longer exists
            return;
        }
        // Save the index data to ES
        productSearchRepository.saveProductSearchInfos(Collections.singletonList(productSearchDO));
    }
}

@Repository
public class ProductSearchRepository {
    private static final String SKU_INFO_INDEX = "sku_info_index";
    // Matches the "yyyy-MM-dd HH:mm:ss" date format declared in the index mapping
    private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired
    private SkuInfoMapper skuInfoMapper;

    // Query the index-related data of a product by skuId
    public ProductSearchDO queryProductSearchInfo(String skuId) {
        return skuInfoMapper.queryProductSearchInfo(skuId);
    }

    // Save product index data in bulk
    public void saveProductSearchInfos(List<ProductSearchDO> productSearchDOS) throws IOException {
        if (productSearchDOS == null || productSearchDOS.isEmpty()) {
            return; // a BulkRequest without any action fails validation
        }
        // Send the timestamps as strings in the format the mapping expects
        String now = LocalDateTime.now().format(DATE_FORMAT);
        BulkRequest bulkRequest = new BulkRequest();
        for (ProductSearchDO productSearchDO : productSearchDOS) {
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("skuName", productSearchDO.getSkuName());
            jsonMap.put("basePrice", productSearchDO.getBasePrice());
            jsonMap.put("vipPrice", productSearchDO.getVipPrice());
            jsonMap.put("brandId", productSearchDO.getBrandId());
            jsonMap.put("brandName", productSearchDO.getBrandName());
            jsonMap.put("createTime", now);
            jsonMap.put("updateTime", now);
            IndexRequest indexRequest = new IndexRequest(SKU_INFO_INDEX).id(productSearchDO.getSkuId()).source(jsonMap);
            bulkRequest.add(indexRequest);
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            // Surface partial failures so the MQ listener redelivers the message
            throw new IOException(bulkResponse.buildFailureMessage());
        }
    }
    ...
}


(4) Message handler for changes to the item table


@Component
public class ItemInfoTableMessageHandler implements MessageHandler {
    @Autowired
    private ProductSearchRepository productSearchRepository;

    @Override
    public String tableName() {
        return "item_info";
    }

    @Override
    public void handleMessage(TableDataChangeDTO tableDataChangeDTO) throws IOException {
        String itemId = String.valueOf(tableDataChangeDTO.getKeyId());
        // One item can have several SKUs, so every SKU document of the item is re-indexed
        List<ProductSearchDO> productSearchDOS = productSearchRepository.queryProductSearchInfos(itemId);
        productSearchRepository.saveProductSearchInfos(productSearchDOS);
    }
}

@Repository
public class ProductSearchRepository {
    private static final String SKU_INFO_INDEX = "sku_info_index";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Autowired
    private SkuInfoMapper skuInfoMapper;

    // Query the index-related data of all products under an itemId
    public List<ProductSearchDO> queryProductSearchInfos(String itemId) {
        return skuInfoMapper.queryProductSearchInfos(itemId);
    }

    // Save product index data in bulk.
    // The original repeats saveProductSearchInfos(List<ProductSearchDO>) here;
    // it is the same method shown in section (3).
    ...
}


3. Implementing history search terms


(1) B-side API for saving a history search term


Usage scenario: a merchant enters a search term to search for products.
API description: saves the terms the merchant has searched for into a Redis list.


// Product search service
@DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
public class ProductSearchApiImpl implements ProductSearchApi {
    @Resource
    private RedisCache redisCache;

    @Resource
    private ProductSearchRepository productSearchRepository;

    // Save a history search term
    @Override
    public JsonResult<HistorySearchWordResultDTO> saveHistorySearchWord(HistorySearchWordRequest request) {
        String key = HistorySearchWordConstants.getKey(request.getUserId());
        // Push the new term onto the head of the list
        redisCache.lpush(key, request.getHistorySearchWord());
        // Trim the list so that it keeps only a fixed number of terms
        redisCache.ltrim(key, 0, HistorySearchWordConstants.HISTORY_WORD_COUNT_PER_USER - 1);
        return JsonResult.buildSuccess(new HistorySearchWordResultDTO(true));
    }
    ...
}

// Request for saving a user's history search term
@Data
public class HistorySearchWordRequest implements Serializable {
    // User id
    private Long userId;
    // The new history search term
    private String historySearchWord;
}


(2) B-side API for querying history search terms


Usage scenario: displaying a merchant's search history.
API description: reads the merchant's history search terms from the Redis list.


// Product search service
@DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
public class ProductSearchApiImpl implements ProductSearchApi {
    @Resource
    private RedisCache redisCache;

    @Resource
    private ProductSearchRepository productSearchRepository;

    // Query history search terms
    @Override
    public JsonResult<HistorySearchWordDTO> listHistorySearchWords(HistorySearchWordQuery request) {
        List<String> result = redisCache.lrange(HistorySearchWordConstants.getKey(request.getUserId()), 0, HistorySearchWordConstants.HISTORY_WORD_COUNT_PER_USER - 1);
        return JsonResult.buildSuccess(new HistorySearchWordDTO(result));
    }
    ...
}

// Request for querying a merchant's history search terms
@Data
public class HistorySearchWordQuery implements Serializable {
    // User id
    private Long userId;
}


4. Implementing search-term autocompletion


(1) API for adding completion terms


Usage scenario: operations staff add completion terms.
API description: saves the completion terms into the ES autocomplete index.


// Completion terms
// The log field is assumed to come from Lombok's @Slf4j; the original omits its declaration.
@Slf4j
@DubboService(version = "1.0.0", interfaceClass = CompletionSearchWordApi.class, retries = 0)
public class CompletionSearchWordApiImpl implements CompletionSearchWordApi {
    @Autowired
    private CompletionSearchWordService completionSearchWordService;

    // Save completion terms
    @Override
    public JsonResult<CompletionSearchWordResultDTO> saveCompletionSearchWord(CompletionSearchWordRequest request) {
        try {
            CompletionSearchWordResultDTO resultDTO = completionSearchWordService.saveCompletionSearchWord(request);
            return JsonResult.buildSuccess(resultDTO);
        } catch (ProductBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

@Service
public class CompletionSearchWordServiceImpl implements CompletionSearchWordService {
    @Autowired
    private CompletionSearchWordRepository completionSearchWordRepository;

    // Save completion terms
    @Override
    public CompletionSearchWordResultDTO saveCompletionSearchWord(CompletionSearchWordRequest request) throws IOException {
        return completionSearchWordRepository.saveCompletionSearchWord(request);
    }
    ...
}

// Request used by operations staff to add completion terms
@Data
public class CompletionSearchWordRequest implements Serializable {
    // Index name
    private String indexName;
    // Field name
    private String fieldName;
    // Completion terms to add
    private List<String> completionSearchWords;
}

@Repository
public class CompletionSearchWordRepository {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    // Save completion terms
    public CompletionSearchWordResultDTO saveCompletionSearchWord(CompletionSearchWordRequest request) throws IOException {
        List<String> completionSearchWords = request.getCompletionSearchWords();
        if (completionSearchWords == null || completionSearchWords.isEmpty()) {
            // A BulkRequest without any action fails validation
            return new CompletionSearchWordResultDTO(true);
        }
        BulkRequest bulkRequest = new BulkRequest(request.getIndexName());
        for (String completionSearchWord : completionSearchWords) {
            bulkRequest.add(new IndexRequest().source(XContentType.JSON, request.getFieldName(), completionSearchWord));
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            throw new IOException(bulkResponse.buildFailureMessage());
        }
        return new CompletionSearchWordResultDTO(true);
    }
    ...
}


(2) API for querying completion terms


Usage scenario: the admin console displays the list of completion terms.
API description: pages through the data in the ES autocomplete index.


// Completion terms
@DubboService(version = "1.0.0", interfaceClass = CompletionSearchWordApi.class, retries = 0)
public class CompletionSearchWordApiImpl implements CompletionSearchWordApi {
    @Autowired
    private CompletionSearchWordService completionSearchWordService;

    // Query completion terms
    @Override
    public JsonResult<PageResult<CompletionSearchWordDTO>> listCompletionSearchWordPage(QueryCompletionSearchWordPageRequest request) {
        try {
            PageResult<CompletionSearchWordDTO> resultDTO = completionSearchWordService.listCompletionSearchWordPage(request);
            return JsonResult.buildSuccess(resultDTO);
        } catch (ProductBizException e) {
            log.error("biz error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
        } catch (Exception e) {
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

@Service
public class CompletionSearchWordServiceImpl implements CompletionSearchWordService {
    @Autowired
    private CompletionSearchWordRepository completionSearchWordRepository;

    // Query completion terms
    @Override
    public PageResult<CompletionSearchWordDTO> listCompletionSearchWordPage(QueryCompletionSearchWordPageRequest request) throws IOException {
        return completionSearchWordRepository.listCompletionSearchWordPage(request);
    }
    ...
}

// Request used by the admin console to list completion terms
@Data
public class QueryCompletionSearchWordPageRequest extends PageRequest {
    // Index name
    private String indexName;
    // Field name
    private String fieldName;
    // Completion term
    private String completionSearchWord;
}

@Repository
public class CompletionSearchWordRepository {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    ...

    // Query completion terms
    public PageResult<CompletionSearchWordDTO> listCompletionSearchWordPage(QueryCompletionSearchWordPageRequest request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        if (StringUtils.isNotBlank(request.getCompletionSearchWord())) {
            searchSourceBuilder.query(QueryBuilders.matchQuery(request.getFieldName(), request.getCompletionSearchWord()));
        }

        // Page numbers start at 1, so page 1 maps to offset 0
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        PageResult<CompletionSearchWordDTO> pageResult = new PageResult<>();
        List<CompletionSearchWordDTO> pageContent = new ArrayList<>();
        SearchHit[] hits = searchResponse.getHits().getHits();
        for (SearchHit hit : hits) {
            pageContent.add(new CompletionSearchWordDTO(((String) hit.getSourceAsMap().get(request.getFieldName()))));
        }
        pageResult.setContent(pageContent);
        pageResult.setTotalElements(searchResponse.getHits().getTotalHits().value);
        pageResult.setSize(request.getPageSize());
        pageResult.setNumber(request.getPageNum());
        return pageResult;
    }
    ...
}
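The from/size arithmetic used for paging is easy to get wrong at the edges, so here is a small, hedged sketch of it with input checks. PageWindowSketch is a hypothetical helper, not part of the original code. The 10,000 limit reflects the default of the Elasticsearch index.max_result_window setting; an index configured differently would need a different constant.

```java
class PageWindowSketch {
    // Default value of index.max_result_window in Elasticsearch.
    static final int MAX_RESULT_WINDOW = 10_000;

    // Converts a 1-based page number into the zero-based "from" offset.
    static int from(int pageNum, int pageSize) {
        if (pageNum < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNum and pageSize must be >= 1");
        }
        // Use long arithmetic so large page numbers cannot overflow int.
        long from = (long) (pageNum - 1) * pageSize;
        if (from + pageSize > MAX_RESULT_WINDOW) {
            throw new IllegalArgumentException("from + size exceeds " + MAX_RESULT_WINDOW);
        }
        return (int) from;
    }
}
```

For example, page 3 with 20 results per page starts at offset 40, and page 1001 with 10 per page is rejected before the request reaches ES.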


(3) B-side API for autocompleting a search term


Usage scenario: a merchant types a search term into the search box.
API description: looks up matching terms in the ES autocomplete index based on the text entered so far.


// Product search
// The log field is assumed to come from Lombok's @Slf4j.
@Slf4j
@DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
public class ProductSearchApiImpl implements ProductSearchApi {
    @Resource
    private RedisCache redisCache;

    @Resource
    private ProductSearchRepository productSearchRepository;
    ...

    // Autocomplete a search term
    @Override
    public JsonResult<CompletionSearchWordsDTO> listCompletionSearchWords(CompletionSearchWordQuery request) {
        try {
            CompletionSearchWordsDTO result = productSearchRepository.listCompletionSearchWords(request);
            return JsonResult.buildSuccess(result);
        } catch (Exception e) {
            // Log through the logger rather than printStackTrace()
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

// Request for autocompleting a user's search term
@Data
public class CompletionSearchWordQuery {
    // Index name
    private String indexName;
    // Field name
    private String fieldName;
    // Text to complete (what the user has typed)
    private String text;
    // Number of completed terms to return
    private int count;
}

// Product search
@Repository
public class ProductSearchRepository {
    private static final String MY_SUGGEST = "my_suggest";

    @Resource
    private RestHighLevelClient restHighLevelClient;

    // Autocomplete a search term
    public CompletionSearchWordsDTO listCompletionSearchWords(CompletionSearchWordQuery request) throws IOException {
        // 1. Build the completion suggestion
        CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(request.getFieldName());
        completionSuggestionBuilder.prefix(request.getText());
        completionSuggestionBuilder.skipDuplicates(true);
        completionSuggestionBuilder.size(request.getCount());

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
        searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, completionSuggestionBuilder));

        // 2. Wrap it in a search request
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        // 3. Query Elasticsearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        // 4. Collect the completed terms from the response, tolerating an empty suggest section
        List<String> result = new ArrayList<>();
        Suggest suggest = searchResponse.getSuggest();
        if (suggest != null) {
            CompletionSuggestion completionSuggestion = suggest.getSuggestion(MY_SUGGEST);
            if (completionSuggestion != null && !completionSuggestion.getEntries().isEmpty()) {
                List<CompletionSuggestion.Entry.Option> options = completionSuggestion.getEntries().get(0).getOptions();
                for (CompletionSuggestion.Entry.Option option : options) {
                    result.add(option.getText().string());
                }
            }
        }
        return new CompletionSearchWordsDTO(result);
    }
    ...
}
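For readers who want to test the suggester from a REST console, the builder calls above correspond to a completion-suggester request body. The sketch below assembles that body as a string using only the JDK. SuggestBodySketch is hypothetical, and its escape method handles only backslashes and double quotes, so it is an illustration rather than a general JSON encoder.

```java
class SuggestBodySketch {
    // Minimal escaping for illustration: backslash and double quote only.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the body for POST /{index}/_search with one completion suggestion.
    static String body(String suggestName, String field, String prefix, int size) {
        return "{\"suggest\":{\"" + escape(suggestName) + "\":{"
                + "\"prefix\":\"" + escape(prefix) + "\","
                + "\"completion\":{"
                + "\"field\":\"" + escape(field) + "\","
                + "\"size\":" + size + ","
                + "\"skip_duplicates\":true"
                + "}}}}";
    }

    public static void main(String[] args) {
        // Field name taken from the completion_word_index mapping.
        System.out.println(body("my_suggest", "completion_word", "niu", 5));
    }
}
```

Because the index analyzer keeps full pinyin and first letters, a pinyin prefix such as "niu" is the kind of input this request is meant to complete.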


5. Implementing the search APIs


(1) B-side search query API


Usage scenario: a merchant searches for products.
API description: queries the product index for a list of skuIds based on the search terms entered.


// Product search
// The log field is assumed to come from Lombok's @Slf4j.
@Slf4j
@DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
public class ProductSearchApiImpl implements ProductSearchApi {
    ...
    // Product search query
    @Override
    public JsonResult<PorductSearchDTO> searchProducts(ProductSearchQuery request) {
        try {
            PorductSearchDTO result = productSearchRepository.searchProducts(request);
            return JsonResult.buildSuccess(result);
        } catch (Exception e) {
            // Log through the logger rather than printStackTrace()
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

// Product search request
@Data
public class ProductSearchQuery extends PageQuery {
    // Index name
    private String indexName;
    // Query parameters: field name -> text to match
    private Map<String, String> queryTexts;
    // Field to highlight
    private String highLightField;
}

// Product search
@Repository
public class ProductSearchRepository {
    private static final String MY_SUGGEST = "my_suggest";

    @Resource
    private RestHighLevelClient restHighLevelClient;
    ...

    // Product search query
    public PorductSearchDTO searchProducts(ProductSearchQuery request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.trackTotalHits(true);

        // 1. Build the match conditions. They are combined in one bool query because
        //    calling searchSourceBuilder.query(...) once per entry keeps only the last condition.
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        request.getQueryTexts().forEach((field, text) -> boolQuery.must(QueryBuilders.matchQuery(field, text)));
        searchSourceBuilder.query(boolQuery);

        // 2. Configure highlighting
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.field(request.getHighLightField());
        highlightBuilder.preTags("<span style=\"color:red\">");
        highlightBuilder.postTags("</span>");
        highlightBuilder.numOfFragments(0);
        searchSourceBuilder.highlighter(highlightBuilder);

        // 3. Set the paging parameters
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        // 4. Wrap it in a search request
        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        // 5. Query Elasticsearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        // 6. Replace the field value with its highlighted version
        SearchHits hits = searchResponse.getHits();
        for (SearchHit hit : hits) {
            HighlightField highlightField = hit.getHighlightFields().get(request.getHighLightField());
            if (highlightField == null) {
                // The hit matched on another field, so there is nothing to highlight
                continue;
            }
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            Text[] fragments = highlightField.fragments();
            StringBuilder builder = new StringBuilder();
            for (Text fragment : fragments) {
                builder.append(fragment.string());
            }
            sourceAsMap.put(request.getHighLightField(), builder.toString());
        }

        // 7. Build the result
        return buildPorductSearchDTO(hits, request.getPageNum(), request.getPageSize());
    }
    ...
}


(2) B-side structured query API


Usage scenario: a merchant filters and sorts the search results.
API description: queries the product index for a list of skuIds based on the filter and sort conditions the user entered.


// Product search
// The log field is assumed to come from Lombok's @Slf4j.
@Slf4j
@DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
public class ProductSearchApiImpl implements ProductSearchApi {
    ...
    // Structured product query
    @Override
    public JsonResult<PorductSearchDTO> structuredSearchProducts(ProductStructuredQuery request) {
        try {
            PorductSearchDTO result = productSearchRepository.structuredSearchProducts(request);
            return JsonResult.buildSuccess(result);
        } catch (Exception e) {
            // Log through the logger rather than printStackTrace()
            log.error("system error: request={}", JSON.toJSONString(request), e);
            return JsonResult.buildError(e.getMessage());
        }
    }
    ...
}

// Structured product query request
@Data
public class ProductStructuredQuery extends PageQuery {
    // Index name
    private String indexName;
    // Query DSL
    private Map<String, Object> queryDsl;
}

// Product search
@Repository
public class ProductSearchRepository {
    private static final String MY_SUGGEST = "my_suggest";

    @Resource
    private RestHighLevelClient restHighLevelClient;
    ...

    // Structured product query
    public PorductSearchDTO structuredSearchProducts(ProductStructuredQuery request) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.trackTotalHits(true);

        // 1. Parse the query DSL
        String queryDsl = JSON.toJSONString(request.getQueryDsl());
        SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
        NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
        XContent xContent = XContentFactory.xContent(XContentType.JSON);
        // The parser is closed automatically once parsing is done
        try (XContentParser xContentParser = xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, queryDsl)) {
            searchSourceBuilder.parseXContent(xContentParser);
        }

        // 2. Set the paging parameters (these override any from/size in the DSL)
        int from = (request.getPageNum() - 1) * request.getPageSize();
        searchSourceBuilder.from(from);
        searchSourceBuilder.size(request.getPageSize());

        // 3. Wrap it in a search request
        SearchRequest searchRequest = new SearchRequest(request.getIndexName());
        searchRequest.source(searchSourceBuilder);

        // 4. Query Elasticsearch
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        // 5. Build the result
        return buildPorductSearchDTO(searchResponse.getHits(), request.getPageNum(), request.getPageSize());
    }
    ...
}


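6. Rebuilding an index


(1) Problem analysis


In practice, an index that is already in use may need changes such as a different field type or added or removed fields. That usually means creating new mappings.

Because applications are actively using the index, the operation has to minimize its impact on them, and the existing data has to be migrated to the new index.

(2) Solution


ES index aliases reduce the impact on applications and make it possible to rebuild the index without downtime. The ES Scroll API together with the Bulk API migrates the existing data to the new index.

(3) Walkthrough


A. Assume the product index currently in use is sku_info_index.
B. First, give the sku_info_index index the alias sku_info_index_alias.
C. Then create a new index, sku_info_index_v2.
D. Next, migrate the data with the Scroll API and the Bulk API.
E. Finally, point the alias sku_info_index_alias at the sku_info_index_v2 index.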


A. The product index currently in use


sku_info_index is currently being used by the business:


PUT /sku_info_index
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "basePrice": { "type": "integer" },
            "vipPrice": { "type": "integer" },
            "brandId": { "type": "keyword" },
            "brandName": { "type": "keyword" },
            "saleCount": { "type": "integer" },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    }
}


B. Give the sku_info_index index an alias


Have the application operate on the data through the alias sku_info_index_alias:


PUT /sku_info_index/_alias/sku_info_index_alias


C. Create a new index, sku_info_index_v2


Create the new sku_info_index_v2 index. Compared with the current index, it adds a label field:


PUT /sku_info_index_v2
{
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "skuName": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "basePrice": { "type": "integer" },
            "vipPrice": { "type": "integer" },
            "brandId": { "type": "keyword" },
            "brandName": { "type": "keyword" },
            "saleCount": { "type": "integer" },
            "label": { "type": "keyword" },
            "createTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            },
            "updateTime": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss"
            }
        }
    }
}


D. Migrate the data with the Scroll API and the Bulk API


# https://www.elastic.co/guide/en/elasticsearch/reference/7.6/search-request-body.html#request-body-search-scroll
POST /sku_info_index/_search?scroll=1m
{
    "size": 3,
    "query": {
        "match_all": {}
    }
}

POST /_bulk
{ "index": { "_index": "sku_info_index_v2", "_id": "8000177337" } }
{ "skuName": "Apple iPhone 13 Pro Max 256GB 苍岭绿色 支持移动联通电信5G 双卡双待手机", "brandName": "苹果", "createTime": "2022-03-12 08:24:57", "brandId": 4, "vipPrice": 9799, "updateTime": "2022-03-12 08:24:57", "basePrice": 9999, "label": "新品" }
{ "index": { "_index": "sku_info_index_v2", "_id": "8000177338" } }
{ "skuName": "Apple iPhone 13 (A2634)128GB 绿色 支持移动联通电信5G 双卡双待手机", "brandName": "苹果", "createTime": "2022-03-12 08:24:57", "brandId": 4, "vipPrice": 5798, "updateTime": "2022-03-12 08:24:57", "basePrice": 5999, "label": "爆品" }
{ "index": { "_index": "sku_info_index_v2", "_id": "8000177339" } }
{ "skuName": "苹果13mini Apple iphone 13 mini 5G新品手机 粉色 128GB", "brandName": "苹果", "createTime": "2022-03-12 08:24:57", "brandId": 4, "vipPrice": 4900, "updateTime": "2022-03-12 08:24:57", "basePrice": 5100, "label": "超值特惠" }
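The migration pairs each page of scroll results with one bulk request. The sketch below shows only the second half: turning a batch of documents into Bulk API bodies, where every document contributes an action line and a source line, each terminated by a newline. BulkBodySketch and its method are hypothetical, the documents are passed in as plain id and JSON pairs rather than read from a real scroll response, and the ids are assumed to need no JSON escaping. In a real migration, later pages are fetched by sending the returned scroll id to POST /_search/scroll until no hits remain.

```java
import java.util.ArrayList;
import java.util.List;

class BulkBodySketch {
    // Each doc is {id, sourceJson}; sourceJson must already be a single-line JSON object.
    static List<String> bulkBodies(String targetIndex, List<String[]> docs, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        List<String> bodies = new ArrayList<>();
        for (int start = 0; start < docs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, docs.size());
            StringBuilder body = new StringBuilder();
            for (String[] doc : docs.subList(start, end)) {
                // Action line: index the document into the target index under its original id.
                body.append("{\"index\":{\"_index\":\"").append(targetIndex)
                    .append("\",\"_id\":\"").append(doc[0]).append("\"}}\n");
                // Source line, followed by the newline the Bulk API requires.
                body.append(doc[1]).append('\n');
            }
            bodies.add(body.toString());
        }
        return bodies;
    }
}
```

With a batch size of 3, matching the scroll size in the request above, four documents produce two bulk bodies: one with three documents and one with the remaining document.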


E. Point the alias sku_info_index_alias at the sku_info_index_v2 index


POST /_aliases
{
    "actions": [
        {
            "remove": {
                "index": "sku_info_index",
                "alias": "sku_info_index_alias"
            }
        },
        {
            "add": {
                "index": "sku_info_index_v2",
                "alias": "sku_info_index_alias"
            }
        }
    ]
}
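The remove and add actions are sent in a single _aliases request so that the alias moves from the old index to the new one in one step. As a toy model of what that means for readers, the sketch below keeps the alias table in memory and swaps a pointer under a lock. AliasSwapSketch is hypothetical and only mimics the observable behavior; it does not talk to Elasticsearch.

```java
import java.util.HashMap;
import java.util.Map;

class AliasSwapSketch {
    private final Map<String, String> aliasToIndex = new HashMap<>();

    // Creates or overwrites an alias, like the initial PUT /{index}/_alias/{alias}.
    synchronized void point(String alias, String index) {
        aliasToIndex.put(alias, index);
    }

    // Moves the alias in one step; readers see either the old or the new index, never neither.
    synchronized void swap(String alias, String oldIndex, String newIndex) {
        if (!oldIndex.equals(aliasToIndex.get(alias))) {
            throw new IllegalStateException(alias + " does not point at " + oldIndex);
        }
        aliasToIndex.put(alias, newIndex);
    }

    // Resolves an alias to its index; plain index names resolve to themselves.
    synchronized String resolve(String name) {
        return aliasToIndex.getOrDefault(name, name);
    }
}
```

An application that only ever asks for sku_info_index_alias keeps working across the swap and starts reading sku_info_index_v2 without a code change.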


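(4) Additional notes


A. If you already expect the index structure to change before adopting ES, operate on the data through an index alias from the start. When the structure later needs to change, the index can be rebuilt without downtime by following the solution and walkthrough above.

B. Using an index alias requires no change to how the ES Java API is called; only the name passed in differs. For example, here is a constructor of SearchRequest:


public SearchRequest(String... indices) {
    this(indices, new SearchSourceBuilder());
}


If the code uses the index name example_index directly, the SearchRequest is created as follows, where example_index is the index name:


new SearchRequest("example_index")


If the index example_index has the alias example_index_alias, the SearchRequest is created with the alias instead:


new SearchRequest("example_index_alias")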


Reposted from: 东阳马生架构

Original link: https://www.cnblogs.com/mjunz/p/18943503
