Flume-to-Kafka Data Comparison Test
一、Overview
Two Flume processes on the same machine tail logs from the same directory and push them to Kafka; the message counts that land in Kafka are then compared.
二、Create the test topics
1、Create the topic on the master cluster (tes3)
kafka-topics --create --zookeeper 10.129.16.2:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes3
2、Create the topic on the standby (slave) cluster (tes4)
kafka-topics --create --zookeeper 10.129.16.32:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes4
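To confirm that both topics above were created with the expected partition and replica counts, a quick check can be run against each cluster's ZooKeeper (a minimal sketch using the standard --describe option of the kafka-topics tool):
kafka-topics --describe --zookeeper 10.129.16.2:2181 --topic yt-log-collect-tes3
kafka-topics --describe --zookeeper 10.129.16.32:2181 --topic yt-log-collect-tes4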
三、Create the Flume configuration files
1、Edit the to_kafka_master.conf configuration file
[root@hzyg-ip-bd07 conf]# vim to_kafka_master.conf
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console &
a1.channels=c2
a1.sources=s2
a1.sinks=k2
a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile=/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups=f1 f2
a1.sources.s2.filegroups.f1=/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2=/tmp/newreport/log_.*._test
# If log traffic is heavy, increasing this value raises the source throughput, but it must not exceed the channel's capacity and transactionCapacity limits
a1.sources.s2.batchSize = 100
# Default is 3000 ms
a1.sources.s2.writePosInterval = 1000
#a1.sources.s2.maxBackoffSleep = 5000
a1.sources.s2.fileHeader = true
a1.channels.c2.type=memory
# Maximum number of events the channel can buffer
a1.channels.c2.capacity=5000
# Maximum number of events the channel hands to the sink in one transaction
a1.channels.c2.transactionCapacity=2000
a1.channels.c2.keep-alive = 10
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes3
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2
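Before starting the agent, it can be worth confirming that the two filegroup patterns actually match files under /tmp/newreport (a minimal sketch; TAILDIR interprets the file-name part of each filegroup value as a regular expression):
ls /tmp/newreport/ | grep -E 'log_.*._v2|log_.*._test'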
2、Edit the to_kafka_slave.conf configuration file
[root@hzyg-ip-bd07 conf]# vim to_kafka_slave.conf
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console &
a1.channels=c1
a1.sources=s1
a1.sinks=k1
a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile=/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups=f1 f2
a1.sources.s1.filegroups.f1=/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2=/tmp/newreport/log_.*._test
# If log traffic is heavy, increasing this value raises the source throughput, but it must not exceed the channel's capacity and transactionCapacity limits
a1.sources.s1.batchSize = 100
# Default is 3000 ms
a1.sources.s1.writePosInterval = 1000
#a1.sources.s2.maxBackoffSleep = 5000
a1.sources.s1.fileHeader = true
a1.channels.c1.type=memory
# Maximum number of events the channel can buffer
a1.channels.c1.capacity=10000
# Maximum number of events the channel hands to the sink in one transaction
a1.channels.c1.transactionCapacity=5000
a1.channels.c1.keep-alive = 50
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
#a1.sinks.r.metadata.broker.list=61.49.60.102:9092,61.49.60.103:9092,61.49.60.104:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes4
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
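Because both agents are meant to tail the same files, only the knobs that are intentionally different (position file, channel sizing, batch sizes, acks, target cluster and topic) should show up when the two files are compared (a minimal sketch using the paths from the start commands below):
diff /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf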
3、Start the Flume process that uses the to_kafka_master.conf configuration file
nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf -Dflume.root.logger=INFO,console &
4、Start the Flume process that uses the to_kafka_slave.conf configuration file
nohup flume-ng agent -n a1 -c /root/newflume/apache-flume-1.9.0-bin/conf/ -f /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf -Dflume.root.logger=INFO,console &
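A quick way to confirm that both agents are up and that each picked up the intended configuration file (a minimal sketch):
ps -ef | grep -E 'to_kafka_(master|slave)\.conf' | grep -v grep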
5、If Prometheus + Grafana monitoring is configured, the following parameters can be added to the startup command
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
nohup flume-ng agent --conf conf --conf-file /root/flume-log-collect/yt-log-kafka-2-hdfs.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 --name a2 -Dflume.root.logger=INFO,console &
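With HTTP monitoring enabled, the agent serves its source/channel/sink counters as JSON at the /metrics path, so they can be checked directly on the host where the agent listens on port 34545 (a minimal sketch):
curl http://localhost:34545/metrics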
6、Message count pushed to the master Kafka cluster
[root@hzyg-ip-bd06 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1
yt-log-collect-tes3:0:5115802
yt-log-collect-tes3:1:5115802
yt-log-collect-tes3:2:5115802
7、Message count pushed to the slave Kafka cluster
[root@hzyg-ip-bd32 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1
yt-log-collect-tes4:0:5161642
yt-log-collect-tes4:1:5161642
yt-log-collect-tes4:2:5161643
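Since GetOffsetShell prints one topic:partition:offset line per partition, the per-cluster totals can be summed directly, which makes the comparison in the next section easier (a minimal sketch):
kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1 | awk -F: '{sum += $3} END {print sum}'
kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1 | awk -F: '{sum += $3} END {print sum}'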
四、The data shows a discrepancy: the master topic holds 3 × 5,115,802 = 15,347,406 messages in total, while the slave topic holds 15,484,927, a difference of 137,521, so the two independent agents did not deliver exactly the same volume.
五、Combining the two configuration files into one and re-testing
The configuration file is as follows:
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console &
a1.sources=s1 s2
a1.sinks=k1 k2
a1.channels=c1 c2
a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile=/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups=f1 f2
a1.sources.s1.filegroups.f1=/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2=/tmp/newreport/log_.*._test
a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile=/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups=f1 f2
a1.sources.s2.filegroups.f1=/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2=/tmp/newreport/log_.*._test
# If log traffic is heavy, increasing this value raises the source throughput, but it must not exceed the channel's capacity and transactionCapacity limits
a1.sources.s2.batchSize = 100
# Default is 3000 ms
a1.sources.s2.writePosInterval = 1000
#a1.sources.s2.maxBackoffSleep = 5000
a1.sources.s2.fileHeader = true
# Memory channel settings
# slave
a1.channels.c1.type=memory
# Maximum number of events the channel can buffer
a1.channels.c1.capacity=10000
# Maximum number of events the channel hands to the sink in one transaction
a1.channels.c1.transactionCapacity=5000
a1.channels.c1.keep-alive = 50
a1.channels.c2.type=memory
# Maximum number of events the channel can buffer
a1.channels.c2.capacity=5000
# Maximum number of events the channel hands to the sink in one transaction
a1.channels.c2.transactionCapacity=2000
a1.channels.c2.keep-alive = 10
## Send to Kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes8
a1.sinks.k1.kafka.flumeBatchSize = 100
#a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes7
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2
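Assuming the merged configuration is saved as, say, to_kafka_merged.conf (the file name is only illustrative), a single agent can then be started the same way as before:
# to_kafka_merged.conf is a placeholder name; substitute the actual file name of the merged config
nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_merged.conf -Dflume.root.logger=INFO,console &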