Flink-CDC部署及测试

耀灵1年前技术文章1556

1、CDC简介

CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。

目前市面上的CDC技术非常多,常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求。这种作业方式无法保证数据的一致性,实时性也较差。Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术。这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据。

2、Flink_CDC简介:

目前公司主要是通过canal监控mysql的binlog日志,然后将日志数据实时发送到kafka中,通过flink程序,将日志数据实时下发到其他服务中。这种方式,数据链路长,实时性效果较差,运维也比较复杂。

Flink_CDC技术的出现,解决了传统数据库实时同步的痛点。Flink_CDC通过伪装成msql的slave节点,实时读取master节点全量和增量数据,它能够捕获所有数据的变化,捕获完整的变更记录,无需像查询CDC那样发起全表的扫描过滤,高效且无需入侵代码,完全与业务解耦,运维及其简单。


3、Flink_CDC部署:

3.1 依赖版本

环境:Linux(Centos7)
Flink : 1.13.1
Flink_CDC: flink-sql-connector-mysql-cdc-2.1.0.jar
mysql版本:5.7.35
mysql驱动包:mysql-connector-java-8.0.27.jar

3.2 环境搭建

1、下载flink:

wget https://archive.apache.org/dist/flink/flink-1.13.0/flink-1.13.1-bin-scala_2.11.tgz

2、解压flink:

1.png

3、编辑flink配置文件,配置java环境

2.png

4、上传flink_cdc驱动包和mysql驱动包:

3.png

5、启动flink集群:

/bin/start-cluster.sh

4.png


5.png

6、创建mysql表:

测试环境10.51.13.228 数据库 test_db,mysql必须开启binlog

6.png

CREATE TABLE `products` (
   `id` int NOT NULL,
   `name` varchar(45) DEFAULT NULL,
   `description` varchar(45) DEFAULT NULL,
   `weight` decimal(10,3) DEFAULT NULL,
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

7、启动flink-sql-client

本次主要是通过flink的sql客户端来测试的。

./flink-1.13.1/bin/sql-client.sh

8、创建Flink_CDC虚拟表:

CREATE TABLE `products_cdc` (
 id INT NOT NULL,
 name varchar(32),
 description varchar(45),
 weight DECIMAL(10,3)
) WITH (
 'scan.incremental.snapshot.enabled' = 'false',
 'connector' = 'mysql-cdc',
 'hostname' = '10.51.13.228',
 'port' = '3306',
 'username' = 'root',
 'password' = 'h7fXy0%z#Ci3',
 'database-name' = 'test_db',
 'table-name' = 'products'
);

9、查询CDC表数据:

select * from products_cdc;

7.png

10、在数据库中新增一条数据:

insert into products(id,name,description,weight) values(100,'yaoling','yaoling',60);

8.png

11、观察products_cdc表数据变化:

9.png

到此,通过flink-sql-client来增量获取mysql全量和增量数据变化。


相关文章

Elasticsearch数据生命周期如何规划

Elasticsearch中的open状态的索引都会占用堆内存来存储倒排索引,过多的索引会导致集群整体内存使用率多大,甚至引起内存溢出。所以需要根据自身业务管理历史数据的生命周期,如近3个月的数据op...

MySQL运维实战之备份和恢复(8.7)将数据库恢复到指定时间点的另外一种方法

使用mysql原生复制功能实现时间点恢复使用mysqlbinlog解析并执行binlog是实现mysql时间点恢复的一种常用的方法。这里提供另外一种实现时间点恢复的方法:使用mysql的复制功能来实现...

chronyc时间同步器配置

chronyc时间同步器配置

chronyc & chronydchrony 有两个核心组件:chronyd守护进程,主要用于调整内核中运行的系统时间和时间服务器同步chronyc命令行界面程序,让用户能够对 chrony...

kafka高可靠性相关配置

kafka高可靠性相关配置

为保证高可靠可以通过以下方面进行设置:1) 物理机器场景配置项配置说明高可靠高性能不间断电源配置,防止服务器异常断电RAID卡电池配置,防止服务器异常断电RAID卡写缓存开启,提高性能RAID 1配置...

CDH实操--impala增加ldap认证

CDH实操--impala增加ldap认证

本文基于cdh安装ldap主主模式,并且配置haproxy+keepalived基础上进行配置。一、impala配置ldap1、impala配置中增加ldap相关验证2、重启过时配置。3、ldap验证...

Prometheus集成pushgateway监控k8s集群

Prometheus集成pushgateway监控k8s集群

Prometheus部署环境介绍本文的k8s环境是通过二进制方式搭建的v1.20.13版本清单准备注意集群版本的坑,自己先到Github上下载对应的版本。注意: 集群版本在v1.21.x之前需要注意下...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。