canal原理及使用

浩客2年前技术文章1501

什么是canal

canal,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具

Dingtalk_20231229030412.jpg

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

1、开启mysql binlog功能
开启之后对mysql的性能会有一定的影响

# 修改my.cnf
vim /etc/my.cnf

[mysqld]    
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1


# 改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
service mysqld restart
# 进入mysql命令行验证一下
show variables like 'log_bin';


2、搭建canal
# 解压到指定目录下
tar -xvf canal.deployer-1.1.4.tar.gz 

#修改配置文件conf/example/instance.properties
# mysql地址
canal.instance.master.address=master:3306
# mysql用户名和密码
canal.instance.dbUsername=root
canal.instance.dbPassword=123456

# 增加canal数据采集之后写入kafkatopic
# 动态topic,每一个表自动创建一个topic
canal.mq.dynamicTopic=bigdata\\..*

# 修改配置文件conf/canal.properties
# zookeeper的地址,如果有多台需要写多个使用逗号分隔
canal.zkServers = master:2181
# canal保存数据的位置
canal.serverMode = kafka
# kafka地址列表
canal.mq.servers = master:9092

# 启动Canal   启动后会出现进程CanalLauncher
# 启动canal之前需要先保证kafka处于启动状态
cd /usr/local/soft/canal/bin/
# 启动
./startup.sh
# 停止
stop.sh
# 重启
restart.sh


## 测试

-- 在mysql创建表插入数据

mysql -uroot -p123456

use `bigdata`;

-- 创建表
CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
    order_id    VARCHAR(64)   COMMENT '订单ID',
    amount      DECIMAL(10, 2) COMMENT '订单金额',
    create_time DATETIME       COMMENT '创建时间',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '订单表';

--  插入数据

INSERT INTO `order`(order_id, amount) VALUES ('100', 66);
UPDATE `order` SET amount = 88 WHERE order_id = '100';
DELETE  FROM `order` WHERE order_id = '100';

-- 查看kafka中是否出现topic

kafka-topics.sh --list  --zookeeper  master:2181,node1:2181,node2:2181

--会出现bigdata.order

-- 消费数据
kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic bigdata.order
返回列表

上一篇:数据湖Iceberg

下一篇:FLink-Canal

相关文章

xtrabackup报错记录

xtrabackup报错记录

报错记录报错一:报错显示需要依赖,但是当我们执行 yum install -y libstdc++ 显示已是最新版本,且通过 yum 安装 一般会将依赖包都直接安装成功。这时候可以核实一下安装包和当前...

CDH实操--CDH5.8.2升级(一)

CDH实操--CDH5.8.2升级(一)

1、摘要和概述本次升级是从cdh5.8.2升级至cdh6.2.1,由于cm对cdh的兼容性,需要先升级cm,从5.8.2升级至至6.2.1。2、升级CM2.1 CM安装文件2.2 CDH安装文件2.3...

PG参数整理

一、参数的分类参数的类型名称说明internal内部参数,只读无法修改。postgres程序写死或者是在初始化指定后无法修改的参数postmaster更改该类参数,需重启生效sighup不需重启,重新...

Python 类型注解和参数类型检查

1、类型注解1.1 函数定义的弊端Python 是动态语言,变量随时可以被赋值,且能赋值为不同的类型。Python 不是静态编译型语言,变量类型是在运行期决定的。动态语言很灵活,但是这种特性也是弊端。...

MySQL 官方高可用方案:Innodb ReplicaSet

MySQL 官方高可用方案:Innodb ReplicaSet

说明MySQL Innodb ReplicaSet 是 MySQL 团队在 2020 年推出的一款产品,用来帮助用户快速部署和管理主从复制,在数据库层仍然使用的是主从复制技术。ReplicaSet 主...

MySQL的数据拆分

MySQL的数据拆分

一、拆分的概念数据拆分当数据过大,存储、SQL性能达到瓶颈;或多个业务共用一个数据库实例,一个小功能故障导致整个系统瘫痪;为解决类似问题,需考虑对数据进行拆分;粗一级的拆分,针对的是业务系统,将不同业...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。