![](https://cudaai.com/wp-content/uploads/2024/07/640-1.gif)
![](https://cudaai.com/wp-content/uploads/2025/01/image-1024x508.png)
1. Flink 在数据库实时同步中的优势
(1)原生支持变更数据捕获(CDC)Flink 社区提供了内置的 CDC 连接器(如 MySQL CDC、PostgreSQL CDC),可直接捕获数据库的增量变更(如 Binlog、WAL),无需额外中间件。支持 Debezium 集成,通过 Kafka 等消息队列传递变更事件,实现解耦和缓冲。
(2)低延迟与高吞吐Flink 的流处理引擎可实现毫秒级延迟,适合实时同步需求。分布式架构支持横向扩展,轻松应对高吞吐场景(如每秒数万条数据变更)。
(3)Exactly-Once 语义保证通过 Checkpoint 机制和两阶段提交(2PC),Flink 能确保数据从源端到目标端的端到端一致性,避免重复或丢失数据。支持幂等写入到目标数据库(如 MySQL、Kafka、Elasticsearch),保障最终一致性。
(4)灵活的流处理能力在数据同步过程中可嵌入 ETL 逻辑(如过滤、转换、聚合),例如实时清洗数据或关联多表变更。支持复杂事件处理(CEP)和状态计算,适合需要实时计算的场景(如数据一致性校验)。
(5)丰富的连接器生态支持主流数据库(MySQL、PostgreSQL、Oracle)和消息队列(Kafka、Pulsar)作为源或目标。可扩展 API 允许自定义连接器,适配小众数据库。
2. 典型应用场景
(1)实时数仓与数据湖同步将 OLTP 数据库的变更实时同步到 OLAP 系统(如 ClickHouse、Hive、Iceberg),支持实时分析。(2)微服务数据共享跨服务同步关键数据(如订单状态、用户信息),避免直接耦合数据库。
(3)异地多活与灾备实时复制数据到异地数据中心,保障业务连续性。
3. 实现流程
3.1 配置源数据库的 CDC
以 MySQL 为例:
- 启用 binlog:
# my.cnf
server-id=1
log-bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
创建 Debezium 用户并授权:
CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user';
3.2 部署 Debezium 并捕获变更到 Kafka
- 配置 Debezium MySQL Connector:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "flink_user",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.include.list": "mydb",
"table.include.list": "mydb.users",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mydb"
}
}
Debezium 将 binlog 转换为事件写入 Kafka Topic(如 dbserver1.mydb.users)。
3.3 使用 Flink 处理变更数据Flink CDC Connector(推荐)直接通过 Flink CDC 读取数据库变更,无需 Kafka:
-- Flink SQL
CREATE TABLE users (
id INT,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flink_user',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'users'
);
CREATE TABLE target_db (
id INT,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://target:3306/mydb',
'table-name' = 'users',
'username' = 'target_user',
'password' = 'target_password'
);
-- 将数据同步到目标表
INSERT INTO target_db SELECT * FROM users;
PyFlink SQL 示例
from pyflink.table import EnvironmentSettings, TableEnvironment
# 初始化 Flink Table 环境
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
# 定义 MySQL CDC 源表
t_env.execute_sql("""
CREATE TABLE source_users (
id INT,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'flink_user',
'password' = 'password',
'database-name' = 'mydb',
'table-name' = 'users'
)
""")
# 定义目标 PostgreSQL 表
t_env.execute_sql("""
CREATE TABLE target_users (
id INT,
name STRING,
email STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/target_db',
'table-name' = 'users',
'username' = 'user',
'password' = 'password'
)
""")
# 执行同步任务
t_env.execute_sql("INSERT INTO target_users SELECT * FROM source_users")
![](https://cudaai.com/wp-content/uploads/2024/07/640-1-1.gif)