Kafka指标性能分析

南墨2年前技术文章1345

1)      根据group组查看消费情况(数值不准,仅可参考)

根据之前的分析,只有driverconsumer会提交offset,因此通过

./kafka-consumer-groups.sh命令能够查看对应分区的消费情况,如下图:

1.png

对应时间段的批次执行时间。

2.png

这样能够看出,这个时间段消费的数据是从哪个offset到哪个offset的数据段,可以根据这个时间看出是否有数据挤压。

注意:

1, 只有在consumer的配置文件中设置了enable.auto.commit=true才能使用./kafka-consumer-groups.sh命令查看,spark内部并没有自动提交这个offset的机制。

2, LAG的数值在这里是不准确的,因为使用自动提交后有提交时间间隔,因此存在时间不同步的情况。LAG值仅可作为参考。

2)      抽样最新的数据

可以使用新的消费命令查看,最新的数据的offset是哪个,如下图:

./kafka-verifiable-consumer.sh  --topic testsparktopic --group-id test_$(date +%s)

 --verbose --consumer.config ../config/consumer.properties --broker-list kafka业务ip:21007  --reset-policy latest --max-messages 100

其中核心参数含义如下:

--topic 为要抽样的topic名称

--group-id 本次消费的groupid,要求每次获取最新的offset需要使用最新的groupID

--verbose  是否打印出具体的数据

--consumer.config 消费者的配置文件路径

--reset-policy 消费offset得到策略

--max-messages 消费多少条数据,若指定为-1则会一直消费直到手动停止消费任务

执行后消费情况如下:

3.png

核心的数据结构如下:

    Timestamp:为时间戳,默认是数据发送时间。

    Key:数据的key值,若不设置则为null

    Value:数据的值。

Topictopic名称。

Partition:消费数据所属的分区编号。

Offset:消费数据的偏移量。

 

说明:由于每次需要获取的offset一定是最新的,因此,每次消费的时候必须使用最新的groupID

3)      消费指定位置的数据,并且查看时间消息的生产时间

当排查问题涉及到数据延时的时候,需要分析一下当时数据产生的时间是什么时候,这个时候需要分析一下数据生产的时间和消费批次时间之间的对应关系。

假设需要定位:3分区,1000~2000之间的数据信息。

(1)     消费100条数据,设置group组为testgroup,让消费者组生效

./kafka-verifiable-consumer.sh  --topic testsparktopic  --broker-list 189.39.234.219:21007 --verbose --consumer.config ../config/consumer.properties --max-messages 100 --group-id testgroup

4.png

(2)     使用kafka-consumer-group命令看一下对应的消费状况

5.png

(3)     重置offset,将组内的所有offset重置到1000

 ./kafka-consumer-groups.sh --bootstrap-server 189.39.234.219:21007 --new-consumer --to-offset 1000 --reset-offsets --group testgroup --command-config ../config/consumer.properties --all-topics

 6.png

(4)     再消费2000条数据

 ./kafka-verifiable-consumer.sh  --topic testsparktopic  --broker-list 189.39.234.219:21007 --verbose --consumer.config ../config/consumer.properties --max-messages 2000 --group-id testgroup

6.png

此处可以看到消费结果。

(5)     查找offset=2000的时间戳为1585316904936,这个时间即为这条数据开始发送的时间。

7.png

 


相关文章

开源大数据集群部署(九)Ranger审计日志集成(solr)

开源大数据集群部署(九)Ranger审计日志集成(solr)

1、下载solr安装包并解压包tar -xzvf solr-8.11.2.gzcd solr-8.11.2执行安装脚本./bin/install_solr_service.sh /opt/solr-8...

kafka节点数规划

按磁盘容量规划节点数Kafka的数据存放在本地磁盘,建议使用SAS盘,提供较高磁盘IO,以提高Kafka吞吐量。在本规划基于的硬件规格下,单节点平均吞吐量参考值为读300MB/s,写150MB/s。数...

mysql双主更改为主从架构分析

mysql双主更改为主从架构分析

客户需求客户业务运行的在mysql双主架构上,因为客户经常误操作触发双写,导致数据不一致,对业务的稳定运行造成加大的影响。客户现有数据库架构图解决方案基于客户业务和底层数据库架构实际情况,云掣科技提供...

数据湖技术之iceberg(八)Spark与Iceberg整合DDL操作

数据湖技术之iceberg(八)Spark与Iceberg整合DDL操作

1.CREATE TABLE 创建表Create table 创建Iceberg表,创建表不仅可以创建普通表还可以创建分区表,再向分区表中插入一批数据时,必须对数据中分区列进行排序,否则会出现文件关闭...

网络策略NetworkPolicy

网络策略NetworkPolicy

目的:为了实现细粒度的容器间网络访问隔离策略。引用:1.3版本NetworkPolicy机制 -> 1.8版本networking.k8s.io/v1稳定版本功能:对pod、ns之间网络通信限制...

Ubuntu 网卡启动及配置

Ubuntu 网卡启动及配置

问题分析打开虚拟机后发现没有网卡网络。查看网卡信息sudo ip link set ens33 up1得到本机的所有网卡信息,例如我这边网卡为ens33启动网卡启动网卡后发现依然网卡没有IP地址。配置...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。