Flume抽取到kafka数据对比测试

耀灵1年前技术文章438

一、前言

同一台机器两个flume进程抽取同一个目录下日志到kafka,对比kafka中数据量

二、创建测试topic

1、主集群创建topic --tes3

kafka-topics --create --zookeeper 10.129.16.2:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes3

2、备集群创建topic--tes4

kafka-topics --create --zookeeper 10.129.16.32:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes4


三、创建flume配置文件

1、修改to_kafka_master.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_master.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c2
a1.sources=s2
a1.sinks=k2

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes3
a1.sinks.k2.kafka.flumeBatchSize = 20

a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

2、修改to_kafka_slave.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_slave.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c1
a1.sources=s1
a1.sinks=k1

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s1.batchSize  = 100
#默认为3000ms
a1.sources.s1.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s1.fileHeader = true


a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
#a1.sinks.r.metadata.broker.list=61.49.60.102:9092,61.49.60.103:9092,61.49.60.104:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes4
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1

3、启动指定to_kafka_master.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf -Dflume.root.logger=INFO,console  &

4、启动指定to_kafka_slave.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/newflume/apache-flume-1.9.0-bin/conf/ -f /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf  -Dflume.root.logger=INFO,console &

5、如果配置prometheus+grafana监控,启动命令可以加如下参数

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

nohup flume-ng agent --conf conf --conf-file /root/flume-log-collect/yt-log-kafka-2-hdfs.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 --name a2 -Dflume.root.logger=INFO,console &

6、推到master kafka数据量统计

[root@hzyg-ip-bd06 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1
yt-log-collect-tes3:0:5115802
yt-log-collect-tes3:1:5115802
yt-log-collect-tes3:2:5115802

1.png

7、推到slave kafka数据量统计

[root@hzyg-ip-bd32 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1
yt-log-collect-tes4:0:5161642
yt-log-collect-tes4:1:5161642
yt-log-collect-tes4:2:5161643

2.png

四、数据存在偏差

3.png

五、两个配置文件合成一个,测试结果


配置文件如下:

#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &

a1.sources=s1 s2
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true
#设置Memory Channel

#slave
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

##发送到kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes8
a1.sinks.k1.kafka.flumeBatchSize = 100
#a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes7
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

4.png



相关文章

证书过期问题之IPV6协议

证书过期问题之IPV6协议

1、首先了解情况:客户的SSL/TLS证书要过期了,所以进行了证书替换工作,但是替换之后,有一部分客户端还是提示证书已过期,如图所示:2、看这个报错是非常的清晰的,就是证书过期的问题,但是为什么同事没...

使用clickhouse-backup备份和恢复数据

使用clickhouse-backup备份和恢复数据

介绍clickhouse-backup是altinity提供的一个clickhouse数据库备份和恢复的工具,开源项目地址:https://github.com/Altinity/clickhouse...

MySQL性能优化(九)range和ref

MySQL性能优化(九)range和ref

有的时候,我们会遇到这样的情况:明明有索引,明明有更好的执行计划,但是优化器并没有选择这个最优的执行计划。优化器可能会选择并非最优的索引,可能选择并非最优的数据访问方式。下面是一个真实的例子:一个例子...

Scheduler调度器

一、论 Pod 调度在 kubernetes 中,无论是 Deployment、Statefulset 等多种控制器,它最终都是创建 Pod,在 Pod 创建是需要被调度到 Kubernetes 集群...

Mysql删除binlog

binlog 是记录所有数据库表结构变更(例如CREATE、ALTER TABLE…)以及表数据修改(INSERT、UPDATE、DELETE…)的二进制日志。一、手动删除直接在 /var/lib/m...

Spark 对接 Alluxio

Spark 对接 Alluxio

1、概览        Spark 1.1 或更高版本的 Spark可以通过其与 HDFS 兼容的接口直接访问 Alluxio 集群。 使用 Alluxio 作为数据访问层,Spark 应用程序可以透...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。