写点什么

MySQL 同步 ES 的 6 种方案!

  • 2025-05-08
    福建
  • 本文字数:1964 字

    阅读完需:约 6 分钟

引言


在分布式架构中,MySQL 与 Elasticsearch(ES)的协同已成为解决高并发查询与复杂检索的标配组合。


然而,如何实现两者间的高效数据同步,是架构设计中绕不开的难题。


这篇文章跟大家一起聊聊 MySQL 同步 ES 的 6 种主流方案,结合代码示例与场景案例,帮助开发者避开常见陷阱,做出最优技术选型。


方案一:同步双写


场景:适用于对数据实时性要求极高,且业务逻辑简单的场景,如金融交易记录同步。


在业务代码中同时写入 MySQL 与 ES。


代码如下:

@Transactional  public void createOrder(Order order) {      // 写入MySQL      orderMapper.insert(order);      // 同步写入ES      IndexRequest request = new IndexRequest("orders")          .id(order.getId())          .source(JSON.toJSONString(order), XContentType.JSON);      client.index(request, RequestOptions.DEFAULT);  }
复制代码


痛点

  1. 硬编码侵入:所有涉及写操作的地方均需添加 ES 写入逻辑。

  2. 性能瓶颈:双写操作导致事务时间延长,TPS 下降 30%以上。

  3. 数据一致性风险:若 ES 写入失败,需引入补偿机制(如本地事务表+定时重试)。


方案二:异步双写


场景:电商订单状态更新后需同步至 ES 供客服系统检索。


我们可以使用 MQ 进行解耦。


架构图如下



代码示例如下

// 生产者端  public void updateProduct(Product product) {      productMapper.update(product);      kafkaTemplate.send("product-update", product.getId());  }  
// 消费者端 @KafkaListener(topics = "product-update") public void syncToEs(String productId) { Product product = productMapper.selectById(productId); esClient.index(product); }
复制代码


优势

  • 吞吐量提升:通过 MQ 削峰填谷,可承载万级 QPS。

  • 故障隔离:ES 宕机不影响主业务链路。


缺陷

  • 消息堆积:突发流量可能导致消费延迟(需监控 Lag 值)。

  • 顺序性问题:需通过分区键保证同一数据的顺序消费。


方案三:Logstash 定时拉取


场景:用户行为日志的 T+1 分析场景。


该方案低侵入但高延迟。


配置示例如下

input {    jdbc {      jdbc_driver => "com.mysql.jdbc.Driver"      jdbc_url => "jdbc:mysql://localhost:3306/log_db"      schedule => "*/5 * * * *"  # 每5分钟执行      statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value"    }  }  output {    elasticsearch {      hosts => ["es-host:9200"]      index => "user_logs"    }  }
复制代码


适用性分析

  • 优点:零代码改造,适合历史数据迁移。

  • 致命伤:分钟级延迟(无法满足实时搜索)全表扫描压力大(需优化增量字段索引)


方案四:Canal 监听 Binlog


场景:社交平台动态实时搜索(如微博热搜更新)。技术栈:Canal + RocketMQ + ES


该方案高实时,并且低侵入。


架构流程如下



关键配置

# canal.properties  canal.instance.master.address=127.0.0.1:3306  canal.mq.topic=canal.es.sync
复制代码


避坑指南

  1. 数据漂移:需处理 DDL 变更(通过 Schema Registry 管理映射)。

  2. 幂等消费:通过_id唯一键避免重复写入。


方案五:DataX 批量同步


场景:将历史订单数据从分库分表 MySQL 迁移至 ES。


该方案是大数据迁移的首选。


配置文件如下

{    "job": {      "content": [{        "reader": {          "name": "mysqlreader",          "parameter": { "splitPk": "id", "querySql": "SELECT * FROM orders" }        },        "writer": {          "name": "elasticsearchwriter",          "parameter": { "endpoint": "http://es-host:9200", "index": "orders" }        }      }]    }  }
复制代码


性能调优

  • 调整channel数提升并发(建议与分片数对齐)

  • 启用limit分批查询避免 OOM


方案六:Flink 流处理


场景:商品价格变更时,需关联用户画像计算实时推荐评分。


该方案适合于复杂的 ETL 场景。


代码片段如下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.addSource(new CanalSource())     .map(record -> parseToPriceEvent(record))     .keyBy(event -> event.getProductId())     .connect(userProfileBroadcastStream)     .process(new PriceRecommendationProcess())     .addSink(new ElasticsearchSink());
复制代码


优势

  • 状态管理:精准处理乱序事件(Watermark 机制)

  • 维表关联:通过 Broadcast State 实现实时画像关联


总结:


对于文章上面给出的这 6 种技术方案,我们在实际工作中,该如何做选型呢?


下面用一张表格做对比:



建议

  1. 若团队无运维中间件能力 → 选择 Logstash 或同步双写

  2. 需秒级延迟且允许改造 → MQ 异步 + 本地事务表

  3. 追求极致实时且资源充足 → Canal + Flink 双保险


文章转载自:苏三说技术

原文链接:https://www.cnblogs.com/12lisu/p/18860943

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2025-04-01 加入

还未添加个人简介

评论

发布
暂无评论
MySQL同步ES的6种方案!_MySQL_量贩潮汐·WholesaleTide_InfoQ写作社区