Debezium抽取SQL Server同步kafka
ebezium SQL Server连接器捕获SQL Server数据库模式中发生的行级更改。
官方2.0文档:
有关与此连接器兼容的SQL Server版本的信息。
Debezium SQL Server连接器首次连接到SQL Server数据库或集群时,它会对数据库中的模式进行一致的快照。初始快照完成后,连接器会连续捕获提交到CDC启用的SQL Server数据库的INSERT、UPDATE或DELETE操作的行级更改。该连接器为每个数据更改操作生成事件,并将其流式传输到Kafka主题。该连接器将表的所有事件流式传输到一个专门的Kafka主题。然后,应用程序和服务可以消耗该主题的数据更改事件记录。
要使Debezium SQL Server连接器捕获数据库操作的更改事件记录,您必须首先在SQL Server数据库上启用更改数据捕获。必须在数据库和要捕获的每个表上启用CDC。在源数据库上设置CDC后,连接器可以捕获数据库中发生的行级INSERT、UPDATE和DELETE操作。该连接器将每个源表的事件记录写入专门用于该表的Kafka主题。每个捕获的表都有一个主题。客户端应用程序读取他们遵循的数据库表的Kafka主题,并可以响应他们从这些主题中消耗的行级事件。
连接器首次连接到SQL Server数据库或集群时,它需要对其配置为捕获更改的所有表的模式进行一致的快照,并将此状态流式传输到Kafka。快照完成后,连接器会持续捕获随后发生的行级更改。通过首先建立所有数据的一致视图,连接器可以继续读取,而不会丢失快照发生时所做的任何更改。
Debezium SQL Server连接器可以容忍故障。当连接器读取更改并生成事件时,它会定期记录事件在数据库日志中的位置(LSN/日志序列号)。如果连接器因任何原因(包括通信故障、网络问题或崩溃)而停止,则在重新启动后,连接器将从读取的最后一点恢复读取SQL Server CDC表。
抵消定期提交。它们不是在更改事件发生时提交的。因此,在中断后,可能会生成重复的事件。
容错也适用于快照。也就是说,如果连接器在快照期间停止,则连接器在重新启动时开始新的快照。
安装sqlserver linux(忽略 或者查看第二篇文章)
在 SQL Server 数据库上启用 CDC
在为表启用 CDC 之前,您必须为 SQL Server 数据库启用它。 SQL Server 管理员通过运行系统存储过程来启用 CDC。系统存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。
先决条件
您是 SQL Server 的 sysadmin 固定服务器角色的成员。
您是数据库的 db_owner。
SQL Server 代理正在运行。
注意:
SQL Server CDC 功能仅处理用户创建的表中发生的更改。您不能在 SQL Server 主数据库上启用 CDC。
程序
1.从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。
2.在模板浏览器中,展开 SQL Server 模板。
3.展开更改数据捕获 > 配置,然后单击为 CDC 启用数据库。
4.在模板中,将 USE 语句中的数据库名称替换为要为 CDC 启用的数据库名称。
5.运行存储过程 sys.sp_cdc_enable_db 为 CDC 启用数据库。
为 CDC 启用数据库后,将创建名为 cdc 的模式,以及 CDC 用户、元数据表和其他系统对象。
以下示例显示如何为数据库 TestDB 启用 CDC:
USE TestDB
GO
EXEC sys.sp_cdc_enable_db
GO
在 SQL Server 表上启用 CDC
SQL Server 管理员必须在您希望 Debezium 捕获的源表上启用更改数据捕获。数据库必须已为 CDC 启用。要在表上启用 CDC,SQL Server 管理员为该表运行存储过程 sys.sp_cdc_enable_table。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。必须为要捕获的每个表启用 SQL Server CDC。
先决条件:
CDC 在 SQL Server 数据库上启用。
SQL Server 代理正在运行。
您是数据库的 db_owner 固定数据库角色的成员。
程序
从 SQL Server Management Studio 的“查看”菜单中,单击“模板资源管理器”。
在模板浏览器中,展开 SQL Server 模板。
展开更改数据捕获 > 配置,然后单击启用表指定文件组选项。
在模板中,将 USE 语句中的表名替换为您要捕获的表名。
运行存储过程 sys.sp_cdc_enable_table。
以下示例显示如何为表 mgmg 启用 CDC:
示例:为 SQL Server 表启用 CDC
USE TestDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'mgmg',
@role_name = N'NULL',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
示例
USE MyDB
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@filegroup_name = N'MyDB_CT',
@supports_net_changes = 0
GO
指定要捕获的表的名称。
指定角色 MyRole,您可以向该角色添加要授予对源表的捕获列的 SELECT 权限的用户。具有 sysadmin 或 db_owner 角色的用户还可以访问指定的更改表。将 @role_name 的值设置为 NULL,以仅允许 sysadmin 或 db_owner 中的成员对捕获的信息具有完全访问权限。
指定 SQL Server 为捕获的表放置更改表的文件组。命名的文件组必须已经存在。最好不要将更改表放在用于源表的同一文件组中。
Ps文件组
1. 如何查看数据库中所有的文件组。
语法:sp_helpfilegroup
步骤:
use 数据库
sp_helpfilegroup
2. 如何找到文件组和文件的对应情况.
sp_helpdb love
创建文件组。
语法:
alter database 数据库名 add filegroup 文件组名
步骤:
use 数据库名
alter database 数据库名 add filegroup 文件组名
范例:
use love
alter database TestDB add filegroup 财务部
SQL Server 管理员可以运行系统存储过程来查询数据库或表以检索其 CDC 配置信息。存储过程可以使用 SQL Server Management Studio 或 Transact-SQL 运行。
先决条件
您对捕获实例的所有捕获列具有 SELECT 权限。 db_owner 数据库角色的成员可以查看所有已定义捕获实例的信息。
您拥有为查询包括的表信息定义的任何门控角色的成员资格。
程序
从 SQL Server Management Studio 的“查看”菜单中,单击“对象资源管理器”。
在对象资源管理器中,展开数据库,然后展开您的数据库对象,例如 MyDB。
展开可编程性 > 存储过程 > 系统存储过程。
运行 sys.sp_cdc_help_change_data_capture 存储过程来查询表。
查询不应返回空结果。
以下示例在数据库 TestDB 上运行存储过程 sys.sp_cdc_help_change_data_capture:
示例:查询表以获取 CDC 配置信息
USE TestDB;
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
该查询返回数据库中每个表的配置信息,这些表为 CDC 启用并且包含调用者有权访问的更改数据。如果结果为空,请验证用户是否具有访问捕获实例和 CDC 表的权限。
部署Debezium SQL Server连接器
要部署Debezium SQL Server连接器,请安装Debezium SQL Server连接器存档,配置连接器,并通过将其配置添加到Kafka Connect来启动连接器。
先决条件
安装了Apache ZooKeeper、Apache Kafka和Kafka Connect。
SQL Server已安装,已配置为CDC,并准备与Debezium连接器一起使用。
程序
下载Debezium SQL Server连接器插件存档
Wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.9.7.Final/debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
将文件提取到您的Kafka Connect环境中。
将带有JAR文件的目录添加到Kafka Connect的plugin.path中。
配置连接器并将配置添加到您的Kafka Connect集群中。
重新启动您的Kafka Connect流程以提取新的JAR文件。
解压插件包
tar -xvf debezium-connector-sqlserver-1.9.7.Final-plugin.tar.gz
配置kafka,让kafka知道这个插件.
[root@hhz02 config]# vi connect-distributed.properties
配置注意项:(单机模式启动连接器 connect-standlone.properties)
bootstrap.servers=172.16.120.17:9092
offset.storage.topic=connect-sqlserver-status
offset.storage.replication.factor=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
status.storage.topic=connect-sqlserver-statu
status.storage.replication.factor=1
config.storage.topic=connect-sqlserver-config
config.storage.replication.factor=1
group.id=connect-sqlserver
offset.flush.interval.ms=10000
plugin.path=/opt/debezium-connector-sqlserver
rest.port=8083
Ps:
如果kafka开启kerberos 该配置文件还需要添加:
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
还需要再启动脚本connect-distributed.sh中添加:
export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/opt/dtstack/Kafka/kafka/config/kafka_jaas.conf"
注意:集群模式kafka 需要注意配置分发。
重启kafka集群使配置生效。
启动Kafka connector
/opt/dtstack/DTBase/kafka/bin/connect-distributed.sh -daemon /opt/dtstack/DTBase/kafka/config/connect-sqlserver.properties
注意结尾这个文件 集群模式为:connect-distributed.properties,单机模式为:connect-standalone.properties
(如果启动有问题 观察/opt/app/kafka/logs/connectDistributed.out)
检测Kafka connector是否正常工作
1. 检测kafka连接器的服务状态
[root@node01 logs]# curl -H "Accept:application/json" node01:8083/
{"version":"1.1.1","commit":"8e07427ffb493498","kafka_cluster_id":"svXhhAFkSt6xLRPTBIdE1Q"}
2. 检查向 Kafka Connect 注册的连接器列表
[root@hdp01 kafka]# curl -H "Accept:application/json" node01:8083/connectors/
[]
返回空列表, 表示目前还没有注册的连接器
附:删除命令
curl -X DELETE node01:8083/connectors/db2-connector1
SQL Server 连接器配置示例
以下是连接器实例的配置示例,该实例在 172.16.120.17 的端口 1433 上从 SQL Server 服务器捕获数据,我们在逻辑上将其命名为 fullfillment。通常,您通过设置可用于连接器的配置属性,在 JSON 文件中配置 Debezium SQL Server 连接器。
注册连接器
Kafka Connect 服务的 API 提交POST针对/connectors资源的请求,其中包含描述新连接器(称为inventory-connector)的 JSON 文档。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" node01:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.hostname": "172.16.120.17",
"database.port": "1433",
"database.user": "SA",
"database.password": "Dtstack@123",
"database.dbname": "TestDB",
"database.server.name": "fullfillment",
"table.include.list": "dbo.mgmg",
"database.history.kafka.bootstrap.servers": "node01:9092",
"database.history.kafka.topic": "dbhistory.fullfillment"
}
}'
(如果注册有问题 观察/opt/app/kafka/logs/connectDistributed.out)
多看下日志 日志中都可以 写到 如果注册成功,也没有kafka数据 麻烦看下日志WARN中过滤问题。[sourceTableId=DB2INST1.TESTTB, changeTableId=ASNCDC.CDC_DB2INST1_TESTTB
]
查看kafka中的数据
查看所有topic:/opt/dtstack/DTBase/kafka/bin/kafka-topics.sh -list -zookeeper localhost:2181/kafka
消费topic:/opt/dtstack/DTBase/kafka/bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic fullfillment.dbo.mgmg