Flume抽取到kafka数据对比测试

耀灵2年前技术文章509

一、前言

同一台机器两个flume进程抽取同一个目录下日志到kafka,对比kafka中数据量

二、创建测试topic

1、主集群创建topic --tes3

kafka-topics --create --zookeeper 10.129.16.2:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes3

2、备集群创建topic--tes4

kafka-topics --create --zookeeper 10.129.16.32:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes4


三、创建flume配置文件

1、修改to_kafka_master.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_master.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c2
a1.sources=s2
a1.sinks=k2

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes3
a1.sinks.k2.kafka.flumeBatchSize = 20

a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

2、修改to_kafka_slave.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_slave.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c1
a1.sources=s1
a1.sinks=k1

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s1.batchSize  = 100
#默认为3000ms
a1.sources.s1.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s1.fileHeader = true


a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
#a1.sinks.r.metadata.broker.list=61.49.60.102:9092,61.49.60.103:9092,61.49.60.104:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes4
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1

3、启动指定to_kafka_master.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf -Dflume.root.logger=INFO,console  &

4、启动指定to_kafka_slave.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/newflume/apache-flume-1.9.0-bin/conf/ -f /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf  -Dflume.root.logger=INFO,console &

5、如果配置prometheus+grafana监控,启动命令可以加如下参数

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

nohup flume-ng agent --conf conf --conf-file /root/flume-log-collect/yt-log-kafka-2-hdfs.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 --name a2 -Dflume.root.logger=INFO,console &

6、推到master kafka数据量统计

[root@hzyg-ip-bd06 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1
yt-log-collect-tes3:0:5115802
yt-log-collect-tes3:1:5115802
yt-log-collect-tes3:2:5115802

1.png

7、推到slave kafka数据量统计

[root@hzyg-ip-bd32 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1
yt-log-collect-tes4:0:5161642
yt-log-collect-tes4:1:5161642
yt-log-collect-tes4:2:5161643

2.png

四、数据存在偏差

3.png

五、两个配置文件合成一个,测试结果


配置文件如下:

#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &

a1.sources=s1 s2
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true
#设置Memory Channel

#slave
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

##发送到kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes8
a1.sinks.k1.kafka.flumeBatchSize = 100
#a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes7
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

4.png



相关文章

ubuntu20.04服务器安全策略设定

ubuntu20.04服务器安全策略设定

密码策略1、经核查,服务器用户身份标识唯一,口令存储在服务器中采用SHA512算法,服务器配置口令复杂度,口令要求8位以上,字母、数字、特殊字符组成,口令180天定期更换。# SHA512算法查看ca...

企业Oracle RAC上云闲谈

企业Oracle RAC上云闲谈

随着计算机技术和互联网的不断推进,云计算平台也更加趋于稳定、安全,其显著的性能、方便的资源管理、快捷的应用部署方式越来越为IT业者所接受。目前,云计算已经成为企业数字化转型的重要驱动力。面对基于Ora...

MySQL运维实战(4.2) 关于SQL_MODE

早期mysql对一些不符合SQL标准的SQL语句和数据的容忍度比较高。mysql 5.7 修改了默认sql mode。系统从低版本升级或迁移到高版本时,需要经过全面的测试,避免影响程序的正常运行。5....

Go 链表的实现

Go 链表的实现

链表是一种物理存储单元上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接次序实现的。链表由一系列结点(链表中每一个元素称为结点)组成,结点可以在运行时动态生成。每个结点包括两个部分:...

python-日志分析

1、概述生产中会生成大量的系统日志、应用程序日志、安全日志等等日志,通过对日志的分析可以了解服务器的负载、健康状况,可以分析客户的分布情况、客户的行为,甚至基于这些分析可以做出预测。一般采集流程:日志...

MySQL运维实战之备份和恢复(8.8)恢复单表

xtrabackup支持单表恢复。如果一个表使用了独立表空间(innodb_file_per_table=1),就可以单独恢复这个表。1、Prepareprepare时带上参数--export,xtr...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。