李锋镝的博客

  • 首页
  • 时间轴
  • 插件
  • 评论区显眼包🔥
  • 左邻右舍
  • 博友圈
  • 关于我
    • 关于我
    • 另一个网站
    • 我的导航站
    • 网站地图
  • 留言
  • 赞助
Destiny
自是人生长恨水长东
  1. 首页
  2. 中间件
  3. 正文

MySQL 同步 ElasticSearch 深度指南——6 种方案的原理、实战与避坑

2025年10月30日 68点热度 0人点赞 0条评论

在分布式架构中,MySQL 与 Elasticsearch(ES)的组合早已成为“事务存储+高效检索”的黄金搭档——MySQL 凭借 ACID 特性保障核心数据一致性,ES 则以倒排索引和分布式架构支撑百万级数据的全文检索、聚合分析。但二者的协同核心,始终绕不开“数据同步”这一关键环节:如何在保证数据一致性的前提下,兼顾实时性、低侵入性与高可用性?

本文将基于业务场景演进,从“简单同步”到“复杂流处理”,详细拆解 6 种 MySQL 同步 ES 方案的底层原理、完整实操步骤、性能优化技巧与实战踩坑经验,结合真实项目案例说明“不同阶段该选哪种方案”,帮你避开从技术选型到落地的所有陷阱。

一、先明确核心诉求:同步方案选型的 4 个关键维度

在选择同步方案前,必须先理清业务的核心诉求——不同诉求对应完全不同的技术选型,这是避免“过度设计”或“方案不足”的关键。

1. 实时性要求:数据延迟容忍度

  • 实时(毫秒-秒级):如社交平台的实时动态搜索、电商商品价格变更后立即更新检索结果;
  • 准实时(分钟级):如客服系统查询订单状态、运营后台的用户行为分析;
  • 离线(小时-T+1 级):如历史订单归档检索、每日用户画像统计。

2. 侵入性:是否允许修改业务代码

  • 高侵入:可修改业务代码(如在订单服务中添加 ES 写入逻辑);
  • 低侵入:仅允许添加中间件(如 MQ、Canal),不修改核心业务逻辑;
  • 无侵入:完全不触碰业务系统(如通过 Logstash 拉取 MySQL 数据)。

3. 数据一致性:是否容忍数据偏差

  • 强一致性:MySQL 与 ES 数据实时对齐(如金融交易记录,不允许偏差);
  • 最终一致性:短时间内允许偏差,但需保证一段时间后对齐(如商品库存,延迟 10 秒可接受);
  • 近似一致性:允许少量数据偏差(如用户行为日志,丢失几条可容忍)。

4. 数据规模与复杂度

  • 小规模简单数据:单表、无复杂转换(如订单表同步);
  • 大规模分库分表:多库多表、需合并数据(如跨区域订单同步);
  • 复杂 ETL 需求:需关联多表、计算衍生字段(如商品价格关联用户画像生成推荐评分)。

二、方案一:同步双写(实时,高侵入)

核心逻辑:在业务代码中,同一事务内同时写入 MySQL 与 ES,确保两者数据“实时对齐”。
适用场景:实时性要求极高(秒级)、数据一致性要求强、业务逻辑简单的场景(如金融交易记录、核心订单状态同步)。

1. 底层原理与风险点

同步双写的本质是“将 ES 写入纳入业务事务链路”,但存在三大核心风险:

  • 事务一致性风险:MySQL 提交成功后,ES 写入失败会导致数据不一致;
  • 性能瓶颈:ES 写入属于网络 IO 操作,会延长事务耗时,降低系统 TPS;
  • 代码侵入性:所有写操作(新增/修改/删除)都需添加 ES 逻辑,维护成本高。

2. 完整实操:从基础实现到一致性保障

(1)基础代码实现(Spring Boot)

@Service
public class OrderSyncService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private RestHighLevelClient esClient;
    @Autowired
    private OrderFailRetryMapper failRetryMapper; // 本地事务表 DAO

    /**
     * 同步双写订单:MySQL + ES
     * @param order 订单实体
     */
    @Transactional(rollbackFor = Exception.class)
    public void createOrder(Order order) {
        // 1. 写入 MySQL(事务内)
        orderMapper.insert(order);

        try {
            // 2. 同步写入 ES(设置超时时间,避免阻塞事务)
            IndexRequest request = new IndexRequest("orders_index") // ES 索引名
                    .id(order.getId().toString()) // ES 文档 ID(与 MySQL 主键一致,确保幂等)
                    .source(JSON.toJSONString(order), XContentType.JSON)
                    .setTimeout(TimeValue.timeValueSeconds(3)); // 超时 3 秒,避免长期阻塞

            // 执行 ES 写入
            esClient.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            // 3. ES 写入失败:记录到本地事务表,后续重试(保障最终一致性)
            OrderFailRetry retry = new OrderFailRetry();
            retry.setOrderId(order.getId());
            retry.setData(JSON.toJSONString(order));
            retry.setRetryCount(0);
            retry.setStatus(0); // 0-待重试,1-已成功,2-重试失败
            retry.setCreateTime(new Date());
            failRetryMapper.insert(retry);

            // 注意:此处不抛异常,避免 MySQL 事务回滚(优先保证核心库数据)
            log.error("ES 写入失败,已记录重试任务,orderId:{}", order.getId(), e);
        }
    }
}

(2)关键优化:本地事务表 + 定时重试

ES 写入失败后,通过“本地事务表”记录失败数据,定时任务重试,确保最终一致性:

/**
 * 定时重试 ES 失败任务(每 1 分钟执行一次)
 */
@Scheduled(fixedRate = 60 * 1000)
public void retryEsFailTask() {
    // 1. 查询待重试任务(限制每次查询数量,避免一次性处理过多)
    List<OrderFailRetry> failList = failRetryMapper.selectByStatusAndRetryCount(0, 5); // 最多重试 5 次
    if (failList.isEmpty()) {
        return;
    }

    // 2. 批量重试(使用线程池提高效率)
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (OrderFailRetry retry : failList) {
        executor.submit(() -> {
            try {
                Order order = JSON.parseObject(retry.getData(), Order.class);
                // 重新执行 ES 写入
                IndexRequest request = new IndexRequest("orders_index")
                        .id(order.getId().toString())
                        .source(retry.getData(), XContentType.JSON);
                esClient.index(request, RequestOptions.DEFAULT);

                // 3. 重试成功:更新任务状态
                retry.setStatus(1);
                retry.setUpdateTime(new Date());
                failRetryMapper.updateById(retry);
            } catch (Exception e) {
                // 4. 重试失败:累计重试次数,超过阈值标记为失败
                retry.setRetryCount(retry.getRetryCount() + 1);
                if (retry.getRetryCount() >= 5) {
                    retry.setStatus(2); // 重试 5 次失败,人工介入
                    log.error("订单 {} 重试 ES 写入 5 次失败,需人工处理", retry.getOrderId(), e);
                }
                retry.setUpdateTime(new Date());
                failRetryMapper.updateById(retry);
            }
        });
    }
    executor.shutdown();
}

(3)事务表设计(MySQL)

CREATE TABLE `order_fail_retry` (
  `id` BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
  `order_id` BIGINT NOT NULL COMMENT '订单 ID',
  `data` TEXT NOT NULL COMMENT '订单 JSON 数据',
  `retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
  `status` TINYINT NOT NULL COMMENT '状态:0-待重试,1-已成功,2-重试失败',
  `create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  UNIQUE KEY `uk_order_id` (`order_id`) COMMENT '避免重复记录同一订单',
  KEY `idx_status_retry_count` (`status`, `retry_count`) COMMENT '查询待重试任务'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '订单 ES 同步失败重试表';

3. 性能优化:缓解双写带来的 TPS 下降

同步双写会导致事务耗时延长(如从 50ms 增至 200ms),TPS 可能下降 30%-50%,可通过以下方式优化:

  • 异步化 ES 写入(非事务内):将 ES 写入从事务中剥离,用线程池异步执行(需接受“MySQL 提交后 ES 写入延迟”);

    // 优化:用线程池异步写入 ES,不阻塞事务
    @Async("esWritePool") // 自定义线程池
    public CompletableFuture asyncWriteEs(Order order) {
      try {
          // ES 写入逻辑...
          return CompletableFuture.runAsync(() -> {});
      } catch (Exception e) {
          // 记录重试表...
          return CompletableFuture.failedFuture(e);
      }
    }
  • ES 批量写入:对高频写入场景(如秒杀订单),积累一定数量后批量提交(BulkRequest),减少网络 IO 次数;
  • 线程池参数调优:根据 ES 集群性能调整写入线程池大小(如核心线程数=CPU 核心数*2,最大线程数=20),避免线程耗尽。

4. 实战踩坑总结

坑点 解决方案
MySQL 提交成功,ES 写入失败 本地事务表+定时重试,重试策略用指数退避(如 1s→2s→4s→8s)
事务耗时过长,TPS 暴跌 剥离 ES 写入到异步线程池,接受短暂不一致
代码侵入严重,维护成本高 封装 ES 写入工具类,统一处理异常与重试

三、方案二:异步双写(准实时,中侵入)

核心逻辑:业务代码仅写入 MySQL,通过 MQ(如 Kafka、RocketMQ)异步通知 ES 同步,解耦主业务与 ES 写入。
适用场景:实时性要求较高(秒级)、允许轻微延迟、业务代码可修改的场景(如电商商品更新、用户信息同步)。

1. 底层原理:MQ 解耦与最终一致性

异步双写通过 MQ 实现“生产者-消费者”模式,核心优势是故障隔离(ES 宕机不影响 MySQL 主业务),但需解决三大问题:

  • 消息顺序性:同一数据的更新/删除操作需按顺序执行,避免乱序;
  • 消息一致性:确保 MySQL 写入后,MQ 消息不丢失、不重复;
  • 消费幂等性:避免重复消费导致 ES 数据重复或覆盖错误。

2. 完整实操:从 MQ 选型到消费保障

(1)MQ 选型:Kafka vs RocketMQ

特性 Kafka RocketMQ 推荐场景
事务消息 不支持(需自定义) 原生支持事务消息,解决“半消息”问题 对一致性要求高的场景(如订单同步)
顺序性 支持分区内顺序 支持全局顺序(单分区) 需严格顺序的场景(如商品价格变更)
吞吐量 高(百万级 QPS) 中(十万级 QPS) 高并发场景选 Kafka,复杂业务选 RocketMQ
延迟消息 不支持(需自定义延时队列) 原生支持延迟消息(1s-2h) 需延迟重试的场景

实战建议:若需保障“MySQL 写入成功则 MQ 消息必发”,优先选 RocketMQ 的事务消息;若追求高吞吐量,选 Kafka。

(2)基于 RocketMQ 事务消息的实现

// 1. 生产者:业务服务(MySQL 写入 + MQ 事务消息)
@Service
public class ProductSyncService {
    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 更新商品并发送事务消息
     */
    public void updateProduct(Product product) {
        // 发送事务消息:半消息(MQ 暂存,未提交)
        String transactionId = UUID.randomUUID().toString();
        rocketMQTemplate.sendMessageInTransaction(
            "product-update-group", // 生产者组
            "product-update-topic", // 主题
            MessageBuilder.withPayload(product.getId().toString())
                          .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                          .build(),
            product // 附加参数,供事务监听使用
        );
    }

    /**
     * 事务监听:确认 MySQL 写入成功后,提交 MQ 消息
     */
    @RocketMQTransactionListener(txProducerGroup = "product-update-group")
    public class ProductTransactionListener implements RocketMQLocalTransactionListener {
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            Product product = (Product) arg;
            try {
                // 执行 MySQL 更新(本地事务)
                productMapper.updateById(product);
                return RocketMQLocalTransactionState.COMMIT; // 提交消息,消费者可消费
            } catch (Exception e) {
                return RocketMQLocalTransactionState.ROLLBACK; // 回滚消息,消费者不可见
            }
        }

        // 事务回查:解决生产者崩溃导致的状态未知
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            String productId = new String((byte[]) msg.getPayload());
            // 查询 MySQL 中商品是否已更新
            Product product = productMapper.selectById(productId);
            if (product != null && product.getUpdateTime() != null) {
                return RocketMQLocalTransactionState.COMMIT;
            }
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}

// 2. 消费者:ES 同步服务
@Service
public class EsSyncConsumer {
    @Autowired
    private ProductMapper productMapper;
    @Autowired
    private RestHighLevelClient esClient;
    @Autowired
    private RedisTemplate<String, String> redisTemplate; // 用于幂等控制

    /**
     * 消费商品更新消息,同步到 ES
     */
    @RocketMQMessageListener(
        topic = "product-update-topic",
        consumerGroup = "es-sync-group",
        messageModel = MessageModel.CLUSTERING, // 集群消费,避免重复消费
        consumeThreadMax = 20 // 消费线程数
    )
    public void consumeProductUpdate(String productId) {
        // 1. 幂等控制:避免重复消费(Redis 记录已消费的 productId,过期时间 24h)
        String key = "es:consumed:product:" + productId;
        Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
        if (Boolean.FALSE.equals(isConsumed)) {
            log.info("商品 {} 已同步 ES,跳过重复消费", productId);
            return;
        }

        // 2. 查询 MySQL 最新数据(避免消费时数据未提交)
        Product product = null;
        for (int i = 0; i < 3; i++) { // 重试 3 次,等待 MySQL 提交
            product = productMapper.selectById(productId);
            if (product != null) break;
            try { Thread.sleep(100); } catch (InterruptedException e) {}
        }
        if (product == null) {
            log.error("商品 {} 未查询到,同步 ES 失败", productId);
            redisTemplate.delete(key); // 删除幂等标记,允许后续重试
            return;
        }

        // 3. 同步到 ES
        try {
            IndexRequest request = new IndexRequest("products_index")
                    .id(productId)
                    .source(JSON.toJSONString(product), XContentType.JSON);
            esClient.index(request, RequestOptions.DEFAULT);
            log.info("商品 {} 同步 ES 成功", productId);
        } catch (Exception e) {
            log.error("商品 {} 同步 ES 失败", productId, e);
            redisTemplate.delete(key); // 删除幂等标记,允许重试
            // 可选:记录到重试表,定时重发
        }
    }
}

(3)顺序性保障:分区键与单分区策略

若业务要求“商品价格更新必须按顺序同步”(如先降价再涨价,不能反过来),需确保同一商品的消息在同一 MQ 分区:

  • Kafka:生产者发送时指定 partitionKey = productId,确保同一商品的消息进入同一分区;
  • RocketMQ:通过 messageQueueSelector 自定义队列选择器,按 productId 哈希分配队列。
// Kafka 生产者指定分区键
ProducerRecord<String, String> record = new ProducerRecord<>(
    "product-update-topic",
    productId, // partitionKey:按商品 ID 哈希到分区
    productId // value:消息内容
);
kafkaTemplate.send(record);

(4)消息堆积监控:Lag 值告警

MQ 消费延迟(Lag 值 = 生产消息数 - 消费消息数)是核心监控指标,需设置阈值告警:

  • 阈值设置:根据业务容忍度(如 Lag > 1000 触发告警);
  • 告警渠道:集成 Prometheus + Grafana 可视化,或通过钉钉/企业微信推送告警;
  • 处理方案:堆积时临时扩容消费节点,或降低消费端处理逻辑复杂度。

3. 实战踩坑总结

坑点 解决方案
消息乱序(先删后更) 同一数据的消息分配到同一 MQ 分区,消费端按顺序处理
消费时 MySQL 数据未提交 消费端重试查询(100ms/次,最多 3 次),等待事务提交
消息堆积导致 ES 同步延迟 监控 Lag 值,超阈值时扩容消费节点,优化消费逻辑
重复消费导致 ES 数据重复 Redis 幂等标记(setIfAbsent),或 ES 文档 ID 唯一

四、方案三:Logstash 定时拉取(离线,无侵入)

核心逻辑:通过 Logstash 的 JDBC 输入插件,定时从 MySQL 拉取增量数据,写入 ES。
适用场景:实时性要求低(分钟-T+1 级)、无业务代码修改权限、适合离线分析的场景(如用户行为日志统计、历史订单检索)。

1. 底层原理:定时增量同步

Logstash 基于“定时任务+增量查询”实现同步,核心是通过 sql_last_value 记录上一次同步的位置(如 update_time),避免全表扫描。但需注意:

  • 同步延迟:延迟取决于定时任务间隔(如 5 分钟一次,延迟最多 5 分钟);
  • 增量依赖:需 MySQL 表有“更新时间+主键”字段,确保增量数据可定位。

2. 完整实操:从配置到优化

(1)核心配置文件(logstash.conf)

# 1. 输入插件:JDBC 拉取 MySQL 增量数据
input {
  jdbc {
    # MySQL JDBC 驱动(需提前放入 Logstash 的 lib 目录)
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.32.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    # MySQL 连接地址(useSSL=false 关闭 SSL,serverTimezone 统一时区)
    jdbc_connection_string => "jdbc:mysql://mysql-host:3306/user_db?useSSL=false&serverTimezone=Asia/Shanghai"
    jdbc_user => "root"
    jdbc_password => "123456"
    # 定时任务间隔(Cron 表达式:每 5 分钟执行一次)
    schedule => "*/5 * * * *"
    # 增量查询 SQL(sql_last_value 是 Logstash 内置变量,记录上一次的 update_time)
    statement => "
      SELECT id, user_id, action_type, create_time, update_time 
      FROM user_behavior 
      WHERE update_time > :sql_last_value 
      ORDER BY update_time ASC, id ASC
    "
    # 追踪增量的字段(此处用 update_time,确保每次拉取最新数据)
    tracking_column => "update_time"
    # 追踪字段类型(timestamp/datetime 需指定为 timestamp)
    tracking_column_type => "timestamp"
    # 记录 sql_last_value 的文件路径(避免 Logstash 重启后丢失)
    last_run_metadata_path => "/usr/share/logstash/config/user_behavior_last_run"
    # 每次拉取的批量大小(避免一次拉取过多数据导致 OOM)
    jdbc_fetch_size => 1000
  }
}

# 2. 过滤插件:数据转换(可选,如字段重命名、日期格式化)
filter {
  # 日期格式化:将 create_time 从字符串转为 ES 的 date 类型
  date {
    match => ["create_time", "yyyy-MM-dd HH:mm:ss"]
    target => "@timestamp"
    timezone => "Asia/Shanghai"
  }
  # 字段重命名:将 user_id 改为 user_id_str(避免 ES 自动识别为 long 类型)
  mutate {
    rename => { "user_id" => "user_id_str" }
    convert => { "user_id_str" => "string" } # 转为字符串类型
  }
}

# 3. 输出插件:写入 ES
output {
  elasticsearch {
    # ES 集群地址(多个节点用逗号分隔)
    hosts => ["es-host1:9200", "es-host2:9200"]
    # ES 索引名(支持日期滚动,如 user_behavior-2024.05.20)
    index => "user_behavior-%{+YYYY.MM.dd}"
    # ES 文档 ID(用 MySQL 主键,确保幂等)
    document_id => "%{id}"
    # 批量写入大小(根据 ES 性能调整,默认 500)
    batch_size => 1000
    # 超时时间
    timeout => 30
  }
  # 日志输出到控制台(调试用,生产环境可关闭)
  stdout {
    codec => rubydebug
  }
}

(2)关键优化:避免全表扫描与 OOM

  • 增量查询优化:
    • 必须为 update_time 和 id 建立联合索引(idx_update_time_id),避免 where 条件导致全表扫描;
    • SQL 用 update_time > :sql_last_value AND id > :last_id 双重条件,防止 update_time 重复导致数据漏拉;
  • Logstash JVM 优化:
    • 修改 config/jvm.options,设置堆内存(如 -Xms4g -Xmx4g,不超过物理内存的 50%),避免 OOM;
    • 关闭不必要的插件(如监控插件),减少资源占用;
  • ES 写入优化:
    • 索引按日期滚动(如 user_behavior-2024.05.20),避免单索引过大;
    • 关闭 ES 索引的副本(number_of_replicas: 0),同步完成后再开启,提升写入速度。

3. 实战踩坑总结

坑点 解决方案
定时拉取导致全表扫描 为 update_time+id 建立联合索引,SQL 用增量条件
Logstash 重启后丢失同步位置 配置 last_run_metadata_path,持久化 sql_last_value
ES 字段类型不匹配(如日期) 在 filter 中用 date 插件格式化日期,明确字段类型
拉取数据过多导致 OOM 减小 jdbc_fetch_size 和 batch_size,增加 JVM 堆内存

五、方案四:Canal 监听 Binlog(实时,无侵入)

核心逻辑:Canal 模拟 MySQL Slave 节点,实时解析 Binlog 日志,将数据变更同步到 MQ 或直接写入 ES。
适用场景:实时性要求高(毫秒级)、无业务代码修改权限、高并发生产环境(如社交平台动态同步、电商库存更新)。

1. 底层原理:Binlog 解析与实时同步

Canal 的工作流程如下:

  1. 模拟 Slave:Canal 向 MySQL Master 发送dump 请求,模拟 Slave 节点;
  2. 解析 Binlog:Master 推送 Binlog 到 Canal,Canal 解析 Binlog(需 Binlog 为 ROW 模式);
  3. 数据分发:Canal 将解析后的变更数据(新增/更新/删除)发送到 MQ(如 RocketMQ),或通过 Canal Adapter 直接写入 ES。

2. 完整实操:从 Canal 部署到 ES 同步

(1)MySQL 前置配置(开启 Binlog)

Canal 依赖 MySQL Binlog,需先配置 MySQL:

# MySQL 配置文件(my.cnf)
[mysqld]
# 开启 Binlog
log_bin = mysql-bin
# Binlog 格式必须为 ROW(STATEMENT 模式无法解析具体数据)
binlog_format = ROW
# 服务 ID(唯一,不能与其他 Slave 重复)
server_id = 1
# 仅同步指定数据库(可选,减少 Binlog 体积)
binlog_do_db = order_db

配置后重启 MySQL,执行 show variables like 'log_bin%' 确认 Binlog 已开启。

(2)Canal Server 配置(canal.properties)

#  Canal 服务端口
canal.port = 11111
#  Binlog 解析模式(默认 ROW)
canal.parse.dbsync.parser = org.apache.canal.parse.inbound.mysql.dbsync.DbSyncParser
#  MQ 配置(将解析后的 Binlog 发送到 RocketMQ)
canal.mq.servers = rocketmq-host:9876
canal.mq.topic = canal_binlog_topic
# 分区策略(按表名哈希,确保同一表的数据在同一分区)
canal.mq.partitionHash = order_db.order_info:id,order_db.order_item:order_id

(3)Canal Adapter 配置(同步到 ES)

Canal Adapter 是 Canal 的扩展组件,可直接将 Binlog 数据同步到 ES,无需 MQ:

# adapter.yml 核心配置
dataSourceKey: defaultDS # 数据源标识(对应 application.yml 中的数据库配置)
destination: example # Canal Server 的 destination(默认 example)
groupId: g1
esMapping:
  _index: order_info_index # ES 索引名
  _type: _doc # ES 7.x+ 已废弃 type,固定为 _doc
  _id: id # ES 文档 ID(与 MySQL 主键一致)
  sql: "SELECT o.id, o.order_no, o.user_id, o.total_amount, o.create_time FROM order_info o WHERE o.id = #{id}"
  # 字段映射(MySQL 字段 → ES 字段,类型不匹配时需指定)
  objFields:
    total_amount: double # MySQL 的 decimal 转为 ES 的 double
  # 增量同步条件(基于 Binlog 的变更时间)
  etlCondition: "o.update_time > {updateTime}"

(4)高可用配置:Canal 集群

单节点 Canal 存在单点故障风险,需部署集群:

  1. 多 Canal Server 连接同一 MySQL Master,每个 Server 模拟独立 Slave;
  2. Canal Admin:可视化管理 Canal 集群,监控同步状态、配置变更;
  3. MQ 高可用:RocketMQ/Kafka 集群部署,避免 MQ 单点故障。

3. 实战踩坑总结

坑点 解决方案
Binlog 格式不是 ROW 模式 修改 MySQL 配置为 binlog_format=ROW,重启 MySQL
数据漂移(DDL 变更未同步) 用 Schema Registry 同步 MySQL 表结构与 ES mapping,DDL 后自动更新 ES 映射
Canal Server 单点故障 部署 Canal 集群,通过 Canal Admin 管理节点状态
大事务 Binlog 导致 OOM 拆分大事务,调整 Canal 的 parse 线程池大小,增加 JVM 堆内存

六、方案五:DataX 批量同步(离线,无侵入)

核心逻辑:DataX 是阿里开源的离线数据同步工具,支持 MySQL 到 ES 的批量数据迁移,适合大规模历史数据同步。
适用场景:历史数据迁移(如分库分表数据合并)、离线批量同步(如每日全量同步)、无业务代码修改权限的场景。

1. 底层原理:分布式批量同步

DataX 采用“Reader-Channel-Writer”架构:

  • Reader:从数据源(MySQL)读取数据;
  • Channel:负责数据传输,支持并发(可设置 Channel 数提升性能);
  • Writer:将数据写入目标端(ES)。

2. 完整实操:从配置到性能调优

(1)核心配置文件(mysql2es.json)

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "123456",
            "connection": [
              {
                "querySql": [
                  "SELECT id, order_no, user_id, total_amount, create_time FROM order_info WHERE create_time >= '2024-01-01' AND create_time < '2024-02-01'"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://mysql-host:3306/order_db?useSSL=false&serverTimezone=Asia/Shanghai"
                ]
              }
            },
            "splitPk": "id", // 分库分表场景的分片键(按 id 分片,避免数据重复)
            "fetchSize": 1000, // 每次从 MySQL 读取的行数
            "batchSize": 1000 // 每次批量提交给 Channel 的行数
          }
        },
        "writer": {
          "name": "elasticsearchwriter",
          "parameter": {
            "endpoint": "http://es-host:9200",
            "index": "order_info_202401", // ES 索引名(按月份分区)
            "docId": "${id}", // ES 文档 ID(用 MySQL 主键)
            "batchSize": 1000, // 每次批量写入 ES 的行数
            "column": [
              {"name": "id", "type": "long"},
              {"name": "order_no", "type": "string"},
              {"name": "user_id", "type": "long"},
              {"name": "total_amount", "type": "double"},
              {"name": "create_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
            ],
            "writeMode": "insert" // 写入模式:insert(新增)、update(更新)
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5, // Channel 数(建议与 ES 分片数一致,如 5 个 Channel 对应 5 个 ES 分片)
        "byte": 10485760 // 每秒传输字节数限制(10MB)
      },
      "errorLimit": {
        "record": 100, // 允许的错误记录数(超过则任务失败)
        "percentage": 0.01 // 允许的错误百分比
      }
    }
  }
}

(2)分库分表场景处理

若 MySQL 是分库分表(如 order_db_1-order_db_10),需配置多数据源:

"connection": [
  {
    "querySql": ["SELECT ... FROM order_info WHERE id BETWEEN 1 AND 1000000"],
    "jdbcUrl": ["jdbc:mysql://mysql-host:3306/order_db_1?..."]
  },
  {
    "querySql": ["SELECT ... FROM order_info WHERE id BETWEEN 1000001 AND 2000000"],
    "jdbcUrl": ["jdbc:mysql://mysql-host:3306/order_db_2?..."]
  }
  // 其他分库...
]

(3)性能调优参数

参数 优化建议 原理
channel 等于 ES 分片数(如 5-10 个) 避免单 Channel 写入压力过大,并行写入提升速度
fetchSize 1000-2000(根据 MySQL 性能调整) 减少 MySQL 连接次数,降低 IO 开销
batchSize 1000-5000(不超过 ES 批量写入上限) 减少 ES 网络 IO 次数,提升批量写入效率
byte 10-20MB(根据网络带宽调整) 避免网络拥堵,平衡传输速度与稳定性

3. 实战踩坑总结

坑点 解决方案
分库分表数据重复/遗漏 用 splitPk 按主键分片,确保每个分片无重叠
数据类型不匹配(如 decimal→double) 在 writer 的 column 中明确字段类型,或用 convert 函数转换
任务失败后重跑效率低 开启断点续传(DataX 1.0+ 支持),记录失败位置
全量迁移导致 MySQL 压力大 夜间低峰期执行,限制 fetchSize,避免全表扫描

七、方案六:Flink 流处理(实时,低侵入)

核心逻辑:基于 Flink 实时流处理引擎,消费 MySQL Binlog(通过 Canal 或 Debezium),进行复杂 ETL 处理后写入 ES。
适用场景:实时性要求高(毫秒级)、需复杂数据处理(如多表关联、实时计算)、大规模数据场景(如实时数仓、推荐系统)。

1. 底层原理:流处理与状态管理

Flink 同步 ES 的核心优势是复杂处理能力与状态一致性:

  • 复杂 ETL:支持多表关联(如商品表关联用户画像表)、窗口计算(如 5 分钟内的订单汇总);
  • 状态管理:通过 RocksDB 存储中间状态,处理乱序数据(Watermark 机制);
  • Exactly-Once 语义:确保数据不重复、不丢失,与 ES 实现最终一致性。

2. 完整实操:从数据源到 ES 写入

(1)依赖引入(pom.xml)

<!-- Flink 核心依赖 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.17.0</version>
</dependency>
<!-- Flink CDC 依赖(读取 MySQL Binlog) -->
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>2.4.0</version>
</dependency>
<!-- Flink ES 连接器 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7_2.12</artifactId>
  <version>1.17.0</version>
</dependency>

(2)核心代码:Binlog 消费→ETL→ES 写入

public class MysqlToEsFlinkJob {
    public static void main(String[] args) throws Exception {
        // 1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 启用 Checkpoint(每 10 秒一次,确保 Exactly-Once 语义)
        env.enableCheckpointing(10000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints/");

        // 2. 读取 MySQL Binlog(通过 Flink CDC)
        DebeziumSourceFunction<RowData> mysqlSource = MySQLSource.<RowData>builder()
                .hostname("mysql-host")
                .port(3306)
                .databaseList("product_db") // 监听的数据库
                .tableList("product_db.product_info") // 监听的表
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // 解析 Binlog 为 JSON
                .build();

        DataStream<RowData> mysqlStream = env.addSource(mysqlSource);

        // 3. 复杂 ETL 处理(示例:关联用户画像表,计算商品推荐评分)
        // 3.1 读取用户画像广播流(维表)
        DataStream<UserProfile> userProfileStream = env.addSource(new UserProfileSource());
        BroadcastStream<UserProfile> broadcastStream = userProfileStream.broadcast(new MapStateDescriptor<>("userProfileState", Long.class, UserProfile.class));

        // 3.2 关联商品流与用户画像流
        DataStream<ProductWithScore> productWithScoreStream = mysqlStream
                .keyBy(row -> row.getLong(2)) // 按 user_id 分组
                .connect(broadcastStream)
                .process(new BroadcastProcessFunction<RowData, UserProfile, ProductWithScore>() {
                    private MapState<Long, UserProfile> userProfileState;

                    @Override
                    public void open(Configuration parameters) {
                        userProfileState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<>("userProfileState", Long.class, UserProfile.class)
                        );
                    }

                    // 处理商品流(主数据流)
                    @Override
                    public void processElement(RowData productRow, ReadOnlyContext ctx, Collector<ProductWithScore> out) throws Exception {
                        long userId = productRow.getLong(2);
                        UserProfile profile = userProfileState.get(userId);
                        if (profile == null) {
                            log.warn("用户 {} 未找到画像,使用默认评分", userId);
                            out.collect(new ProductWithScore(productRow, 5.0)); // 默认评分 5.0
                            return;
                        }
                        // 计算推荐评分(如:用户偏好*商品类别匹配度)
                        double score = profile.getPreference() * getCategoryMatch(productRow, profile);
                        out.collect(new ProductWithScore(productRow, score));
                    }

                    // 处理用户画像流(广播流)
                    @Override
                    public void processBroadcastElement(UserProfile profile, Context ctx, Collector<ProductWithScore> out) throws Exception {
                        userProfileState.put(profile.getUserId(), profile);
                    }
                });

        // 4. 写入 ES
        productWithScoreStream.addSink(
                ElasticsearchSink.<ProductWithScore>builder(
                        Collections.singletonList(new HttpHost("es-host", 9200, "http")),
                        new ElasticsearchSinkFunction<ProductWithScore>() {
                            @Override
                            public void process(ProductWithScore product, RuntimeContext ctx, RequestIndexer indexer) {
                                // 构建 ES 索引请求
                                Map<String, Object> json = new HashMap<>();
                                json.put("id", product.getId());
                                json.put("product_name", product.getName());
                                json.put("recommend_score", product.getScore());
                                json.put("update_time", new Date());

                                IndexRequest request = Requests.indexRequest()
                                        .index("product_recommend_index")
                                        .id(product.getId().toString())
                                        .source(json);
                                indexer.add(request);
                            }
                        }
                ).build()
        );

        // 5. 执行任务
        env.execute("MySQL Binlog to ES Recommend Job");
    }

    // 辅助方法:计算商品类别与用户偏好的匹配度
    private double getCategoryMatch(RowData productRow, UserProfile profile) {
        String productCategory = productRow.getString(3).toString();
        return profile.getPreferredCategories().contains(productCategory) ? 1.2 : 0.8;
    }
}

(3)关键配置:Watermark 与状态管理

  • Watermark 处理乱序数据:

    // 允许 3 秒的乱序窗口,超过则视为迟到数据
    DataStream timedStream = mysqlStream
          .assignTimestampsAndWatermarks(
                  WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                          .withTimestampAssigner((row, timestamp) -> row.getTimestamp(4).getTime()) // 用 update_time 作为时间戳
          );
  • 状态后端配置:
    // 使用 RocksDB 作为状态后端(支持大状态)
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/state/"));

3. 实战踩坑总结

坑点 解决方案
维表(用户画像)更新不及时 用 Broadcast State 实时更新维表,设置状态过期时间(如 1 小时)
乱序数据导致计算错误 配置 Watermark 允许 3-5 秒乱序窗口,迟到数据写入侧输出流
Flink 任务重启后状态丢失 启用 Checkpoint 并存储到 HDFS,配置状态后端为 RocksDB
ES 写入压力过大 批量写入(BulkProcessor),根据 ES 性能调整批量大小

八、方案对比与选型决策树

1. 6 种方案核心指标对比

方案 实时性 侵入性 复杂度 一致性 适用场景
同步双写 秒级 高 低 强一致性 金融交易、核心订单同步
异步双写(MQ) 秒级 中 中 最终一致性 电商商品、用户信息同步
Logstash 定时拉取 分钟-T+1 级 无 低 最终一致性 离线日志分析、历史数据检索
Canal 监听 Binlog 毫秒级 无 高 最终一致性 高并发生产环境、实时搜索
DataX 批量同步 小时级 无 中 最终一致性 历史数据迁移、分库分表合并
Flink 流处理 毫秒级 低 极高 Exactly-Once 实时数仓、复杂 ETL、推荐系统

2. 选型决策树(三步确定方案)

  1. 第一步:判断实时性需求

    • 实时(毫秒-秒级)→ 进入第二步;
    • 准实时(分钟级)→ 选“异步双写”或“Logstash”;
    • 离线(小时-T+1)→ 选“DataX”或“Logstash”。
  2. 第二步:判断业务复杂度

    • 简单同步(无关联计算)→ 进入第三步;
    • 复杂 ETL(多表关联、实时计算)→ 选“Flink 流处理”。
  3. 第三步:判断侵入性容忍度

    • 可修改业务代码 → 选“同步双写”(强一致)或“异步双写”(高吞吐);
    • 不可修改代码 → 选“Canal 监听 Binlog”(高实时)。

九、通用最佳实践:数据一致性与监控保障

无论选择哪种方案,都需做好以下保障措施:

  1. 数据一致性校验:

    • 定时对账:通过脚本对比 MySQL 与 ES 的数据量、关键字段(如每日凌晨对比前一天数据);
    • 异常告警:发现数据偏差(如差异率>0.1%)时,触发告警并自动重试同步。
  2. ES 索引设计优化:

    • 提前规划 mapping:避免后续字段类型变更(如 MySQL 的 decimal 对应 ES 的 double 或 scaled_float);
    • 合理分片:根据数据量设置 ES 分片数(如 1000 万数据对应 5-10 个分片),避免单分片过大。
  3. 全链路监控:

    • 核心指标:MySQL Binlog 延迟、MQ Lag 值、ES 写入 QPS/延迟、同步成功率;
    • 监控工具:Prometheus + Grafana(可视化)、ELK(日志分析)、钉钉/企业微信(告警推送)。

十、总结:没有最优,只有最适配

MySQL 同步 ES 的方案选择,本质是业务需求与技术成本的平衡:

  • 小团队、简单场景:优先选“同步双写”或“Logstash”,降低复杂度;
  • 中大型团队、高并发场景:选“Canal+MQ”或“异步双写”,兼顾实时性与低侵入;
  • 大型企业、复杂场景:选“Flink+Canal”,支撑实时数仓与复杂 ETL。

最终,建议在落地前先进行技术验证(POC):用真实业务数据测试方案的性能、延迟与一致性,再逐步推广到生产环境。

除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接

本文链接:https://www.lifengdi.com/zhong-jian-jian/4546

相关文章

  • 深入了解PostgreSQL
  • 为什么MySQL要“小表驱动大表”
  • 为什么不建议在 Docker 中运行 MySQL?从技术原理到实践避坑
  • Redis 不只是缓存:8 大实战场景 + 深度避坑指南,从入门到架构师级应用
  • 高性能场景为什么推荐使用PostgreSQL,而非MySQL?
本作品采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可
标签: ElasticSearch MySQL 数据 架构
最后更新:2025年10月30日

李锋镝

既然选择了远方,便只顾风雨兼程。

打赏 点赞
< 上一篇
下一篇 >

文章评论

1 2 3 4 5 6 7 8 9 11 12 13 14 15 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 46 47 48 49 50 51 52 53 54 55 57 58 60 61 62 63 64 65 66 67 69 72 74 76 77 78 79 80 81 82 85 86 87 90 92 93 94 95 96 97 98 99
取消回复

位卑未敢忘忧国,事定犹须待阖棺。

那年今日(12月17日)

  • 1981年:德国足球运动员蒂姆·维泽出生
  • 1971年:印度和东巴基斯坦达成停火协议
  • 1909年:比利时国王利奥波德二世逝世
  • 1905年:狙击之王西蒙·海耶出生
  • 1902年:京师大学堂正式开学
  • 更多历史事件
最新 热点 随机
最新 热点 随机
AI原生数据库新标杆:seekdb深度解析,轻量架构与混合搜索的双重革命 做了一个WordPress文章热力图插件 Spring WebFlux底层原理深度剖析-从响应式流到事件循环的全链路拆解 Spring WebFlux深度解析:异步非阻塞架构与实战落地指南 规范驱动AI编程:用OpenSpec实现100%可控开发,从需求到代码的全流程闭环 WordPress网站换了个字体,差点儿把样式换崩了
玩博客的人是不是越来越少了?准备入手个亚太的ECS,友友们有什么建议吗?使用WireGuard在Ubuntu 24.04系统搭建VPNWordPress实现用户评论等级排行榜插件Gemini 3 Pro 深度测评:多模态AI编程的跨代际突破,从一句话到完整应用的全链路革命WordPress网站换了个字体,差点儿把样式换崩了
使用itext和freemarker来根据Html模板生成PDF文件,加水印、印章 项目中不用 redis 分布式锁,怎么防止用户重复提交? SpringBoot框架自动配置之spring.factories和AutoConfiguration.imports JAVA线程池简析(JDK1.6) IDEA版本2020.*全局MAVEN配置 Gemini 3 深度解析:从像素级复刻到 AGI 雏形,多模态 AI 如何重构开发与创作?
标签聚合
JVM WordPress SQL 日常 K8s 架构 SpringBoot AI编程 MySQL ElasticSearch 多线程 分布式 数据库 AI JAVA docker 设计模式 Spring IDEA Redis
友情链接
  • Blogs·CN
  • Honesty
  • 临窗旋墨
  • 哥斯拉
  • 彬红茶日记
  • 志文工作室
  • 搬砖日记
  • 旧时繁华
  • 林羽凡
  • 瓦匠个人小站
  • 皮皮社
  • 知向前端
  • 蜗牛工作室
  • 韩小韩博客
  • 风渡言

COPYRIGHT © 2025 lifengdi.com. ALL RIGHTS RESERVED.

Theme Kratos Made By Dylan

津ICP备2024022503号-3