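Flume use case: passing data between Flume agents, aggregating data from multiple Flume agents into a single agent
Goal: flume11 monitors the file hive.log, flume22 monitors the data stream on a port, flume11 and flume22 both send their data to flume33, and flume33 writes the final data to HDFS.
Step-by-step implementation: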
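1. Create flume11.conf, which monitors the log file (the goal names hive.log; the configuration in this step tails /opt/Andy) and sinks the data to flume33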
# 1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 2 source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Andy
a1.sources.r1.shell = /bin/bash -c

# 3 sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = dtstack_hdfs
a1.sinks.k1.port = 4141

# 4 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Create flume22.conf, which monitors the data stream on port 44444 and sinks the data to flume33
# 1 agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# 2 source
a2.sources.r1.type = netcat
a2.sources.r1.bind = dtstack_hdfs
a2.sources.r1.port = 44444

# 3 sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = dtstack_hdfs
a2.sinks.k1.port = 4141

# 4 channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# 5 bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
3. Create flume33.conf, which receives the data streams sent by flume11 and flume22, merges them, and sinks the result to HDFS
# 1 agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# 2 source
a3.sources.r1.type = avro
a3.sources.r1.bind = dtstack_hdfs
a3.sources.r1.port = 4141

# 3 sink
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://dtstack_hdfs:9000/flume3/%H
# prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = flume3-
# whether to roll directories based on time
a3.sinks.k1.hdfs.round = true
# how many time units pass before a new directory is created
a3.sinks.k1.hdfs.roundValue = 1
# the time unit used for rounding
a3.sinks.k1.hdfs.roundUnit = hour
# whether to use the local timestamp
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# number of events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# file type; compression is supported
a3.sinks.k1.hdfs.fileType = DataStream
# how often a new file is created (seconds)
a3.sinks.k1.hdfs.rollInterval = 600
# roll size of each file, roughly 128 MB (bytes)
a3.sinks.k1.hdfs.rollSize = 134217700
# 0 means file rolling does not depend on the number of events
a3.sinks.k1.hdfs.rollCount = 0
# minimum number of block replicas
a3.sinks.k1.hdfs.minBlockReplicas = 1

# 4 channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# 5 bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
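The %H escape in hdfs.path resolves to the two-digit hour of the event timestamp, so with useLocalTimeStamp = true the output lands in one directory per hour. The following is a minimal sketch for locating the current hour's directory. It assumes this shell and the flume33 host share the same clock and time zone, and that an hdfs client may or may not be installed locally.

```shell
# Directory that %H resolves to right now (hour of day, 00 to 23).
base="hdfs://dtstack_hdfs:9000/flume3"
hour_dir="$base/$(date +%H)"
echo "$hour_dir"

# List the files only if the hdfs client is installed on this machine.
if command -v hdfs >/dev/null 2>&1; then
    hdfs dfs -ls "$hour_dir" || echo "nothing written for this hour yet"
fi
```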
4. Run the test: start the corresponding Flume jobs (in the order flume33, flume22, flume11), then generate file changes and observe the result
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf
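Each flume-ng command runs in the foreground, so the three agents need separate terminals or a small launcher. The sketch below starts the collector a3 first so that it is already listening on port 4141 when a2 and a1 connect. The DRY_RUN switch, the 5-second pause, and the .out log file names are assumptions made for illustration and are not part of Flume.

```shell
#!/bin/sh
# Agent name and config file, listed in start order: collector first.
AGENTS="a3:flume33 a2:flume22 a1:flume11"

# Print the flume-ng command line for one "name:conf" pair.
agent_cmd() {
    name=${1%%:*}
    conf=${1#*:}
    echo "bin/flume-ng agent --conf conf/ --name $name --conf-file jobconf/$conf.conf"
}

# DRY_RUN=1 (the default here) only prints; DRY_RUN=0 launches the agents.
for pair in $AGENTS; do
    cmd=$(agent_cmd "$pair")
    if [ "${DRY_RUN:-1}" = "1" ]; then
        echo "$cmd"
    else
        # $cmd is left unquoted on purpose so it splits into arguments.
        nohup $cmd > "${pair%%:*}.out" 2>&1 &
        sleep 5   # assumed pause to let the agent bind its port
    fi
done
```

Running it as is prints the three commands in order; running it with DRY_RUN=0 from the Flume home directory launches them in the background.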
Sending data
1) Run telnet dtstack_hdfs 44444 and, once connected, send java
2) Append python to /opt/Andy
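The two inputs can also be driven from a script instead of by hand. This is a rough sketch with two hypothetical helper functions, append_event and send_event. It assumes an nc binary is installed; nc replaces the interactive telnet session, and flag behavior can differ between nc variants.

```shell
# Append one line to the file that a1's exec source is tailing.
# usage: append_event FILE TEXT
append_event() {
    printf '%s\n' "$2" >> "$1"
}

# Send one line to a2's netcat source, then close the connection.
# -w 2 makes nc give up after 2 idle seconds.
# usage: send_event HOST PORT TEXT
send_event() {
    printf '%s\n' "$3" | nc -w 2 "$1" "$2"
}
```

For the test described here, the calls would be send_event dtstack_hdfs 44444 java and append_event /opt/Andy python.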