FLink-Canal

浩客2年前技术文章1638

canal format

Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf 序列化消息(Canal 默认使用 protobuf)。

Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统

  • 日志审计

  • 数据库的实时物化视图

  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Canal 格式的 JSON 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Canal 消息。

示例:

1、在mysql中创建学生表,插入几条数据

CREATE TABLE `student` (
 `id` varchar(20) NOT NULL,
 `name` varchar(255) DEFAULT NULL,
 `age` bigint(20) DEFAULT NULL,
 `gender` varchar(255) DEFAULT NULL,
 `clazz` varchar(255) DEFAULT NULL,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2、在flink中创建kafka soure表指定数据的格式为canl-json

canal采集的数据包含三种类型,INSERT, UPDATE,DELETE

FLink 会自动将三种类型转换成变更日志流。同时会自动解析数据

CREATE TABLE student_kafka (
 id STRING,
 name STRING,
 age bigint,
 gender STRING,
 clazz STRING
) WITH (
'connector' = 'kafka',
'topic' = 'bigdata.student',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'canal-json'  -- 使用 canal-json 格式
);


3、统计班级的人数

select clazz,count(1)  as c 
from student_kafka
group by clazz

4、将统计的结果保存到数据库中

CREATE TABLE clazz_num (
 clazz STRING,
 c BIGINT,
 PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'clazz_num',
  'username' ='root',
  'password' = '123456'
);
insert into clazz_num
select clazz,count(1)  as c
from student_kafka
group by clazz

相关文章

Trino对接haproxy开启ssl

Trino对接haproxy开启ssl

1、背景客户在开启https后,在高并发的情况下,集群性能下降严重。初步估计是由于worker和Coordinator交互都需要进行加密和解密,会提高cpu使用率,导致服务变慢。现在采用haproxy...

MySQL运维实战之ProxySQL(9.9)proxysql自身高可用

MySQL运维实战之ProxySQL(9.9)proxysql自身高可用

proxysql作为一个程序,本身也可能出现故障。部署proxysql的服务器也肯能出现故障。高可用架构的一个基本原则是消除单点。可以在多个节点上部署proxysql,在proxysql之前再加一层负...

 Ranger-hive插件部署

Ranger-hive插件部署

解压插件tar -zxf ranger-metastore-plugin.tar.gz -C /opt修改配置vim /opt/ranger-metastore-plugin/install.prop...

Hadoop生产调优

一、NameNode内存生产配置1.NameNode内存计算每个文件块大概占用150byte,一台服务器128G内存为例,能存储多少文件块呢?128 * 1024 * 1024 * 1024  / 1...

SpringBootWeb 篇-深入了解 SpringBoot + Vue 的前后端分离项目部署上线与 Nginx 配置文件结构(4)

SpringBootWeb 篇-深入了解 SpringBoot + Vue 的前后端分离项目部署上线与 Nginx 配置文件结构(4)

 6.0 nginx 配置文件结构        6.1 先了解以下配置文件的结构Nginx 文件结构:      &n...

SQL Server优化入门系列(五)—— SQL Server的执行计划

SQL Server优化入门系列(五)—— SQL Server的执行计划

定位到TOP SQL后,怎么优化呢?我们需要分析SQL的执行计划,制定相应的优化策略。这篇文章中,我们将介绍查看SQL Server执行计划的几种方法。本文测试案例中使用了AdventureWorks...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。