Debezium抽取SQL Server同步kafka

芒果1年前技术文章571

ebezium SQL Server连接器捕获SQL Server数据库模式中发生的行级更改。
官方2.0文档:

有关与此连接器兼容的SQL Server版本的信息。

image.png

Debezium SQL Server连接器首次连接到SQL Server数据库或集群时,它会对数据库中的模式进行一致的快照。初始快照完成后,连接器会连续捕获提交到CDC启用的SQL Server数据库INSERTUPDATEDELETE操作的行级更改。该连接器为每个数据更改操作生成事件,并将其流式传输到Kafka主题。该连接器将表的所有事件流式传输到一个专门的Kafka主题。然后,应用程序和服务可以消耗该主题的数据更改事件记录。

要使Debezium SQL Server连接器捕获数据库操作的更改事件记录,您必须首先在SQL Server数据库上启用更改数据捕获。必须在数据库和要捕获的每个表上启用CDC。在源数据库上设置CDC后,连接器可以捕获数据库中发生的行级INSERTUPDATEDELETE操作。该连接器将每个源表的事件记录写入专门用于该表的Kafka主题。每个捕获的表都有一个主题。客户端应用程序读取他们遵循的数据库表的Kafka主题,并可以响应他们从这些主题中消耗的行级事件。
连接器首次连接到SQL Server数据库或集群时,它需要对其配置为捕获更改的所有表的模式进行一致的快照,并将此状态流式传输到Kafka。快照完成后,连接器会持续捕获随后发生的行级更改。通过首先建立所有数据的一致视图,连接器可以继续读取,而不会丢失快照发生时所做的任何更改。
Debezium SQL Server连接器可以容忍故障。当连接器读取更改并生成事件时,它会定期记录事件在数据库日志中的位置(LSN/日志序列号)。如果连接器因任何原因(包括通信故障、网络问题或崩溃)而停止,则在重新启动后,连接器将从读取的最后一点恢复读取SQL Server CDC表。
抵消定期提交。它们不是在更改事件发生时提交的。因此,在中断后,可能会生成重复的事件。
容错也适用于快照。也就是说,如果连接器在快照期间停止,则连接器在重新启动时开始新的快照。


安装sqlserver linux(忽略 或者查看第二篇文章)

在 SQL Server 数据库上启用 CDC

在为表启用 CDC 之前,您必须为 SQL Server 数据库启用它。 SQL Server 管理员通过运行系统存储过程来启用 CDC。系统存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。
先决条件
您是 SQL Server 的 sysadmin 固定服务器角色的成员。
您是数据库的 db_owner。
SQL Server 代理正在运行。
注意:
SQL Server CDC 功能仅处理用户创建的表中发生的更改。您不能在 SQL Server 主数据库上启用 CDC。
程序
1.从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。
2.在模板浏览器中,展开 SQL Server 模板。
3.展开更改数据捕获 > 配置,然后单击为 CDC 启用数据库。
4.在模板中,将 USE 语句中的数据库名称替换为要为 CDC 启用的数据库名称。
5.运行存储过程 sys.sp_cdc_enable_db 为 CDC 启用数据库。
为 CDC 启用数据库后,将创建名为 cdc 的模式,以及 CDC 用户、元数据表和其他系统对象。
以下示例显示如何为数据库 TestDB 启用 CDC:
USE TestDB
GO
EXEC sys.sp_cdc_enable_db
GO 

FAD9E219-FFBD-4F45-9278-1AC439B01CAF.png


在 SQL Server 表上启用 CDC
SQL Server 管理员必须在您希望 Debezium 捕获的源表上启用更改数据捕获。数据库必须已为 CDC 启用。要在表上启用 CDC,SQL Server 管理员为该表运行存储过程 sys.sp_cdc_enable_table。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。必须为要捕获的每个表启用 SQL Server CDC。
先决条件:
CDC 在 SQL Server 数据库上启用。
SQL Server 代理正在运行。
您是数据库的 db_owner 固定数据库角色的成员。
程序
从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。
在模板浏览器中,展开 SQL Server 模板。
展开更改数据捕获 > 配置,然后单击启用表指定文件组选项。
在模板中,将 USE 语句中的表名替换为您要捕获的表名。
运行存储过程 sys.sp_cdc_enable_table。
以下示例显示如何为表 mgmg 启用 CDC:
示例:为 SQL Server 表启用 CDC
USE TestDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'mgmg',
@role_name     = N'NULL',  
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO

示例
USE MyDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name   = N'MyTable',
@role_name     = N'MyRole',  
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
指定要捕获的表的名称。
指定角色 MyRole,您可以向该角色添加要授予对源表的捕获列的 SELECT 权限的用户。具有 sysadmin 或 db_owner 角色的用户还可以访问指定的更改表。将 @role_name 的值设置为 NULL,以仅允许 sysadmin 或 db_owner 中的成员对捕获的信息具有完全访问权限。
指定 SQL Server 为捕获的表放置更改表的文件组。命名的文件组必须已经存在。最好不要将更改表放在用于源表的同一文件组中。
Ps文件组
1. 如何查看数据库中所有的文件组。
   语法:sp_helpfilegroup
    步骤:
               use 数据库
               sp_helpfilegroup
2. 如何找到文件组和文件的对应情况.
        sp_helpdb love
创建文件组。
语法:
   alter database 数据库名 add filegroup 文件组名
步骤:
    use 数据库名
    alter database  数据库名  add filegroup 文件组名
范例:
use love
alter database TestDB add filegroup 财务部

SQL Server 管理员可以运行系统存储过程来查询数据库或表以检索其 CDC 配置信息。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。
先决条件
您对捕获实例的所有捕获列具有 SELECT 权限。 db_owner 数据库角色的成员可以查看所有已定义捕获实例的信息。
您拥有为查询包括的表信息定义的任何门控角色的成员资格。
程序
从 SQL Server Management Studio 的“查看”菜单中,单击“对象资源管理器”。
在对象资源管理器中,展开数据库,然后展开您的数据库对象,例如 MyDB。
展开可编程性 > 存储过程 > 系统存储过程。
运行 sys.sp_cdc_help_change_data_capture 存储过程来查询表。
查询不应返回空结果。
以下示例在数据库 TestDB 上运行存储过程 sys.sp_cdc_help_change_data_capture:
示例:查询表以获取 CDC 配置信息
USE TestDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO 

image.png

该查询返回数据库中每个表的配置信息,这些表为 CDC 启用并且包含调用者有权访问的更改数据。如果结果为空,请验证用户是否具有访问捕获实例和 CDC 表的权限。
部署Debezium SQL Server连接器
要部署Debezium SQL Server连接器,请安装Debezium SQL Server连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。
先决条件
安装了Apache ZooKeeper、Apache Kafka和Kafka Connect。
SQL Server已安装,已配置为CDC,并准备与Debezium连接器一起使用。
程序
下载Debezium SQL Server连接器插件存档
Wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
将文件提取到您的Kafka Connect环境中。
将带有JAR文件的目录添加到Kafka Connect的plugin.path中。
配置连接器并将配置添加到您的Kafka Connect集群中。
重新启动您的Kafka Connect流程以提取新的JAR文件。
解压插件包
tar -xvf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
配置kafka,让kafka知道这个插件.
[root@hhz02 config]# vi connect-distributed.properties
配置注意项:(单机模式启动连接器 connect-standlone.properties)
bootstrap.servers=172.16.120.17:9092
offset.storage.topic=connect-sqlserver-status
offset.storage.replication.factor=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
status.storage.topic=connect-sqlserver-statu
status.storage.replication.factor=1
config.storage.topic=connect-sqlserver-config
config.storage.replication.factor=1
group.id=connect-sqlserver
offset.flush.interval.ms=10000
plugin.path=/opt/debezium-connector-sqlserver
rest.port=8083

Ps:
如果kafka开启kerberos 该配置文件还需要添加:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

还需要再启动脚本connect-distributed.sh中添加:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/dtstack/Kafka/kafka/config/kafka_jaas.conf"
注意:集群模式kafka 需要注意配置分发。
重启kafka集群使配置生效。
启动Kafka connector
/opt/dtstack/DTBase/kafka/bin/connect-distributed.sh -daemon /opt/dtstack/DTBase/kafka/config/connect-sqlserver.properties
注意结尾这个文件 集群模式为:connect-distributed.properties,单机模式为:connect-standalone.properties
(如果启动有问题 观察/opt/app/kafka/logs/connectDistributed.out)
检测Kafka connector是否正常工作
1.    检测kafka连接器的服务状态
[root@node01 logs]# curl -H "Accept:application/json" node01:8083/
{"version":"1.1.1","commit":"8e07427ffb493498","kafka_cluster_id":"svXhhAFkSt6xLRPTBIdE1Q"}
2.    检查向 Kafka Connect 注册的连接器列表
[root@hdp01 kafka]# curl -H "Accept:application/json" node01:8083/connectors/
[]
返回空列表, 表示目前还没有注册的连接器
附:删除命令
curl -X DELETE node01:8083/connectors/db2-connector1
SQL Server 连接器配置示例
以下是连接器实例的配置示例,该实例在 172.16.120.17 的端口 1433 上从 SQL Server 服务器捕获数据,我们在逻辑上将其命名为 fullfillment。通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium SQL Server 连接器。
注册连接器
Kafka Connect 服务的 API 提交POST针对/connectors资源的请求,其中包含描述新连接器(称为inventory-connector)的 JSON 文档。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" node01:8083/connectors/ -d '{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "172.16.120.17",
        "database.port": "1433",
        "database.user": "SA",
        "database.password": "Dtstack@123",
        "database.dbname": "TestDB",
        "database.server.name": "fullfillment",
        "table.include.list": "dbo.mgmg",
        "database.history.kafka.bootstrap.servers": "node01:9092",
        "database.history.kafka.topic": "dbhistory.fullfillment"
    }
}' 

image.png

(如果注册有问题 观察/opt/app/kafka/logs/connectDistributed.out)
多看下日志 日志中都可以 写到 如果注册成功,也没有kafka数据 麻烦看下日志WARN中过滤问题。[sourceTableId=DB2INST1.TESTTB, changeTableId=ASNCDC.CDC_DB2INST1_TESTTB
]



查看kafka中的数据
查看所有topic:/opt/dtstack/DTBase/kafka/bin/kafka-topics.sh -list -zookeeper localhost:2181/kafka
消费topic:/opt/dtstack/DTBase/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic fullfillment.dbo.mgmg 

BE2255E2-4A1C-4A65-AA56-533E49E7E11A.png




相关文章

Java-API-MapReduce的操作WordCount篇

Java-API-MapReduce的操作WordCount篇

首先就是pom文件<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/...

CDH时钟同步问题

CDH时钟同步问题

首先执行如下命令:查看chronyd是否启动systemctl status chronyd如果没启动执行如下命令启动systemctl restart chronyd每台检查时间状态chronyc...

正式发布 | 《云运维服务白皮书》开放下载!

正式发布 | 《云运维服务白皮书》开放下载!

在全球数字化变革的背景下,为适应数字经济环境下企业生存发展和市场变化的需要,企业进行主动的、系统性、整体性的数字化转型升级。大数据、云计算、人工智能、区块链等新一代信息通信技术为企业的数字化转型提供了...

如何重塑IT运维核心竞争力?可观测运维这么做!

如何重塑IT运维核心竞争力?可观测运维这么做!

随着云计算、大数据、人工智能等新兴技术的兴起及运用,无论是通讯、金融、教育,还是交通、政府、企业等行业,都得到飞速发展,但在高速发展的同时,各行业巨大的 IT 维护和管理成本也在与日俱增,存在监控工具...

在经济低迷时管理云服务的策略!

近几年全球经济在疫情等各方面影响之下持续低迷,Wanclouds公司发布的一份研究报告指出,81%的美国IT领导者表示,他们的首席执行官要求他们减少或者不增加云计算支出。事实上,在那些被要求削减成本的...

hive创建hbase映射表

hive创建hbase映射表

hbase创建表,导入数据/opt/app/hbase-2.1.0/bin/hbase shell查看已有表,创建新表,查看表结构listcreate 'student', 'info', 'scor...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。