Flink-CDC部署及测试
1、CDC简介
CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。
目前市面上的CDC技术非常多,常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求。这种作业方式无法保证数据的一致性,实时性也较差。Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术。这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据。
2、Flink_CDC简介:
目前公司主要是通过canal监控mysql的binlog日志,然后将日志数据实时发送到kafka中,通过flink程序,将日志数据实时下发到其他服务中。这种方式,数据链路长,实时性效果较差,运维也比较复杂。
Flink_CDC技术的出现,解决了传统数据库实时同步的痛点。Flink_CDC通过伪装成msql的slave节点,实时读取master节点全量和增量数据,它能够捕获所有数据的变化,捕获完整的变更记录,无需像查询CDC那样发起全表的扫描过滤,高效且无需入侵代码,完全与业务解耦,运维及其简单。
3、Flink_CDC部署:
3.1 依赖版本
环境:Linux(Centos7) Flink : 1.13.1 Flink_CDC: flink-sql-connector-mysql-cdc-2.1.0.jar mysql版本:5.7.35 mysql驱动包:mysql-connector-java-8.0.27.jar
3.2 环境搭建
1、下载flink:
wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.1-bin-scala_2.11.tgz
2、解压flink:
3、编辑flink配置文件,配置java环境
4、上传flink_cdc驱动包和mysql驱动包:
5、启动flink集群:
/bin/start-cluster.sh
6、创建mysql表:
测试环境10.51.13.228 数据库 test_db,mysql必须开启binlog
CREATE TABLE `products` ( `id` int NOT NULL, `name` varchar(45) DEFAULT NULL, `description` varchar(45) DEFAULT NULL, `weight` decimal(10,3) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
7、启动flink-sql-client
本次主要是通过flink的sql客户端来测试的。
./flink-1.13.1/bin/sql-client.sh
8、创建Flink_CDC虚拟表:
CREATE TABLE `products_cdc` ( id INT NOT NULL, name varchar(32), description varchar(45), weight DECIMAL(10,3) ) WITH ( 'scan.incremental.snapshot.enabled' = 'false', 'connector' = 'mysql-cdc', 'hostname' = '10.51.13.228', 'port' = '3306', 'username' = 'root', 'password' = 'h7fXy0%z#Ci3', 'database-name' = 'test_db', 'table-name' = 'products' );
9、查询CDC表数据:
select * from products_cdc;
10、在数据库中新增一条数据:
insert into products(id,name,description,weight) values(100,'yaoling','yaoling',60);
11、观察products_cdc表数据变化:
到此,通过flink-sql-client来增量获取mysql全量和增量数据变化。