写点什么

商品中心—商品 B 端搜索系统的实现文档

  • 2025-06-24
    福建
  • 本文字数:19595 字

    阅读完需:约 64 分钟

1.商品数据管理以及 binlog 监听


对单库单表的 MySQL 的数据进行搜索、全文检索、模糊匹配效率和性能都很差,对多库多表的 MySQL 的数据进行搜索就更麻烦了,所以一般会通过 Canal 监听 MySQL 的 binlog + 去 ElasticSearch 建立索引来实现搜索。

 

中小型项目的 ES 需求,基本由"自动补全、拼音纠错、全文检索、结构化搜索"即可满足。


 

2.基于 ES 的商品 B 端搜索系统架构设计


当 MySQL 进行了插入操作时,就会产生对应的 insert 类型的 binlog 日志。这些 insert 类型的 binlog 日志会被 Canal 系统监听到,然后 Canal 系统会拿着这些 binlog 日志以 HTTP 方式向商品 B 端搜索系统的指定接口发起调用。接着商品 B 端搜索系统便能根据这条 binlog 日志对应的数据提取出要进行搜索的字段。

 

商品数据中要进行搜索的字段可能有:title、description、category 等,商品 B 端搜索系统便会把这些用来进行搜索的字段以及商品 ID 写入到 ElasticSearch,ElasticSearch 便会根据这些内容建立倒排索引。

 

后续 MySQL 对这些商品数据进行修改时,会产生对应的 update 类型的 binlog 日志,这些 update 类型的 binlog 日志也会被 Canal 系统监听到。然后 Canal 又会通过商品 B 端搜索系统,根据这些商品数据修改后的内容更新 ElasticSerch 索引。


 

3.商品 B 端搜索系统实现步骤介绍

 

(1)分词器对 ES 来说是非常重要的


在 ES 里建立索引时,都是根据文本内容类型的字段来建立索引的。而这些字段如果是中文,就要使用中文分词器来进行分词。所以在落地搜索系统时,中文分词器的实现是至关重要的,这个是必须要做的。


(2)实现商品 B 端搜索系统的关键步骤


步骤一:需要搭建和部署一套 ES 生产集群。

 

步骤二:需要给 ES 集群各个节点去安装部署我们自己开发好的一套中文分词器。

 

步骤三:需要分析商品数据模型,针对商品数据模型去设计 ES 里的索引数据结构。所以需要创建好商品索引、设计好索引结构、索引里指定的字段必须用我们的分词器。除了商品核心索引外,还有用来辅助用户实现搜索提示和自动纠错的 suggest 索引。

 

步骤四:需要实现数据写入 ES 建立索引的接口、实现基于 ES 索引进行搜索的接口。在搜索框里进行输入时,实现自动补全、自动纠错、自动推荐。搜索接口分为全文检索和结构化搜索:全文检索就是根据关键词在索引里进行搜索,结构化搜索就是根据一些固定的条件去搜索商品(比如商品品牌、销售属性、颜色等固定字段)。

 

步骤 5:需要测试 ES 写入性能和 ES 优化、搜索接口的性能测试和优化。大量数据瞬时高并发写入的性能测试和优化,海量数据搜索时的性能测试和优化。

 

4.步骤一:ES 生产集群部署

 

经典的 ES 生产集群配置:ES 的 3 节点都用 8 核 16G + 一台 2 核 4G 的用于可视化监控的 Kibana。



ES 生产集群部署不会使用低配置的机器,而是使用配置高一点的机器。因为需要对这些机器进行亿级数据量下的高并发写入测试和搜索测试。这里则使用了 3 台阿里云上标准的 8 核 16G 的机器,来部署 ES 生产集群。测试时选用的硬盘容量几十 G 也可以,毕竟 1 亿的数据量使用 40G 磁盘也可满足了。

 

一般部署 MySQL、RocketMQ、Redis、ES 等中间件和基础系统,会使用 8 核 16G+的机器。一般部署普通业务系统,则使用 4 核 8G 甚至 2 核 4G 的机器。

 

(1)OS 内核参数优化


nproc表示单个用户可以打开的进程数量nofile表示单个进程可以打开的文件句柄的数量max_map_count表示单个进程可以拥有的虚拟内存数量
复制代码


$ vi /etc/security/limits.conf (退出当前⽤户重新登录后⽣效)* soft nofile 65535* hard nofile 65535* soft nproc 4096* hard nproc 4096
$ vi /etc/sysctl.conf (然后执⾏sysctl -p这条命令让配置)vm.max_map_count=262144
复制代码


(2)ES 生产集群部署之三节点配置


一.配置 elasticsearch.yml 文件


需要注意配置打开对 ES 节点的监控,以便后续 ES 节点的监控数据可以传输给 ELK 的 Kibana。通过 Kibana 把监控数据做一个展示,这样就可以看到 ES 集群各个节点完整的监控数据。


$ cd /app/elasticsearch/elasticsearch-7.9.3/config$ echo'' > elasticsearch.yml (默认的配置⽂件⾥⾯的内容全都被注释了,就把它清空了,重新写配置⽂件)$ vi elasticsearch.yml# 集群名称cluster.name: escluster
# 节点名称node.name: esnode1
# 节点⻆⾊node.master: truenode.data: true
# 最⼤的节点数node.max_local_storage_nodes: 3
# 绑定的ip地址network.host: 0.0.0.0
# 对外的端⼝http.port: 9300
# 节点之间通信的端⼝transport.tcp.port: 9800
# 节点发现和集群选举discovery.seed_hosts: ["172.19.16.132:9800","172.19.16.133:9800","172.19.16.134:9800"]cluster.initial_master_nodes: ["esnode1", "esnode2","esnode3"]
# 数据⽬录和⽇志⽬录 path.data: /app/elasticsearch/datapath.logs: /app/elasticsearch/log
# 配置了之后到时候kibana上可以看堆栈监控数据xpack.monitoring.enabled: truexpack.monitoring.collection.enabled: true
复制代码


二.配置 jvm.options 文件


还有至关重要的配置是 JVM 配置,ES 节点会以 JVM 进程的形式运行。一般会把机器的一半内存分配给 JVM,另一半内存留给 OS Cache。这样可以让 ES 中索引文件的数据尽可能多地驻留在 OS Cache 内存里,搜索时尽量从 OS Cache 里搜索,从而提升搜索的性能。现在机器是 16G 的,也就是 8G 内存给 JVM,8G 内存给 OS Cache。


$ cd /app/elasticsearch/elasticsearch-7.9.3/config $ vi jvm.options-Xms8g-Xmx8g
复制代码


(3)ES 生产集群部署之 Kibana 监控


部署 Kibana 的机器配置只需要低配的 2 核 4G 即可,因为只用来做展示而已,Kibana 也会以 JVM 进程的方式来运行。修改 kibana.yml 配置文件:


$ echo '' > kibana.yml $ vi kibana.yml# kibana的端⼝server.port:9000
# 绑定的地址 server.host:"0.0.0.0"
# es的地址 elasticsearch.hosts: ["http://106.14.80.207:9300", "http://139.196.198.62:9300", "http://139.196.231.156:9300"]
# 显示的语⾔i18n.locale: "zh-CN"
复制代码


5.步骤二:IK 分词器改造和部署


(1)IK 分词器词库热刷新机制介绍


原生开源的 IK 分词器不太好用,一般都要对 IK 分词器进行改造,基于 IK 分词器的源码进行二次开发。

 

要进行中文分词,首先就需要有中文词库。比如对"我特别喜欢在床上看书"这句话进行分词,中文词库中有"特别"、"喜欢"、"看书"三个词。那么就会把上面这句话分成:我、特别、喜欢、在、床、上、看书。也就是句子里的词语如果在词库中找不到,那就会拆成单独的字。如果在中文词库加上"床上"这个词语,那么就可以把"床上"分出来。

 

因此,使用 IK 分词器时,一般要从数据库加载定义好的中文词库来进行初始化。之后会开启一个后台线程定时从数据库里加载最新的词库。这样当我们想要更新词库时,就可以在 web 界面里手工录入最新的词语到数据库。当然也可以从专门的词汇系统通过爬虫从外部的网络环境来对中文词库进行自动录入。

 

然后,在 ES 的 JVM 进程里会运行这个 IK 中文分词器的代码。此时 IK 中文分词器的代码就可以不断去热加载数据库里最新的词汇,这样线上运行的 ES 搜索集群就能随时热加载热刷新最新的中文词库了。

 

所以,IK 分词器其实是一个 Java 代码包,它会嵌入到 ES 的 JVM 进程里去跑的。我们需要修改 IK 分词器的源码,让它在跑的时候可以从外部的 MySQL 里进行词库热刷新。当然,IK 中文分词器在启动时也会内嵌一个自己的基础中文词库。

 

此外,还有停用词词库(stop words),也就是在分词的时候可以忽略掉的没意义的词汇。



(2)IK 分词器源码改造流程步骤


步骤一:在IK分词器源码的pom文件中添加MySQL依赖步骤二:添加⼀个DictLoader类,用来加载MySQL中的词库内容步骤三:将Dictionary的私有方法getDictRoot()改成public以便能在DictLoader中调用步骤四:Dictionary类中添加⼀个addStopWords⽅法步骤五:在Dictionary的initial()⽅法中开启⼀个加载词库的线程步骤六:重新打包IK分词器源码
复制代码


(3)IK 分词器定时加载词库实现


读取配置文件内容的最佳实践:先获取配置文件的全路径名,然后根据全路径名构建输入流,接着加载输入流到 Java 的配置数据对象 Properties 中,最后从该对象就可以获取配置值了。

 

当 IK 分词器打包后,就可以部署安装到线上各个 ES 的节点里。然后这个 IK 分词器的代码便能随着 ES 的 JVM 进程的启动来运行,于是就能够定时地去 MySQL 里热刷新词库和停用词。


//加载MySQL中的词库内容,单例public class DictLoader {    private static final Logger LOGGER = ESPluginLoggerFactory.getLogger(DictLoader.class.getName());    private static final DictLoader INSTANCE = new DictLoader();    private final String url;    private final String username;    private final String password;    private final AtomicBoolean extensionWordFistLoad = new AtomicBoolean(false);    private final AtomicReference<String> extensionWordLastLoadTimeRef = new AtomicReference<>(null);    private final AtomicBoolean stopWordFistLoad = new AtomicBoolean(false);    private final AtomicReference<String> stopWordLastLoadTimeRef = new AtomicReference<>(null);
//单例类,构造函数是私有的 private DictLoader() { //创建一个Properties配置数据对象,用来获取MySQL JDBC连接的配置 Properties mysqlConfig = new Properties();
//PathUtils会从指定目录下,对指定的文件名进行拼接,然后返回全路径名 //所以这里会把"IK分词器配置目录 + jdbc.properties"拼接成"jdbc.properties的成全路径名" Path configPath = PathUtils.get(Dictionary.getSingleton().getDictRoot(), "jdbc.properties");
try { //根据全路径名构建输入流,然后加载到mysqlConfig对象中,这样就可以从mysqlConfig对象读取配置值了 mysqlConfig.load(new FileInputStream(configPath.toFile())); this.url = mysqlConfig.getProperty("jdbc.url"); this.username = mysqlConfig.getProperty("jdbc.username"); this.password = mysqlConfig.getProperty("jdbc.password"); } catch (IOException e) { throw new IllegalStateException("加载jdbc.properties配置文件发生异常"); }
try { //加载MySQL驱动的类 Class.forName("com.mysql.cj.jdbc.Driver"); } catch (ClassNotFoundException e) { throw new IllegalStateException("加载数据库驱动时发生异常"); } }
public static DictLoader getInstance() { return INSTANCE; }
public void loadMysqlExtensionWords() { //每次从MySQL里加载词库时会执行一条SQL语句 //这时就必须要有一个和MySQL之间建立的网络连接,才能发送SQL语句出去
//由于这里会每分钟执行一次SQL语句 //所以每次执行SQL语句的时候就创建一个数据库的网络连接Connection,执行完SQL后再把该Connection释放即可
Connection connection = null; Statement statement = null; ResultSet resultSet = null;
String sql;
//第一次执行时会通过CAS操作把extensionWordFistLoad变量由false改成true,并且查全量词汇 //之后的执行,extensionWordFistLoad变量已经变为true,所以CAS操作会不成功,于是只查增量词汇 if (extensionWordFistLoad.compareAndSet(false, true)) { //首次加载会从数据库查全量的词汇 sql = "SELECT word FROM extension_word"; } else { //后面按照最近的修改时间来加载增量的词 sql = "SELECT word FROM extension_word WHERE update_time >= '" + extensionWordLastLoadTimeRef.get() + "'"; }
//每次生成了加载词库的SQL后,都会去设置一个本次加载的时间 SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowString = dateFormat.format(new Date()); //设置最近一次加载词库的时间,extensionWordLastLoadTimeRef也是Atomic变量,线程安全的 extensionWordLastLoadTimeRef.set(nowString);
//加载扩展词词库内容 try { //使用传统的JDBC编程获取连接 connection = DriverManager.getConnection(url, username, password); //创建statement statement = connection.createStatement(); //执行SQL语句获取结果集 resultSet = statement.executeQuery(sql); LOGGER.info("从MySQL加载extensionWord, sql={}", sql);
Set<String> extensionWords = new HashSet<>(); while (resultSet.next()) { String word = resultSet.getString("word"); if (word != null) { extensionWords.add(word); //为了方便看日志,可以把加载到的扩展词全都打印出来了 LOGGER.info("从MySQL加载extensionWord, word={}", word); } }
//放到字典里 Dictionary.getSingleton().addWords(extensionWords); } catch (Exception e) { LOGGER.error("从MySQL加载extensionWord发生异常", e); } finally { //把结果集resultSet、statement、连接connection都进行释放 if (resultSet != null) { try { resultSet.close(); } catch (SQLException e) { LOGGER.error(e); } }
if (statement != null) { try { statement.close(); } catch (SQLException e) { LOGGER.error(e); } }
if (connection != null) { try { connection.close(); } catch (SQLException e) { LOGGER.error(e); } } } }
public void loadMysqlStopWords() { //和loadMysqlExtensionWords逻辑基本一样 }}
复制代码


(4)在 ES 生产集群中安装 IK 分词器


步骤一:首先下载elasticsearch-analysis-ik的发⾏版,比如7.9.3版本步骤二:然后在ES节点机器上的elasticserach的plugins⽬录下创建⼀个ik⽬录步骤三:接着把elasticsearch-analysis-ik-7.9.3.zip包上传到ik⽬录,然后解压步骤四:然后⽤打好的elasticsearch-analysis-ik-7.9.3.jar包替换掉解压得到的同名jar包步骤五:接着把mysql-connector-java-8.0.20.jar包也上传到ik⽬录步骤六:最后在ik/config⽬录⾥⾯添加⼀个jdbc.properties⽂件
复制代码


这样就完成了在一个 ES 节点上安装 IK 分词器了。

 

之后,重启 ES 集群的三个节点:


$ ps -ef | grep elasticsearch$ kill 2989$ echo '' > /app/elasticsearch/log/escluster.log$ /app/elasticsearch/elasticsearch-7.9.3/bin/elasticsearch -d$ tail -f /app/elasticsearch/log/escluster.log
复制代码


(5)在 ES 生产集群中安装拼音分词器


拼音分词器是专门和 IK 分词器配合使用来更好实现中文搜索的。如果用户在搜索框输入一些拼音,那么拼音分词器就可以帮助 ES 返回拼音对应的结果。


步骤一:首先下载elasticsearch-analysis-pinyin的发⾏版,比如版本为7.9.3步骤二:在ES节点机器上的elasticserach的plugins⽬录下创建⼀个pinyin⽬录步骤三:把elasticsearch-analysis-pinyin-7.9.3.zip包上传到ik⽬录,然后解压步骤四:最后重启ES集群的各个节点,这样拼音分词器便安装好了
复制代码

 

6.步骤三:为商品数据设计和创建索引


索引会包括:商品核心索引 + suggest 索引

 

(1)对商品的核心数据模型进行分析


商品数据中用于搜索的核心字段如下:


一.skuName商品名称商品名称是⼀个字符串,我们要对商品名称进⾏全⽂检索。
二.skuId商品id商品id⼀般是⼀个long类型的数字。
三.category商品分类商品分类是⼀个字符串;我们不会对商品分类做全⽂检索,⽽是对商品分类做精准匹配。
四.basePrice商品价格 | vipPrice会员价格 | saleCount销量 | commentCount评论数这⼏个字段都是数字。
五.skuImgUrl商品图⽚商品图⽚⼀个图⽚的url地址,我们不会对这个字段做任何搜索操作,也不需要索引这个字段。
六.createTime创建时间 | updateTime修改时间
复制代码


(2)生产项目中的搜索分词器方案


一.IK 分词器的算法类型


创建索引时,要设置具体的分片数量,分片数量可以参考 ES 集群有几个节点。比如每个节点可以有一个 shard 数据分片,每个 shard 数据分片可以有 1 个副本。下面对 IK 分词器的两个分词算法类型 ik_max_word 和 ik_smart 进行简单介绍。

 

ik_max_word:会对一个词尽可能多地进行拆分,让该词汇可以匹配的搜索词尽可能多。

 

ik_smart:会对一个词尽可能精准地进行拆分,也就是让该词汇可以匹配的搜索词尽可能少但精准。

 

可以理解前者细粒度、后者粗粒度。

 

二.IK 分词器的算法使用方案


在生产中使用 IK 中文分词器时,会分不同的场景来使用这两种分词算法类型。

 

场景一:在写入数据场景,会使用 ik_max_word 去建立索引。从而产生非常精细化的小词汇,这样就可以对后续的搜索词匹配,提供更多的选择。

 

场景二:在对字段进行搜索的场景,传入的搜索词就会使用 ik_smart 来进行分词。对搜索词进行精准拆分,那么按这些拆分后的词去匹配,搜出来的结果才能更准确。

 

这种方案具体的实现就是如下所示:


"skuName":{    "type":"text",    "analyzer":"ik_max_word",    "search_analyzer":"ik_smart"}
复制代码


(3)商品和 suggest 索引的设计与创建


一.elasticsearch 数据类型说明


数据类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html
text数据类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/text.html
keyword数据类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/keyword.html
数字类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html
时间类型⽂档:https://www.elastic.co/guide/en/elasticsearch/reference/7.9/number.html
复制代码


二.数据类型简单说明


type为text时,会进行分词并且通常指定创建索引时用ik_max_word,搜索时搜索词用ik_smart
type为keyword时,为keyword类型,不会进行分词
type为integer时,为数字类型
index为false时,表示指明不需要建立索引
复制代码


三.创建商品索引的 mappings


商品的索引名为:career_plan_sku_index_序号。因为需要做多次不同的测试,有的测试是使⽤不同的索引,⽽且在实现接⼝时并没有把接⼝写死,可以指定操作那个索引,所以索引后⾯就加了⼀个序号。


PUT /demo_plan_sku_index_15{    "settings": {        "number_of_shards": 3,//数据分片        "number_of_replicas": 1//分片副本    },    "mappings": {        "properties": {             "skuId": {                 "type": "keyword"             },            "skuName": {                 "type": "text",                 "analyzer": "ik_max_word",                 "search_analyzer": "ik_smart"             },            "category": {                 "type": "keyword"             },            "basePrice": {                 "type": "integer"             },            "vipPrice": {                 "type": "integer"             },            "saleCount": {                 "type": "integer"             },            "commentCount": {                 "type": "integer"             },            "skuImgUrl": {                 "type": "keyword",                 "index": false             },            "createTime": {                 "type": "date",                 "format": "yyyy-MM-dd HH:mm:ss"             },            "updateTime": {                 "type": "date",                 "format": "yyyy-MM-dd HH:mm:ss"             }         }     } }
复制代码


对 skuName 字段进行分词建立索引,是用来进行全文检索的。对分类、价格、销售数、评论数、时间这些字段建立索引,是用来进行结构化搜索的。结构化搜索指的是,可以去查询指定分类、指定价格范围、指定销售数范围等条件进行搜索。

 

四.创建 suggest 索引


suggest 索引是用来实现用户输入自动补全、拼写纠错、搜索推荐(搜不出结果时推荐其他数据)的。suggest 索引一共有两个字段:word1 字段是用来做自动补全,word2 字段是用来做拼写纠错和搜索推荐的。word1 字段的类型会使用 completion 数据类型,该数据类型具体信息可以参考如下文档:


https://www.elastic.co/guide/en/elasticsearch/reference/7.9/mapping-types.html
复制代码


下面在创建 suggest 索引时自定义了一个 analyzer:ik_and_pinyin_analyzer。该 analyzer 同时使用了 IK 分词器和拼音分词器,这样用户输入汉字和拼音时都能做自动补全。


PUT /career_plan_sku_suggest_15 {     "settings": {         "number_of_shards": 3,         "number_of_replicas": 1,         "analysis": {             "analyzer": {                 "ik_and_pinyin_analyzer": {                     "type": "custom",                     "tokenizer": "ik_smart",                     "filter": "my_pinyin"                 }             },            "filter": {                 "my_pinyin": {                     "type": "pinyin",                     "keep_first_letter": true,                     "keep_full_pinyin": true,                     "keep_original": true,                     "remove_duplicated_term": true                 }             }         }     },    "mappings": {         "properties": {             "word1": {                 "type": "completion",                 "analyzer": "ik_and_pinyin_analyzer"             },            "word2": {                 "type": "text"             }         }     } }
复制代码


7.步骤四:为商品数据生成索引


(1)往 ES 索引中写入模拟数据简介


会有一个 MockDataController 往 ES 索引中写入模拟数据,也就是 MockDataController 会模拟触发商品和 suggest 索引数据的写入。

 

当触发了 MockDataController 的接口后,会先从文件中加载出 10 万条数据。之后会对内存中的这 10 万条数据,基于 ES 的 API 进行批量写入。

 

(2)单线程模式 bulk 批量写入商品数据


从文件中加载出 10 万条数据到内存后,首先会看看需要分成多少个批次来进行批量写入,以及每个批次需要写入多少条数据。然后会按照批次数 batchTimes 进行遍历,接着会对每个批次构建一个 ES 的 BulkRequest 对象,最后会调用 RestHighLevelClient 的 bulk()方法将 BulkRequest 对象写入到 ES 里去。

 

注意:在真正在生产环境下,不可能使用单个线程处理一个一个 batch 写入来实现大批量数据的写入。实现这个单线程批量写入的方法主要是用来和接下来的多线程批量写入的方法进行性能对比。


@Configurationpublic class ElasticSearchConfig {    @Value("${elasticsearch.addr}")    private String addr;
@Bean(destroyMethod = "close") public RestHighLevelClient restHighLevelClient() { String[] segments = addr.split(","); HttpHost[] esNodes = new HttpHost[segments.length]; for (int i = 0; i < segments.length; i++) { String[] hostAndPort = segments[i].split(":"); esNodes[i] = new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), "http"); } return new RestHighLevelClient(RestClient.builder(esNodes)); }}
@RestController@RequestMapping("/api/mockData")public class MockDataController { private static final String dataFileName = "100k_products.txt";
@Autowired private RestHighLevelClient restHighLevelClient;
//单线程写入模拟的商品数据 //https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.9/java-rest-high-document-bulk.html @PostMapping("/mockData1") public JsonResult mockData1(@RequestBody MockData1Dto request) throws IOException { if (!request.validateParams()) { return JsonResult.buildError("参数有误"); }
//索引名字 String indexName = request.getIndexName(); //一次批量写入多少数据 int batchSize = request.getBatchSize(); //进行批量写入的次数 int batchTimes = request.getBatchTimes();
//1.从txt文件里面加载10w条商品数据,大小才13M,可以全部一次读出来 List<Map<String, Object>> skuList = loadSkusFromTxt();
long startTime = System.currentTimeMillis();
//2.每次随机取出batchSize个商品数据,然后批量写入,一共执行batchTimes次 for (int i = 0; i < batchTimes; i++) { //把指定的batchSize条数据打包成一个BulkRequest对象 BulkRequest bulkRequest = buildSkuBulkRequest(indexName, batchSize, skuList); //然后调用ES的restHighLevelClient.bulk()接口,将BulkRequest对象写入到ES里去 restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); log.info("写入[{}]条商品数据", batchSize); }
long endTime = System.currentTimeMillis();
//3.记录统计信息 int totalCount = batchSize * batchTimes; long elapsedSeconds = (endTime - startTime) / 1000; long perSecond = totalCount / elapsedSeconds; log.info("此次共写入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);
Map<String, Object> result = new LinkedHashMap<>(); result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN)); result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN)); result.put("totalCount", totalCount); result.put("elapsedSeconds", elapsedSeconds); result.put("perSecond", perSecond); return JsonResult.buildSuccess(result); } ...
//读取txt文件中的sku数据 private List<Map<String, Object>> loadSkusFromTxt() throws IOException { InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream(dataFileName); InputStreamReader inputStreamReader = new InputStreamReader(resourceAsStream); BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
List<Map<String, Object>> skuList = new ArrayList<>();
//读取文件内容(一共是10万条商品数据,总共才13MB大小),一条数据的内容如下: //10001,房屋卫士自流平美缝剂瓷砖地砖专用双组份真瓷胶防水填缝剂镏金色,品质建材,398.00,上海,540785126782 String line; Random random = new Random(); while ((line = bufferedReader.readLine()) != null) { String[] segments = line.split(","); int id = Integer.parseInt(segments[0]); String skuName = segments[1]; String category = segments[2].replace("会场", "").replace("主会场", "").replace("风格好店", ""); int basePrice = Integer.parseInt(segments[3].substring(0, segments[3].indexOf("."))); if (basePrice <= 100) { basePrice = 200; }
//10个字段 Map<String, Object> sku = new HashMap<>(); sku.put("skuId", id); sku.put("skuName", skuName); sku.put("category", category); sku.put("basePrice", basePrice); sku.put("vipPrice", basePrice - 100);
sku.put("saleCount", random.nextInt(100_000)); sku.put("commentCount", random.nextInt(100_000)); sku.put("skuImgUrl", "http://sku_img_url.png"); sku.put("createTime", "2021-01-04 10:00:00"); sku.put("updateTime", "2021-01-04 10:00:00"); skuList.add(sku); } return skuList; } ...
//从10万个sku里面随机选择batchSize个,然后封装成一个批量写入的BulkRequest对象 private BulkRequest buildSkuBulkRequest(String indexName, int batchSize, List<Map<String, Object>> skuList) { //根据索引名称indexName创建BulkRequest对象 BulkRequest bulkRequest = new BulkRequest(indexName); Random random = new Random(); for (int j = 0; j < batchSize; j++) { //获取0到10万中的随机数 int index = random.nextInt(100_000); //根据随机出来的index获取一条数据 Map<String, Object> map = skuList.get(index);
//下面List的元素大概如下所示: //list[0] = skuId,list[1] = xx,list[2] = skuName, list[3] = xx List<Object> list = new ArrayList<>(); map.forEach((k, v) -> { list.add(k); list.add(v); });
IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, list.toArray()); bulkRequest.add(indexRequest); } return bulkRequest; } ...}
复制代码


(3)多线程并发大批量写入商品数据实现


从文件中加载出 10 万条数据到内存后,首先会看看需要分成多少个批次来进行批量写入,以及每个批次需要写入多少条数据。然后会按照批次数 batchTimes 进行遍历,遍历到的每个批次对应的批量写入都会提交一个任务到线程池去执行。接着线程池的一个线程便会对一个任务中的一个批次构建一个 ES 的 BulkRequest 对象,最后会调用 RestHighLevelClient 的 bulk()方法将 BulkRequest 对象写入到 ES 里。

 

其中在遍历批次数提交任务到线程池前,会根据批次数创建一个 CountdownLatch。每当线程池的一个线程执行完批量写入任务时,该 CountdownLatch 就会减 1 直到 0,从而使得所有批次数的批量写入任务都完成时程序才结束。

 

此外,还使用了一个信号量 Semaphore 来控制同时执行的最多任务数。即每当要提交一个批次的批量写入任务到线程池前,都要先获取一个信号量,否则就阻塞等待。

 

注意:当一个线程刚刚释放完 Semaphore 后,还要执行下一行代码。也就是线程还没释放,此时新一个任务可能就可以获取到 Semaphore 并提交任务到线程池了。所以线程池实际需要执行的任务数可能会比 Semaphore 允许数 threadCount 多一点,因此才将 maxCorePoolSize 设置为 2 倍的 threadCount。


@RestController@RequestMapping("/api/mockData")public class MockDataController {    private static final String dataFileName = "100k_products.txt";
@Autowired private RestHighLevelClient restHighLevelClient; ...
//多线程写入模拟的商品数据 @PostMapping("/mockData2") public JsonResult mockData2(@RequestBody MockData2Dto request) throws IOException, InterruptedException { if (!request.validateParams()) { return JsonResult.buildError("参数有误"); }
String indexName = request.getIndexName(); //需要进行多少次batch批量写入 int batchTimes = request.getBatchTimes(); //每次batch批量写入多少条数据 int batchSize = request.getBatchSize(); //可以同时执行batch批量写入的线程数量 int threadCount = request.getThreadCount(); //读取10万条数据到内存 List<Map<String, Object>> skuList = loadSkusFromTxt();
//CountDownLatch:一个线程完成任务后才进行countDown,最后countDown到0时才能结束 CountDownLatch countDownLatch = new CountDownLatch(batchTimes);
//Semaphore:一个线程可以尝试从semaphore获取一个信号,如果获取不到就阻塞等待,获取到了,信号就是这个线程的了 //当一个线程执行完其任务之后,会把信号还回去,所以最多只能有threadCount个线程可以获取到信号量 Semaphore semaphore = new Semaphore(threadCount);
//虽然semaphore可以控制线程池中同时进行的任务数,但是maximumPoolSize也不能设置的和semaphore一样的大小 //因为线程池用了SynchronousQueue队列,可能会出现实际需要执行的任务数比semaphore允许数多一两个的情况 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( threadCount, threadCount * 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>() );
long startTime = System.currentTimeMillis();
//batch数量可以是比线程数量多 for (int i = 0; i < batchTimes; i++) { //通过semaphore保证一直最多有threadCount个线程同时在执行批量写入的操作 //先获取一个信号量,获取到了就提交任务到线程池执行批量写入的操作,获取不到就阻塞等待有空余的信号量 semaphore.acquireUninterruptibly();
threadPoolExecutor.submit(() -> { try { BulkRequest bulkRequest = buildSkuBulkRequest(indexName, batchSize, skuList); restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); log.info("线程[{}]写入[{}]条商品数据", Thread.currentThread().getName(), batchSize); } catch (IOException e) { e.printStackTrace(); } finally { //从下面两行代码就可以看到,当一个线程刚刚释放完semaphore后,还要执行下一行代码,还没释放线程 //这时新一个任务可能就获取到semaphore并提交到线程池了 //所以线程池实际需要执行的任务数可能会比semaphore的允许数threadCount多一点 semaphore.release(); countDownLatch.countDown(); } }); }
long endTime = System.currentTimeMillis();
//在这里等待一下最后一个批次的批量写入操作执行完 countDownLatch.await();
//现在的使用方式,在这里需要手动的把线程池给关掉 threadPoolExecutor.shutdown();
int totalCount = batchSize * batchTimes; long elapsedSeconds = (endTime - startTime) / 1000; long perSecond = totalCount / elapsedSeconds; log.info("此次共导入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);
Map<String, Object> result = new LinkedHashMap<>(); result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN)); result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN)); result.put("totalCount", totalCount); result.put("elapsedSeconds", elapsedSeconds); result.put("perSecond", perSecond); return JsonResult.buildSuccess(result); } ...
private BulkRequest buildSkuBulkRequest(String indexName, int batchSize, List<Map<String, Object>> skuList) { //根据索引名称indexName创建BulkRequest对象 BulkRequest bulkRequest = new BulkRequest(indexName); Random random = new Random(); for (int j = 0; j < batchSize; j++) { //获取0到10万中的随机数 int index = random.nextInt(100_000); //根据随机出来的index获取一条数据 Map<String, Object> map = skuList.get(index);
//下面List的元素大概如下所示: //list[0] = skuId,list[1] = xx,list[2] = skuName, list[3] = xx List<Object> list = new ArrayList<>(); map.forEach((k, v) -> { list.add(k); list.add(v); });
IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, list.toArray()); bulkRequest.add(indexRequest); } return bulkRequest; } ...}
复制代码


(4)多批次有限线程池导入 ES 代码分析


代码执行流程如下:



(5)全量与增量商品数据导入 ES 生产方案


一.全量商品数据导入 ES 的生产方案


说明一:假设商品数据一共有 1000 万,那么如何进行查询,每次查多少?每条商品数据需要导入 ES 的只有部分字段,10 万条这样的数据才 10MB。这样 10 万的数据放到内存里加上对象额外的开销,总共才需要几十 MB。所以可对 1000 万商品数据进行分批次查询导入处理,每次查询 10 万条数据,共 100 次。

 

说明二:在每次处理查询出来的 10 万条商品数据时,要拆分多少个 Bulk 来进行批量写入?这要看测试效果,看看往 ES 里进行一次 Bulk 写入时,写入多少 MB 的数据会比较好。ES 官方建议每次的 Bulk 写入不超过 15MB,这里假设每次 Bulk 写入不超过 10MB。

 

假设 10 万商品数据到了内存后,由于商品数据对象的结构导致内存消耗膨胀到了需要 100MB,那么这 10 万条商品数据可以拆分为 10 个 Bulk 进行批量写入,每个 Bulk 的批量写入任务都可以往线程池进行提交。

 

说明三:接下来需要考虑线程池究竟开启多少线程。如果将每次查询出来的 10 万条数据进行拆分成 5 个 Bulk 写入,那么可以让线程池开启 5 个线程。让这 5 个线程并发去进行 Bulk 写入即可,也就是 threadCount 可以设为 5。

 

二.增量商品数据导入 ES 的生产方案


Canal 会监听 MySQL 数据库的增删改 binlog,然后把这些 binlog 发送给商品索引系统。接着商品索引系统从 binlog 中提取出相关的字段,构建出单个 BulkRequest,然后把这单个 BulkRequest 直接写入到 ES 即可。



(6)suggest 索引同步写入实现


suggest 索引的同步写入和商品索引的同步写入差不多,其实每个线程在进行 bulk 批量写入时构建两个 BulkRequest 即可:一个是商品索引的 BulkRequest,一个是 suggest 索引的 BulkRequest。



关键代码如下:


private BulkRequest buildProductIndexBulkRequest(List<Map<String, Object>> bulkList) {    BulkRequest bulkRequest = new BulkRequest("product_index");    for (int i = 0; i < bulkList.size(); i++) {        Map<String, Object> productDataMap = bulkList.get(i);        List<Object> productDataList = new ArrayList<>();        productDataMap.forEach((k, v) -> {            productDataList.add(k);            productDataList.add(v);        });
IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, productDataList.toArray()); bulkRequest.add(indexRequest); } return bulkRequest;}
private BulkRequest buildSuggestIndexBulkRequest(List<String> skuNameBulkList) throws Exception { BulkRequest bulkRequest = new BulkRequest("suggest_index"); for (int i = 0; i < skuNameBulkList.size(); i++) { String skuName = skuNameBulkList.get(i); IndexRequest indexRequest = new IndexRequest().source(XContentType.JSON, "word1", skuName, "word2", skuName); bulkRequest.add(indexRequest); } return bulkRequest;}
复制代码


(7)全量数据双索引写入的生产代码实现


实现思路:进行分 batch 查询 + 每个 batch 的查询结果进行分 bulk 批量写 + 使用 CountDownLatch 等待所有 bulk 任务都执行完毕 + 使用 Semaphore 控制同时执行任务的线程数。

 

举个例子:一般会从数据库中查询出 10w 条数据作为一个 batch,所以 1000 万的商品数据就需要 100 个 batch 批量写入。然后每次查出来的一批 10 万条数据,会拆分为多个 bulk 进行并发批量写入。比如每个 bulk 只有 150 条数据,于是有 667 次 bulk 批量写入,CountDownLatch 大小为 667。于是就会产生 667 个任务,每个任务由线程池执行完之后都会 counDown()一下。


@RestController@RequestMapping("/api/mockData")public class MockDataController {    @Autowired    private RestHighLevelClient restHighLevelClient;    ...
//多线程向ES写入生产全量商品的双索引数据 @PostMapping("/indexAllProductData") public JsonResult indexAllProductData(@RequestBody MockData2Dto request) throws IOException, InterruptedException { if (!request.validateParams()) { return JsonResult.buildError("参数有误"); } //1000万除以10万=100,需要进行多少次batch批量写入 int batchCount = 100; //每个批次10万条数据 int batchSize = 100_000; //每次bulk批量写入ES多少条数据 int bulkSize = request.getBatchSize(); //每个批次需要进行多少次bulk批量写入 int bulkCount = batchSize / bulkSize + 1; //可以同时执行bulk批量写入的线程数量 int threadCount = request.getThreadCount();
for (int batchIndex = 1; batchIndex <= batchCount; batchIndex++) { //一般会每次从数据库中查询出10w条数据作为一个batch //查出来的每一批数据,都会拆分为多个bulk进行并发批量写入 List<Map<String, Object>> batchList = queryProductBatchFromDatabase(batchIndex, batchSize);
CountDownLatch countDownLatch = new CountDownLatch(bulkCount); Semaphore semaphore = new Semaphore(threadCount);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( threadCount, threadCount * 2, 60, TimeUnit.SECONDS, new SynchronousQueue<>() );
int bulkDataCurrentIndex = 0; for (int bulkIndex = 1; bulkIndex <= bulkCount; bulkIndex++) { List<Map<String, Object>> bulkList = new ArrayList<Map<String, Object>>(); List<String> skuNameBulkList = new ArrayList<String>(); for (int bulkDataIndex = bulkDataCurrentIndex; bulkDataIndex < bulkDataCurrentIndex + bulkSize; bulkDataIndex++) { if (batchList.get(bulkDataIndex) == null) { //外层循环不断break,否则CountDownLatch会少countDown()而一直等待 break; } bulkList.add(batchList.get(bulkDataIndex)); skuNameBulkList.add(String.valueOf(batchList.get(bulkDataIndex).get("skuName"))); }
bulkDataCurrentIndex += bulkSize; semaphore.acquireUninterruptibly();
threadPoolExecutor.submit(() -> { try { if (bulkList.size() > 0) { BulkRequest productIndexBulkRequest = buildProductIndexBulkRequest(bulkList); restHighLevelClient.bulk(productIndexBulkRequest, RequestOptions.DEFAULT); } if (skuNameBulkList.size() > 0) { BulkRequest suggestIndexBulkRequest = buildSuggestIndexBulkRequest(skuNameBulkList); restHighLevelClient.bulk(suggestIndexBulkRequest, RequestOptions.DEFAULT); } log.info("线程[{}]写入[{}]条商品数据", Thread.currentThread().getName(), bulkList.size()); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(); countDownLatch.countDown(); } }); }
countDownLatch.await(); threadPoolExecutor.shutdown(); }
long endTime = System.currentTimeMillis(); int totalCount = batchSize * batchCount; long elapsedSeconds = (endTime - startTime) / 1000; long perSecond = totalCount / elapsedSeconds; log.info("此次共导入[{}]条商品数据,耗时[{}]秒,平均每秒导入[{}]条数据", totalCount, elapsedSeconds, perSecond);
Map<String, Object> result = new LinkedHashMap<>(); result.put("startTime", DateUtil.format(new Date(startTime), DatePattern.NORM_DATETIME_PATTERN)); result.put("endTime", DateUtil.format(new Date(endTime), DatePattern.NORM_DATETIME_PATTERN)); result.put("totalCount", totalCount); result.put("elapsedSeconds", elapsedSeconds); result.put("perSecond", perSecond); return JsonResult.buildSuccess(result); }
private List<Map<String, Object>> queryProductBatchFromDatabase(int batchIndex, int batchSize) { //根据第几个batch,每个batch多少条数据,对数据库发起sql查询,把一批一批的数据查出来 return new ArrayList<Map<String, Object>>(); } ...}
复制代码


(8)商品数据写入双索引后的效果分析


把数据写入 ES,无非就是让 ES 为数据建立倒排索引和正排索引。倒排索引是用来进行全文检索的,正排索引是用来进行结构化搜索和数据分析的。

 

比如针对商品 skuName 字段的检索数据,会使用 ik_max_word 类型尽量精细化地分词。然后基于分词建立尽可能多的倒排索引,后续就可以根据 skuName 来进行全文检索。当输入针对 skuName 的搜索词到输入框时,会使用 ik_smark 类型进行粗粒度分词,然后到倒排索引进行精准匹配。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18945187

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
商品中心—商品B端搜索系统的实现文档_架构_不在线第一只蜗牛_InfoQ写作社区