在分布式架构中,MySQL 与 Elasticsearch(ES)的组合早已成为“事务存储+高效检索”的黄金搭档——MySQL 凭借 ACID 特性保障核心数据一致性,ES 则以倒排索引和分布式架构支撑百万级数据的全文检索、聚合分析。但二者的协同核心,始终绕不开“数据同步”这一关键环节:如何在保证数据一致性的前提下,兼顾实时性、低侵入性与高可用性?
本文将基于业务场景演进,从“简单同步”到“复杂流处理”,详细拆解 6 种 MySQL 同步 ES 方案的底层原理、完整实操步骤、性能优化技巧与实战踩坑经验,结合真实项目案例说明“不同阶段该选哪种方案”,帮你避开从技术选型到落地的所有陷阱。
一、先明确核心诉求:同步方案选型的 4 个关键维度
在选择同步方案前,必须先理清业务的核心诉求——不同诉求对应完全不同的技术选型,这是避免“过度设计”或“方案不足”的关键。
1. 实时性要求:数据延迟容忍度
- 实时(毫秒-秒级):如社交平台的实时动态搜索、电商商品价格变更后立即更新检索结果;
- 准实时(分钟级):如客服系统查询订单状态、运营后台的用户行为分析;
- 离线(小时-T+1 级):如历史订单归档检索、每日用户画像统计。
2. 侵入性:是否允许修改业务代码
- 高侵入:可修改业务代码(如在订单服务中添加 ES 写入逻辑);
- 低侵入:仅允许添加中间件(如 MQ、Canal),不修改核心业务逻辑;
- 无侵入:完全不触碰业务系统(如通过 Logstash 拉取 MySQL 数据)。
3. 数据一致性:是否容忍数据偏差
- 强一致性:MySQL 与 ES 数据实时对齐(如金融交易记录,不允许偏差);
- 最终一致性:短时间内允许偏差,但需保证一段时间后对齐(如商品库存,延迟 10 秒可接受);
- 近似一致性:允许少量数据偏差(如用户行为日志,丢失几条可容忍)。
4. 数据规模与复杂度
- 小规模简单数据:单表、无复杂转换(如订单表同步);
- 大规模分库分表:多库多表、需合并数据(如跨区域订单同步);
- 复杂 ETL 需求:需关联多表、计算衍生字段(如商品价格关联用户画像生成推荐评分)。
二、方案一:同步双写(实时,高侵入)
核心逻辑:在业务代码中,同一事务内同时写入 MySQL 与 ES,确保两者数据“实时对齐”。
适用场景:实时性要求极高(秒级)、数据一致性要求强、业务逻辑简单的场景(如金融交易记录、核心订单状态同步)。
1. 底层原理与风险点
同步双写的本质是“将 ES 写入纳入业务事务链路”,但存在三大核心风险:
- 事务一致性风险:MySQL 提交成功后,ES 写入失败会导致数据不一致;
- 性能瓶颈:ES 写入属于网络 IO 操作,会延长事务耗时,降低系统 TPS;
- 代码侵入性:所有写操作(新增/修改/删除)都需添加 ES 逻辑,维护成本高。
2. 完整实操:从基础实现到一致性保障
(1)基础代码实现(Spring Boot)
@Service
public class OrderSyncService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private RestHighLevelClient esClient;
@Autowired
private OrderFailRetryMapper failRetryMapper; // 本地事务表 DAO
/**
* 同步双写订单:MySQL + ES
* @param order 订单实体
*/
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 写入 MySQL(事务内)
orderMapper.insert(order);
try {
// 2. 同步写入 ES(设置超时时间,避免阻塞事务)
IndexRequest request = new IndexRequest("orders_index") // ES 索引名
.id(order.getId().toString()) // ES 文档 ID(与 MySQL 主键一致,确保幂等)
.source(JSON.toJSONString(order), XContentType.JSON)
.setTimeout(TimeValue.timeValueSeconds(3)); // 超时 3 秒,避免长期阻塞
// 执行 ES 写入
esClient.index(request, RequestOptions.DEFAULT);
} catch (Exception e) {
// 3. ES 写入失败:记录到本地事务表,后续重试(保障最终一致性)
OrderFailRetry retry = new OrderFailRetry();
retry.setOrderId(order.getId());
retry.setData(JSON.toJSONString(order));
retry.setRetryCount(0);
retry.setStatus(0); // 0-待重试,1-已成功,2-重试失败
retry.setCreateTime(new Date());
failRetryMapper.insert(retry);
// 注意:此处不抛异常,避免 MySQL 事务回滚(优先保证核心库数据)
log.error("ES 写入失败,已记录重试任务,orderId:{}", order.getId(), e);
}
}
}
(2)关键优化:本地事务表 + 定时重试
ES 写入失败后,通过“本地事务表”记录失败数据,定时任务重试,确保最终一致性:
/**
* 定时重试 ES 失败任务(每 1 分钟执行一次)
*/
@Scheduled(fixedRate = 60 * 1000)
public void retryEsFailTask() {
// 1. 查询待重试任务(限制每次查询数量,避免一次性处理过多)
List<OrderFailRetry> failList = failRetryMapper.selectByStatusAndRetryCount(0, 5); // 最多重试 5 次
if (failList.isEmpty()) {
return;
}
// 2. 批量重试(使用线程池提高效率)
ExecutorService executor = Executors.newFixedThreadPool(5);
for (OrderFailRetry retry : failList) {
executor.submit(() -> {
try {
Order order = JSON.parseObject(retry.getData(), Order.class);
// 重新执行 ES 写入
IndexRequest request = new IndexRequest("orders_index")
.id(order.getId().toString())
.source(retry.getData(), XContentType.JSON);
esClient.index(request, RequestOptions.DEFAULT);
// 3. 重试成功:更新任务状态
retry.setStatus(1);
retry.setUpdateTime(new Date());
failRetryMapper.updateById(retry);
} catch (Exception e) {
// 4. 重试失败:累计重试次数,超过阈值标记为失败
retry.setRetryCount(retry.getRetryCount() + 1);
if (retry.getRetryCount() >= 5) {
retry.setStatus(2); // 重试 5 次失败,人工介入
log.error("订单 {} 重试 ES 写入 5 次失败,需人工处理", retry.getOrderId(), e);
}
retry.setUpdateTime(new Date());
failRetryMapper.updateById(retry);
}
});
}
executor.shutdown();
}
(3)事务表设计(MySQL)
CREATE TABLE `order_fail_retry` (
`id` BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
`order_id` BIGINT NOT NULL COMMENT '订单 ID',
`data` TEXT NOT NULL COMMENT '订单 JSON 数据',
`retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',
`status` TINYINT NOT NULL COMMENT '状态:0-待重试,1-已成功,2-重试失败',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
UNIQUE KEY `uk_order_id` (`order_id`) COMMENT '避免重复记录同一订单',
KEY `idx_status_retry_count` (`status`, `retry_count`) COMMENT '查询待重试任务'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '订单 ES 同步失败重试表';
3. 性能优化:缓解双写带来的 TPS 下降
同步双写会导致事务耗时延长(如从 50ms 增至 200ms),TPS 可能下降 30%-50%,可通过以下方式优化:
-
异步化 ES 写入(非事务内):将 ES 写入从事务中剥离,用线程池异步执行(需接受“MySQL 提交后 ES 写入延迟”);
// 优化:用线程池异步写入 ES,不阻塞事务 @Async("esWritePool") // 自定义线程池 public CompletableFutureasyncWriteEs(Order order) { try { // ES 写入逻辑... return CompletableFuture.runAsync(() -> {}); } catch (Exception e) { // 记录重试表... return CompletableFuture.failedFuture(e); } } - ES 批量写入:对高频写入场景(如秒杀订单),积累一定数量后批量提交(
BulkRequest),减少网络 IO 次数; - 线程池参数调优:根据 ES 集群性能调整写入线程池大小(如核心线程数=CPU 核心数*2,最大线程数=20),避免线程耗尽。
4. 实战踩坑总结
| 坑点 | 解决方案 |
|---|---|
| MySQL 提交成功,ES 写入失败 | 本地事务表+定时重试,重试策略用指数退避(如 1s→2s→4s→8s) |
| 事务耗时过长,TPS 暴跌 | 剥离 ES 写入到异步线程池,接受短暂不一致 |
| 代码侵入严重,维护成本高 | 封装 ES 写入工具类,统一处理异常与重试 |
三、方案二:异步双写(准实时,中侵入)
核心逻辑:业务代码仅写入 MySQL,通过 MQ(如 Kafka、RocketMQ)异步通知 ES 同步,解耦主业务与 ES 写入。
适用场景:实时性要求较高(秒级)、允许轻微延迟、业务代码可修改的场景(如电商商品更新、用户信息同步)。
1. 底层原理:MQ 解耦与最终一致性
异步双写通过 MQ 实现“生产者-消费者”模式,核心优势是故障隔离(ES 宕机不影响 MySQL 主业务),但需解决三大问题:
- 消息顺序性:同一数据的更新/删除操作需按顺序执行,避免乱序;
- 消息一致性:确保 MySQL 写入后,MQ 消息不丢失、不重复;
- 消费幂等性:避免重复消费导致 ES 数据重复或覆盖错误。
2. 完整实操:从 MQ 选型到消费保障
(1)MQ 选型:Kafka vs RocketMQ
| 特性 | Kafka | RocketMQ | 推荐场景 |
|---|---|---|---|
| 事务消息 | 不支持(需自定义) | 原生支持事务消息,解决“半消息”问题 | 对一致性要求高的场景(如订单同步) |
| 顺序性 | 支持分区内顺序 | 支持全局顺序(单分区) | 需严格顺序的场景(如商品价格变更) |
| 吞吐量 | 高(百万级 QPS) | 中(十万级 QPS) | 高并发场景选 Kafka,复杂业务选 RocketMQ |
| 延迟消息 | 不支持(需自定义延时队列) | 原生支持延迟消息(1s-2h) | 需延迟重试的场景 |
实战建议:若需保障“MySQL 写入成功则 MQ 消息必发”,优先选 RocketMQ 的事务消息;若追求高吞吐量,选 Kafka。
(2)基于 RocketMQ 事务消息的实现
// 1. 生产者:业务服务(MySQL 写入 + MQ 事务消息)
@Service
public class ProductSyncService {
@Autowired
private ProductMapper productMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 更新商品并发送事务消息
*/
public void updateProduct(Product product) {
// 发送事务消息:半消息(MQ 暂存,未提交)
String transactionId = UUID.randomUUID().toString();
rocketMQTemplate.sendMessageInTransaction(
"product-update-group", // 生产者组
"product-update-topic", // 主题
MessageBuilder.withPayload(product.getId().toString())
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build(),
product // 附加参数,供事务监听使用
);
}
/**
* 事务监听:确认 MySQL 写入成功后,提交 MQ 消息
*/
@RocketMQTransactionListener(txProducerGroup = "product-update-group")
public class ProductTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Product product = (Product) arg;
try {
// 执行 MySQL 更新(本地事务)
productMapper.updateById(product);
return RocketMQLocalTransactionState.COMMIT; // 提交消息,消费者可消费
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK; // 回滚消息,消费者不可见
}
}
// 事务回查:解决生产者崩溃导致的状态未知
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String productId = new String((byte[]) msg.getPayload());
// 查询 MySQL 中商品是否已更新
Product product = productMapper.selectById(productId);
if (product != null && product.getUpdateTime() != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
// 2. 消费者:ES 同步服务
@Service
public class EsSyncConsumer {
@Autowired
private ProductMapper productMapper;
@Autowired
private RestHighLevelClient esClient;
@Autowired
private RedisTemplate<String, String> redisTemplate; // 用于幂等控制
/**
* 消费商品更新消息,同步到 ES
*/
@RocketMQMessageListener(
topic = "product-update-topic",
consumerGroup = "es-sync-group",
messageModel = MessageModel.CLUSTERING, // 集群消费,避免重复消费
consumeThreadMax = 20 // 消费线程数
)
public void consumeProductUpdate(String productId) {
// 1. 幂等控制:避免重复消费(Redis 记录已消费的 productId,过期时间 24h)
String key = "es:consumed:product:" + productId;
Boolean isConsumed = redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isConsumed)) {
log.info("商品 {} 已同步 ES,跳过重复消费", productId);
return;
}
// 2. 查询 MySQL 最新数据(避免消费时数据未提交)
Product product = null;
for (int i = 0; i < 3; i++) { // 重试 3 次,等待 MySQL 提交
product = productMapper.selectById(productId);
if (product != null) break;
try { Thread.sleep(100); } catch (InterruptedException e) {}
}
if (product == null) {
log.error("商品 {} 未查询到,同步 ES 失败", productId);
redisTemplate.delete(key); // 删除幂等标记,允许后续重试
return;
}
// 3. 同步到 ES
try {
IndexRequest request = new IndexRequest("products_index")
.id(productId)
.source(JSON.toJSONString(product), XContentType.JSON);
esClient.index(request, RequestOptions.DEFAULT);
log.info("商品 {} 同步 ES 成功", productId);
} catch (Exception e) {
log.error("商品 {} 同步 ES 失败", productId, e);
redisTemplate.delete(key); // 删除幂等标记,允许重试
// 可选:记录到重试表,定时重发
}
}
}
(3)顺序性保障:分区键与单分区策略
若业务要求“商品价格更新必须按顺序同步”(如先降价再涨价,不能反过来),需确保同一商品的消息在同一 MQ 分区:
- Kafka:生产者发送时指定
partitionKey = productId,确保同一商品的消息进入同一分区; - RocketMQ:通过
messageQueueSelector自定义队列选择器,按 productId 哈希分配队列。
// Kafka 生产者指定分区键
ProducerRecord<String, String> record = new ProducerRecord<>(
"product-update-topic",
productId, // partitionKey:按商品 ID 哈希到分区
productId // value:消息内容
);
kafkaTemplate.send(record);
(4)消息堆积监控:Lag 值告警
MQ 消费延迟(Lag 值 = 生产消息数 - 消费消息数)是核心监控指标,需设置阈值告警:
- 阈值设置:根据业务容忍度(如 Lag > 1000 触发告警);
- 告警渠道:集成 Prometheus + Grafana 可视化,或通过钉钉/企业微信推送告警;
- 处理方案:堆积时临时扩容消费节点,或降低消费端处理逻辑复杂度。
3. 实战踩坑总结
| 坑点 | 解决方案 |
|---|---|
| 消息乱序(先删后更) | 同一数据的消息分配到同一 MQ 分区,消费端按顺序处理 |
| 消费时 MySQL 数据未提交 | 消费端重试查询(100ms/次,最多 3 次),等待事务提交 |
| 消息堆积导致 ES 同步延迟 | 监控 Lag 值,超阈值时扩容消费节点,优化消费逻辑 |
| 重复消费导致 ES 数据重复 | Redis 幂等标记(setIfAbsent),或 ES 文档 ID 唯一 |
四、方案三:Logstash 定时拉取(离线,无侵入)
核心逻辑:通过 Logstash 的 JDBC 输入插件,定时从 MySQL 拉取增量数据,写入 ES。
适用场景:实时性要求低(分钟-T+1 级)、无业务代码修改权限、适合离线分析的场景(如用户行为日志统计、历史订单检索)。
1. 底层原理:定时增量同步
Logstash 基于“定时任务+增量查询”实现同步,核心是通过 sql_last_value 记录上一次同步的位置(如 update_time),避免全表扫描。但需注意:
- 同步延迟:延迟取决于定时任务间隔(如 5 分钟一次,延迟最多 5 分钟);
- 增量依赖:需 MySQL 表有“更新时间+主键”字段,确保增量数据可定位。
2. 完整实操:从配置到优化
(1)核心配置文件(logstash.conf)
# 1. 输入插件:JDBC 拉取 MySQL 增量数据
input {
jdbc {
# MySQL JDBC 驱动(需提前放入 Logstash 的 lib 目录)
jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-8.0.32.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# MySQL 连接地址(useSSL=false 关闭 SSL,serverTimezone 统一时区)
jdbc_connection_string => "jdbc:mysql://mysql-host:3306/user_db?useSSL=false&serverTimezone=Asia/Shanghai"
jdbc_user => "root"
jdbc_password => "123456"
# 定时任务间隔(Cron 表达式:每 5 分钟执行一次)
schedule => "*/5 * * * *"
# 增量查询 SQL(sql_last_value 是 Logstash 内置变量,记录上一次的 update_time)
statement => "
SELECT id, user_id, action_type, create_time, update_time
FROM user_behavior
WHERE update_time > :sql_last_value
ORDER BY update_time ASC, id ASC
"
# 追踪增量的字段(此处用 update_time,确保每次拉取最新数据)
tracking_column => "update_time"
# 追踪字段类型(timestamp/datetime 需指定为 timestamp)
tracking_column_type => "timestamp"
# 记录 sql_last_value 的文件路径(避免 Logstash 重启后丢失)
last_run_metadata_path => "/usr/share/logstash/config/user_behavior_last_run"
# 每次拉取的批量大小(避免一次拉取过多数据导致 OOM)
jdbc_fetch_size => 1000
}
}
# 2. 过滤插件:数据转换(可选,如字段重命名、日期格式化)
filter {
# 日期格式化:将 create_time 从字符串转为 ES 的 date 类型
date {
match => ["create_time", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
# 字段重命名:将 user_id 改为 user_id_str(避免 ES 自动识别为 long 类型)
mutate {
rename => { "user_id" => "user_id_str" }
convert => { "user_id_str" => "string" } # 转为字符串类型
}
}
# 3. 输出插件:写入 ES
output {
elasticsearch {
# ES 集群地址(多个节点用逗号分隔)
hosts => ["es-host1:9200", "es-host2:9200"]
# ES 索引名(支持日期滚动,如 user_behavior-2024.05.20)
index => "user_behavior-%{+YYYY.MM.dd}"
# ES 文档 ID(用 MySQL 主键,确保幂等)
document_id => "%{id}"
# 批量写入大小(根据 ES 性能调整,默认 500)
batch_size => 1000
# 超时时间
timeout => 30
}
# 日志输出到控制台(调试用,生产环境可关闭)
stdout {
codec => rubydebug
}
}
(2)关键优化:避免全表扫描与 OOM
- 增量查询优化:
- 必须为
update_time和id建立联合索引(idx_update_time_id),避免 where 条件导致全表扫描; - SQL 用
update_time > :sql_last_value AND id > :last_id双重条件,防止update_time重复导致数据漏拉;
- 必须为
- Logstash JVM 优化:
- 修改
config/jvm.options,设置堆内存(如-Xms4g -Xmx4g,不超过物理内存的 50%),避免 OOM; - 关闭不必要的插件(如监控插件),减少资源占用;
- 修改
- ES 写入优化:
- 索引按日期滚动(如
user_behavior-2024.05.20),避免单索引过大; - 关闭 ES 索引的副本(
number_of_replicas: 0),同步完成后再开启,提升写入速度。
- 索引按日期滚动(如
3. 实战踩坑总结
| 坑点 | 解决方案 |
|---|---|
| 定时拉取导致全表扫描 | 为 update_time+id 建立联合索引,SQL 用增量条件 |
| Logstash 重启后丢失同步位置 | 配置 last_run_metadata_path,持久化 sql_last_value |
| ES 字段类型不匹配(如日期) | 在 filter 中用 date 插件格式化日期,明确字段类型 |
| 拉取数据过多导致 OOM | 减小 jdbc_fetch_size 和 batch_size,增加 JVM 堆内存 |
五、方案四:Canal 监听 Binlog(实时,无侵入)
核心逻辑:Canal 模拟 MySQL Slave 节点,实时解析 Binlog 日志,将数据变更同步到 MQ 或直接写入 ES。
适用场景:实时性要求高(毫秒级)、无业务代码修改权限、高并发生产环境(如社交平台动态同步、电商库存更新)。
1. 底层原理:Binlog 解析与实时同步
Canal 的工作流程如下:
- 模拟 Slave:Canal 向 MySQL Master 发送dump 请求,模拟 Slave 节点;
- 解析 Binlog:Master 推送 Binlog 到 Canal,Canal 解析 Binlog(需 Binlog 为 ROW 模式);
- 数据分发:Canal 将解析后的变更数据(新增/更新/删除)发送到 MQ(如 RocketMQ),或通过 Canal Adapter 直接写入 ES。
2. 完整实操:从 Canal 部署到 ES 同步
(1)MySQL 前置配置(开启 Binlog)
Canal 依赖 MySQL Binlog,需先配置 MySQL:
# MySQL 配置文件(my.cnf)
[mysqld]
# 开启 Binlog
log_bin = mysql-bin
# Binlog 格式必须为 ROW(STATEMENT 模式无法解析具体数据)
binlog_format = ROW
# 服务 ID(唯一,不能与其他 Slave 重复)
server_id = 1
# 仅同步指定数据库(可选,减少 Binlog 体积)
binlog_do_db = order_db
配置后重启 MySQL,执行 show variables like 'log_bin%' 确认 Binlog 已开启。
(2)Canal Server 配置(canal.properties)
# Canal 服务端口
canal.port = 11111
# Binlog 解析模式(默认 ROW)
canal.parse.dbsync.parser = org.apache.canal.parse.inbound.mysql.dbsync.DbSyncParser
# MQ 配置(将解析后的 Binlog 发送到 RocketMQ)
canal.mq.servers = rocketmq-host:9876
canal.mq.topic = canal_binlog_topic
# 分区策略(按表名哈希,确保同一表的数据在同一分区)
canal.mq.partitionHash = order_db.order_info:id,order_db.order_item:order_id
(3)Canal Adapter 配置(同步到 ES)
Canal Adapter 是 Canal 的扩展组件,可直接将 Binlog 数据同步到 ES,无需 MQ:
# adapter.yml 核心配置
dataSourceKey: defaultDS # 数据源标识(对应 application.yml 中的数据库配置)
destination: example # Canal Server 的 destination(默认 example)
groupId: g1
esMapping:
_index: order_info_index # ES 索引名
_type: _doc # ES 7.x+ 已废弃 type,固定为 _doc
_id: id # ES 文档 ID(与 MySQL 主键一致)
sql: "SELECT o.id, o.order_no, o.user_id, o.total_amount, o.create_time FROM order_info o WHERE o.id = #{id}"
# 字段映射(MySQL 字段 → ES 字段,类型不匹配时需指定)
objFields:
total_amount: double # MySQL 的 decimal 转为 ES 的 double
# 增量同步条件(基于 Binlog 的变更时间)
etlCondition: "o.update_time > {updateTime}"
(4)高可用配置:Canal 集群
单节点 Canal 存在单点故障风险,需部署集群:
- 多 Canal Server 连接同一 MySQL Master,每个 Server 模拟独立 Slave;
- Canal Admin:可视化管理 Canal 集群,监控同步状态、配置变更;
- MQ 高可用:RocketMQ/Kafka 集群部署,避免 MQ 单点故障。
3. 实战踩坑总结
| 坑点 | 解决方案 |
|---|---|
| Binlog 格式不是 ROW 模式 | 修改 MySQL 配置为 binlog_format=ROW,重启 MySQL |
| 数据漂移(DDL 变更未同步) | 用 Schema Registry 同步 MySQL 表结构与 ES mapping,DDL 后自动更新 ES 映射 |
| Canal Server 单点故障 | 部署 Canal 集群,通过 Canal Admin 管理节点状态 |
| 大事务 Binlog 导致 OOM | 拆分大事务,调整 Canal 的 parse 线程池大小,增加 JVM 堆内存 |
六、方案五:DataX 批量同步(离线,无侵入)
核心逻辑:DataX 是阿里开源的离线数据同步工具,支持 MySQL 到 ES 的批量数据迁移,适合大规模历史数据同步。
适用场景:历史数据迁移(如分库分表数据合并)、离线批量同步(如每日全量同步)、无业务代码修改权限的场景。
1. 底层原理:分布式批量同步
DataX 采用“Reader-Channel-Writer”架构:
- Reader:从数据源(MySQL)读取数据;
- Channel:负责数据传输,支持并发(可设置 Channel 数提升性能);
- Writer:将数据写入目标端(ES)。
2. 完整实操:从配置到性能调优
(1)核心配置文件(mysql2es.json)
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": [
"SELECT id, order_no, user_id, total_amount, create_time FROM order_info WHERE create_time >= '2024-01-01' AND create_time < '2024-02-01'"
],
"jdbcUrl": [
"jdbc:mysql://mysql-host:3306/order_db?useSSL=false&serverTimezone=Asia/Shanghai"
]
}
},
"splitPk": "id", // 分库分表场景的分片键(按 id 分片,避免数据重复)
"fetchSize": 1000, // 每次从 MySQL 读取的行数
"batchSize": 1000 // 每次批量提交给 Channel 的行数
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://es-host:9200",
"index": "order_info_202401", // ES 索引名(按月份分区)
"docId": "${id}", // ES 文档 ID(用 MySQL 主键)
"batchSize": 1000, // 每次批量写入 ES 的行数
"column": [
{"name": "id", "type": "long"},
{"name": "order_no", "type": "string"},
{"name": "user_id", "type": "long"},
{"name": "total_amount", "type": "double"},
{"name": "create_time", "type": "date", "format": "yyyy-MM-dd HH:mm:ss"}
],
"writeMode": "insert" // 写入模式:insert(新增)、update(更新)
}
}
}
],
"setting": {
"speed": {
"channel": 5, // Channel 数(建议与 ES 分片数一致,如 5 个 Channel 对应 5 个 ES 分片)
"byte": 10485760 // 每秒传输字节数限制(10MB)
},
"errorLimit": {
"record": 100, // 允许的错误记录数(超过则任务失败)
"percentage": 0.01 // 允许的错误百分比
}
}
}
}
(2)分库分表场景处理
若 MySQL 是分库分表(如 order_db_1-order_db_10),需配置多数据源:
"connection": [
{
"querySql": ["SELECT ... FROM order_info WHERE id BETWEEN 1 AND 1000000"],
"jdbcUrl": ["jdbc:mysql://mysql-host:3306/order_db_1?..."]
},
{
"querySql": ["SELECT ... FROM order_info WHERE id BETWEEN 1000001 AND 2000000"],
"jdbcUrl": ["jdbc:mysql://mysql-host:3306/order_db_2?..."]
}
// 其他分库...
]
(3)性能调优参数
| 参数 | 优化建议 | 原理 |
|---|---|---|
channel |
等于 ES 分片数(如 5-10 个) | 避免单 Channel 写入压力过大,并行写入提升速度 |
fetchSize |
1000-2000(根据 MySQL 性能调整) | 减少 MySQL 连接次数,降低 IO 开销 |
batchSize |
1000-5000(不超过 ES 批量写入上限) | 减少 ES 网络 IO 次数,提升批量写入效率 |
byte |
10-20MB(根据网络带宽调整) | 避免网络拥堵,平衡传输速度与稳定性 |
3. 实战踩坑总结
| 坑点 | 解决方案 |
|---|---|
| 分库分表数据重复/遗漏 | 用 splitPk 按主键分片,确保每个分片无重叠 |
| 数据类型不匹配(如 decimal→double) | 在 writer 的 column 中明确字段类型,或用 convert 函数转换 |
| 任务失败后重跑效率低 | 开启断点续传(DataX 1.0+ 支持),记录失败位置 |
| 全量迁移导致 MySQL 压力大 | 夜间低峰期执行,限制 fetchSize,避免全表扫描 |
七、方案六:Flink 流处理(实时,低侵入)
核心逻辑:基于 Flink 实时流处理引擎,消费 MySQL Binlog(通过 Canal 或 Debezium),进行复杂 ETL 处理后写入 ES。
适用场景:实时性要求高(毫秒级)、需复杂数据处理(如多表关联、实时计算)、大规模数据场景(如实时数仓、推荐系统)。
1. 底层原理:流处理与状态管理
Flink 同步 ES 的核心优势是复杂处理能力与状态一致性:
- 复杂 ETL:支持多表关联(如商品表关联用户画像表)、窗口计算(如 5 分钟内的订单汇总);
- 状态管理:通过 RocksDB 存储中间状态,处理乱序数据(Watermark 机制);
- Exactly-Once 语义:确保数据不重复、不丢失,与 ES 实现最终一致性。
2. 完整实操:从数据源到 ES 写入
(1)依赖引入(pom.xml)
<!-- Flink 核心依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.0</version>
</dependency>
<!-- Flink CDC 依赖(读取 MySQL Binlog) -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.4.0</version>
</dependency>
<!-- Flink ES 连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.12</artifactId>
<version>1.17.0</version>
</dependency>
(2)核心代码:Binlog 消费→ETL→ES 写入
public class MysqlToEsFlinkJob {
public static void main(String[] args) throws Exception {
// 1. 创建 Flink 执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用 Checkpoint(每 10 秒一次,确保 Exactly-Once 语义)
env.enableCheckpointing(10000);
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints/");
// 2. 读取 MySQL Binlog(通过 Flink CDC)
DebeziumSourceFunction<RowData> mysqlSource = MySQLSource.<RowData>builder()
.hostname("mysql-host")
.port(3306)
.databaseList("product_db") // 监听的数据库
.tableList("product_db.product_info") // 监听的表
.username("root")
.password("123456")
.deserializer(new JsonDebeziumDeserializationSchema()) // 解析 Binlog 为 JSON
.build();
DataStream<RowData> mysqlStream = env.addSource(mysqlSource);
// 3. 复杂 ETL 处理(示例:关联用户画像表,计算商品推荐评分)
// 3.1 读取用户画像广播流(维表)
DataStream<UserProfile> userProfileStream = env.addSource(new UserProfileSource());
BroadcastStream<UserProfile> broadcastStream = userProfileStream.broadcast(new MapStateDescriptor<>("userProfileState", Long.class, UserProfile.class));
// 3.2 关联商品流与用户画像流
DataStream<ProductWithScore> productWithScoreStream = mysqlStream
.keyBy(row -> row.getLong(2)) // 按 user_id 分组
.connect(broadcastStream)
.process(new BroadcastProcessFunction<RowData, UserProfile, ProductWithScore>() {
private MapState<Long, UserProfile> userProfileState;
@Override
public void open(Configuration parameters) {
userProfileState = getRuntimeContext().getMapState(
new MapStateDescriptor<>("userProfileState", Long.class, UserProfile.class)
);
}
// 处理商品流(主数据流)
@Override
public void processElement(RowData productRow, ReadOnlyContext ctx, Collector<ProductWithScore> out) throws Exception {
long userId = productRow.getLong(2);
UserProfile profile = userProfileState.get(userId);
if (profile == null) {
log.warn("用户 {} 未找到画像,使用默认评分", userId);
out.collect(new ProductWithScore(productRow, 5.0)); // 默认评分 5.0
return;
}
// 计算推荐评分(如:用户偏好*商品类别匹配度)
double score = profile.getPreference() * getCategoryMatch(productRow, profile);
out.collect(new ProductWithScore(productRow, score));
}
// 处理用户画像流(广播流)
@Override
public void processBroadcastElement(UserProfile profile, Context ctx, Collector<ProductWithScore> out) throws Exception {
userProfileState.put(profile.getUserId(), profile);
}
});
// 4. 写入 ES
productWithScoreStream.addSink(
ElasticsearchSink.<ProductWithScore>builder(
Collections.singletonList(new HttpHost("es-host", 9200, "http")),
new ElasticsearchSinkFunction<ProductWithScore>() {
@Override
public void process(ProductWithScore product, RuntimeContext ctx, RequestIndexer indexer) {
// 构建 ES 索引请求
Map<String, Object> json = new HashMap<>();
json.put("id", product.getId());
json.put("product_name", product.getName());
json.put("recommend_score", product.getScore());
json.put("update_time", new Date());
IndexRequest request = Requests.indexRequest()
.index("product_recommend_index")
.id(product.getId().toString())
.source(json);
indexer.add(request);
}
}
).build()
);
// 5. 执行任务
env.execute("MySQL Binlog to ES Recommend Job");
}
// 辅助方法:计算商品类别与用户偏好的匹配度
private double getCategoryMatch(RowData productRow, UserProfile profile) {
String productCategory = productRow.getString(3).toString();
return profile.getPreferredCategories().contains(productCategory) ? 1.2 : 0.8;
}
}
(3)关键配置:Watermark 与状态管理
-
Watermark 处理乱序数据:
// 允许 3 秒的乱序窗口,超过则视为迟到数据 DataStreamtimedStream = mysqlStream .assignTimestampsAndWatermarks( WatermarkStrategy. forBoundedOutOfOrderness(Duration.ofSeconds(3)) .withTimestampAssigner((row, timestamp) -> row.getTimestamp(4).getTime()) // 用 update_time 作为时间戳 ); - 状态后端配置:
// 使用 RocksDB 作为状态后端(支持大状态) env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/state/"));
3. 实战踩坑总结
| 坑点 | 解决方案 |
|---|---|
| 维表(用户画像)更新不及时 | 用 Broadcast State 实时更新维表,设置状态过期时间(如 1 小时) |
| 乱序数据导致计算错误 | 配置 Watermark 允许 3-5 秒乱序窗口,迟到数据写入侧输出流 |
| Flink 任务重启后状态丢失 | 启用 Checkpoint 并存储到 HDFS,配置状态后端为 RocksDB |
| ES 写入压力过大 | 批量写入(BulkProcessor),根据 ES 性能调整批量大小 |
八、方案对比与选型决策树
1. 6 种方案核心指标对比
| 方案 | 实时性 | 侵入性 | 复杂度 | 一致性 | 适用场景 |
|---|---|---|---|---|---|
| 同步双写 | 秒级 | 高 | 低 | 强一致性 | 金融交易、核心订单同步 |
| 异步双写(MQ) | 秒级 | 中 | 中 | 最终一致性 | 电商商品、用户信息同步 |
| Logstash 定时拉取 | 分钟-T+1 级 | 无 | 低 | 最终一致性 | 离线日志分析、历史数据检索 |
| Canal 监听 Binlog | 毫秒级 | 无 | 高 | 最终一致性 | 高并发生产环境、实时搜索 |
| DataX 批量同步 | 小时级 | 无 | 中 | 最终一致性 | 历史数据迁移、分库分表合并 |
| Flink 流处理 | 毫秒级 | 低 | 极高 | Exactly-Once | 实时数仓、复杂 ETL、推荐系统 |
2. 选型决策树(三步确定方案)
-
第一步:判断实时性需求
- 实时(毫秒-秒级)→ 进入第二步;
- 准实时(分钟级)→ 选“异步双写”或“Logstash”;
- 离线(小时-T+1)→ 选“DataX”或“Logstash”。
-
第二步:判断业务复杂度
- 简单同步(无关联计算)→ 进入第三步;
- 复杂 ETL(多表关联、实时计算)→ 选“Flink 流处理”。
-
第三步:判断侵入性容忍度
- 可修改业务代码 → 选“同步双写”(强一致)或“异步双写”(高吞吐);
- 不可修改代码 → 选“Canal 监听 Binlog”(高实时)。
九、通用最佳实践:数据一致性与监控保障
无论选择哪种方案,都需做好以下保障措施:
-
数据一致性校验:
- 定时对账:通过脚本对比 MySQL 与 ES 的数据量、关键字段(如每日凌晨对比前一天数据);
- 异常告警:发现数据偏差(如差异率>0.1%)时,触发告警并自动重试同步。
-
ES 索引设计优化:
- 提前规划 mapping:避免后续字段类型变更(如 MySQL 的 decimal 对应 ES 的 double 或 scaled_float);
- 合理分片:根据数据量设置 ES 分片数(如 1000 万数据对应 5-10 个分片),避免单分片过大。
-
全链路监控:
- 核心指标:MySQL Binlog 延迟、MQ Lag 值、ES 写入 QPS/延迟、同步成功率;
- 监控工具:Prometheus + Grafana(可视化)、ELK(日志分析)、钉钉/企业微信(告警推送)。
十、总结:没有最优,只有最适配
MySQL 同步 ES 的方案选择,本质是业务需求与技术成本的平衡:
- 小团队、简单场景:优先选“同步双写”或“Logstash”,降低复杂度;
- 中大型团队、高并发场景:选“Canal+MQ”或“异步双写”,兼顾实时性与低侵入;
- 大型企业、复杂场景:选“Flink+Canal”,支撑实时数仓与复杂 ETL。
最终,建议在落地前先进行技术验证(POC):用真实业务数据测试方案的性能、延迟与一致性,再逐步推广到生产环境。
除非注明,否则均为李锋镝的博客原创文章,转载必须以链接形式标明本文链接
文章评论