Flume抽取到kafka数据对比测试

耀灵5个月前技术文章106

一、前言

同一台机器两个flume进程抽取同一个目录下日志到kafka,对比kafka中数据量

二、创建测试topic

1、主集群创建topic --tes3

kafka-topics --create --zookeeper 10.129.16.2:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes3

2、备集群创建topic--tes4

kafka-topics --create --zookeeper 10.129.16.32:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes4


三、创建flume配置文件

1、修改to_kafka_master.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_master.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c2
a1.sources=s2
a1.sinks=k2

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes3
a1.sinks.k2.kafka.flumeBatchSize = 20

a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

2、修改to_kafka_slave.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_slave.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c1
a1.sources=s1
a1.sinks=k1

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s1.batchSize  = 100
#默认为3000ms
a1.sources.s1.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s1.fileHeader = true


a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
#a1.sinks.r.metadata.broker.list=61.49.60.102:9092,61.49.60.103:9092,61.49.60.104:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes4
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1

3、启动指定to_kafka_master.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf -Dflume.root.logger=INFO,console  &

4、启动指定to_kafka_slave.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/newflume/apache-flume-1.9.0-bin/conf/ -f /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf  -Dflume.root.logger=INFO,console &

5、如果配置prometheus+grafana监控,启动命令可以加如下参数

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

nohup flume-ng agent --conf conf --conf-file /root/flume-log-collect/yt-log-kafka-2-hdfs.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 --name a2 -Dflume.root.logger=INFO,console &

6、推到master kafka数据量统计

[root@hzyg-ip-bd06 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1
yt-log-collect-tes3:0:5115802
yt-log-collect-tes3:1:5115802
yt-log-collect-tes3:2:5115802

1.png

7、推到slave kafka数据量统计

[root@hzyg-ip-bd32 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1
yt-log-collect-tes4:0:5161642
yt-log-collect-tes4:1:5161642
yt-log-collect-tes4:2:5161643

2.png

四、数据存在偏差

3.png

五、两个配置文件合成一个,测试结果


配置文件如下:

#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &

a1.sources=s1 s2
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true
#设置Memory Channel

#slave
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

##发送到kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes8
a1.sinks.k1.kafka.flumeBatchSize = 100
#a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes7
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

4.png



相关文章

Kafka数据恢复

一、增量恢复增量恢复需要使用 MirrorMaker 来实现,下面是 MirrorMaker 的用法示例:# 创建MirrorMaker 配置文件cat > /tmp/mirror-maker....

Mac安装Hadoop文档-保姆级操作(一)

Mac安装Hadoop文档-保姆级操作(一)

首先配置ssh环境在Mac下如果想使用Hadoop,必须要配置ssh环境, 如果不执行这一步,后面启动hadoop时会出现Connection refused连接被拒绝的错误。首先终端命令框输入:ss...

谈谈K8S Pod Eviction 机制

Pod Eviction 简介Pod Eviction 是k8s一个特色功能,它在某些场景下应用,如节点NotReady、Node节点资源不足,把pod驱逐至其它Node节点。从发起模块的角度,pod...

CDH实操--集群ip替换

CDH实操--集群ip替换

1 背景恰逢机房迁移,自建CDH集群需要调整ip网段。。。2 操作步骤2.1 停止CDH集群2.1.1 控制台停止集群服务2.1.2 控制台停止Cloudera Management Ser...

MySQL 创建索引报错

创建索引报错添加索引发现报错,具体报错如下:create unique index sm_sample_clothing_skc_SkcUniqueKey_uindex on sm_sample_cl...

MySQL 切换主备(三)

MySQL 切换主备(三)

三、切换主备:3.1、确认主库角色查看 vip 状态,目前在主库上面。3.2、确认备库角色此时备库read_only=1只读不写查看数据库读写状态:show global variables like...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。