Flume抽取到kafka数据对比测试

耀灵2年前技术文章515

一、前言

同一台机器两个flume进程抽取同一个目录下日志到kafka,对比kafka中数据量

二、创建测试topic

1、主集群创建topic --tes3

kafka-topics --create --zookeeper 10.129.16.2:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes3

2、备集群创建topic--tes4

kafka-topics --create --zookeeper 10.129.16.32:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes4


三、创建flume配置文件

1、修改to_kafka_master.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_master.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c2
a1.sources=s2
a1.sinks=k2

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes3
a1.sinks.k2.kafka.flumeBatchSize = 20

a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

2、修改to_kafka_slave.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_slave.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c1
a1.sources=s1
a1.sinks=k1

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s1.batchSize  = 100
#默认为3000ms
a1.sources.s1.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s1.fileHeader = true


a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
#a1.sinks.r.metadata.broker.list=61.49.60.102:9092,61.49.60.103:9092,61.49.60.104:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes4
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1

3、启动指定to_kafka_master.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf -Dflume.root.logger=INFO,console  &

4、启动指定to_kafka_slave.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/newflume/apache-flume-1.9.0-bin/conf/ -f /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf  -Dflume.root.logger=INFO,console &

5、如果配置prometheus+grafana监控,启动命令可以加如下参数

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

nohup flume-ng agent --conf conf --conf-file /root/flume-log-collect/yt-log-kafka-2-hdfs.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 --name a2 -Dflume.root.logger=INFO,console &

6、推到master kafka数据量统计

[root@hzyg-ip-bd06 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1
yt-log-collect-tes3:0:5115802
yt-log-collect-tes3:1:5115802
yt-log-collect-tes3:2:5115802

1.png

7、推到slave kafka数据量统计

[root@hzyg-ip-bd32 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1
yt-log-collect-tes4:0:5161642
yt-log-collect-tes4:1:5161642
yt-log-collect-tes4:2:5161643

2.png

四、数据存在偏差

3.png

五、两个配置文件合成一个,测试结果


配置文件如下:

#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &

a1.sources=s1 s2
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true
#设置Memory Channel

#slave
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

##发送到kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes8
a1.sinks.k1.kafka.flumeBatchSize = 100
#a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes7
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

4.png



相关文章

使用helm在k8s集群部署rancher

使用helm在k8s集群部署rancher由于我们的k8s版本是1.22,所以我们直接安装latest版本的rancher。不同版本的rancher helm仓库可以看下面链接https://docs...

MySQL系统空间上涨

MySQL系统空间上涨

系统空间上涨一、介绍MySQL 的 ibdata1 是一个用来构建 innodb 系统表空间的文件。该文件包含了 undo ,还包含在用户在系统表空间创建的表信息和索引数据。一般如果数据空间均为独立表...

在kubernetes中,让某个node成为专属节点

如何让node 去”选择”只有谁(pod)能部署到自身上面?看了下现有的Node Selectors、Node Affinity、Node Taints, 经过比对,发现Node Taints 更适合...

apache Hbase2.x  使用hbck2修复工具

apache Hbase2.x 使用hbck2修复工具

1、背景默认情况下apache hbase 使用hbck2时,无法使用-j 来加载hbck2的jar包,无法进行修复2、解决办法是由于默认情况下只使用自带的hbase hbck修复命令,大部分功能在2...

HBase HBCK运维指南

HBase HBCK运维指南

HBase HBCK是HBase运维人员经常会用到的一个HBase运维工具,主要是用于检查 HBase region等元数据一致性以及修复的工具。目前HBCK工具有两个版本,本次主要介绍用于HBase...

Dockerfile

Dockerfile

一、什么是镜像?镜像可以看成是由多个镜像层叠加起来的一个文件系统(通过UnionFS与AUFS文件联合系统实现),镜像层也可以简单理解为一个基本的镜像,而每个镜像层之间通过指针的形式进行叠加。根据上图...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。