Flume抽取到kafka数据对比测试

耀灵2年前技术文章613

一、前言

同一台机器两个flume进程抽取同一个目录下日志到kafka,对比kafka中数据量

二、创建测试topic

1、主集群创建topic --tes3

kafka-topics --create --zookeeper 10.129.16.2:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes3

2、备集群创建topic--tes4

kafka-topics --create --zookeeper 10.129.16.32:2181 --replication-factor 3 --partitions 3 --topic yt-log-collect-tes4


三、创建flume配置文件

1、修改to_kafka_master.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_master.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c2
a1.sources=s2
a1.sinks=k2

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes3
a1.sinks.k2.kafka.flumeBatchSize = 20

a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

2、修改to_kafka_slave.conf配置文件

[root@hzyg-ip-bd07 conf]# vim to_kafka_slave.conf 
#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &
a1.channels=c1
a1.sources=s1
a1.sinks=k1

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s1.batchSize  = 100
#默认为3000ms
a1.sources.s1.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s1.fileHeader = true


a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
#a1.sinks.r.metadata.broker.list=61.49.60.102:9092,61.49.60.103:9092,61.49.60.104:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes4
a1.sinks.k1.kafka.flumeBatchSize = 100
a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1

3、启动指定to_kafka_master.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/flume/apache-flume-1.9.0-bin/conf -f /root/flume/apache-flume-1.9.0-bin/conf/to_kafka_master.conf -Dflume.root.logger=INFO,console  &

4、启动指定to_kafka_slave.conf配置文件的flume进程

nohup flume-ng agent -n a1 -c /root/newflume/apache-flume-1.9.0-bin/conf/ -f /root/newflume/apache-flume-1.9.0-bin/conf/to_kafka_slave.conf  -Dflume.root.logger=INFO,console &

5、如果配置prometheus+grafana监控,启动命令可以加如下参数

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

nohup flume-ng agent --conf conf --conf-file /root/flume-log-collect/yt-log-kafka-2-hdfs.conf -Dflume.monitoring.type=http -Dflume.monitoring.port=34545 --name a2 -Dflume.root.logger=INFO,console &

6、推到master kafka数据量统计

[root@hzyg-ip-bd06 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.28:9092 --topic yt-log-collect-tes3 --time -1
yt-log-collect-tes3:0:5115802
yt-log-collect-tes3:1:5115802
yt-log-collect-tes3:2:5115802

1.png

7、推到slave kafka数据量统计

[root@hzyg-ip-bd32 ~]# kafka-run-class kafka.tools.GetOffsetShell --broker-list 10.129.16.40:9092 --topic yt-log-collect-tes4 --time -1
yt-log-collect-tes4:0:5161642
yt-log-collect-tes4:1:5161642
yt-log-collect-tes4:2:5161643

2.png

四、数据存在偏差

3.png

五、两个配置文件合成一个,测试结果


配置文件如下:

#flume-ng agent --conf conf --conf-file /root/flume-conf/to_kafka.conf --name a1 -Dflume.root.logger=INFO,console
#nohup ./flume-ng agent -n a1 -c /flume/apache-flume-1.9.0-bin/conf -f /flume/apache-flume-1.9.0-bin/conf/to_kafka.conf -Dflume.root.logger=INFO,console  &

a1.sources=s1 s2
a1.sinks=k1 k2
a1.channels=c1 c2

a1.sources.s1.type=TAILDIR
a1.sources.s1.positionFile =/tmp/newreport/taildir_new_position.json
a1.sources.s1.filegroups =f1 f2
a1.sources.s1.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s1.filegroups.f2 =/tmp/newreport/log_.*._test

a1.sources.s2.type=TAILDIR
a1.sources.s2.positionFile =/tmp/newreport/taildir_position.json
a1.sources.s2.filegroups =f1 f2
a1.sources.s2.filegroups.f1 =/tmp/newreport/log_.*._v2
a1.sources.s2.filegroups.f2 =/tmp/newreport/log_.*._test
#如果日志流量很大,适当增加此值可以增大Source的吞吐量,但是不能超过Channel的capacity和transactionCapacity的限制
a1.sources.s2.batchSize  = 100
#默认为3000ms
a1.sources.s2.writePosInterval  = 1000
#a1.sources.s2.maxBackoffSleep  = 5000
a1.sources.s2.fileHeader = true
#设置Memory Channel

#slave
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000  ##channel中最多缓存多少
a1.channels.c1.transactionCapacity=5000  #channel一次最多吐给sink多少
a1.channels.c1.keep-alive = 50


a1.channels.c2.type=memory
a1.channels.c2.capacity=5000  ##channel中最多缓存多少
a1.channels.c2.transactionCapacity=2000  #channel一次最多吐给sink多少
a1.channels.c2.keep-alive = 10

##发送到kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=hzyg-ip-bd40:9092,hzyg-ip-bd41:9092,hzyg-ip-bd42:9092
a1.sinks.k1.kafka.topic=yt-log-collect-tes8
a1.sinks.k1.kafka.flumeBatchSize = 100
#a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.bootstrap.servers=hzyg-ip-bd28:9092,hzyg-ip-bd29:9092,hzyg-ip-bd30:9092
a1.sinks.k2.kafka.topic=yt-log-collect-tes7
a1.sinks.k2.kafka.flumeBatchSize = 20
a1.sinks.k1.channel=c1
a1.sources.s1.channels=c1
a1.sinks.k2.channel=c2
a1.sources.s2.channels=c2

4.png



相关文章

EMR-java配置国密SM4加密

EMR-java配置国密SM4加密

首先找到bcprov-jdk15on-1.56.jar这个包<dependency>    <groupId>org.bouncycastle</groupId> ...

ES架构模型

ES架构模型

1.整体架构Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎。它允许您快速、近实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索功能和需求的应用程序的底层引擎/技术。Elas...

DG概念与机制

1. 相关概念1.1 什么是DG  DG全称Data Guard,官方给出的定义是“Oracle Data Guard ensures high availability, data protecti...

开源大数据集群部署(二十)Trino部署

开源大数据集群部署(二十)Trino部署

2.9.1 解压trino的包到opt目录cd /root/bigdata tar -xzvf trino-server-389.tar.gz -C /opt/ ln -s /opt/trino-...

 大数据集群监控配置操作指导(二)node_exporter+mysql_exporter部署

大数据集群监控配置操作指导(二)node_exporter+mysql_exporter部署

2.node_exporter监控集群服务器(所有集群服务器)wget https://github.com/prometheus/node_exporter/releases/download/v1...

Dockerfile

Dockerfile

一、什么是镜像?镜像可以看成是由多个镜像层叠加起来的一个文件系统(通过UnionFS与AUFS文件联合系统实现),镜像层也可以简单理解为一个基本的镜像,而每个镜像层之间通过指针的形式进行叠加。根据上图...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。