Flume使用案例之Flume与Flume之间数据传递,多Flume汇总数据到单Flume

楼高2年前技术文章643

目标:flume11监控文件hive.logflume-22监控某一个端口的数据流,flume11flume-22将数据发送给flume-33flume33将最终数据写入到HDFS

分步实现:

1. 创建flume11.conf,用于监控hive.log文件,同时sink数据到flume-33

# 1 agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1

 

# 2 source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/Andy

a1.sources.r1.shell = /bin/bash -c

 

# 3 sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = dtstack_hdfs

a1.sinks.k1.port = 4141

 

# 4 channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

 

# 5. Bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 2. 创建flume-22.conf,用于监控端口44444数据流,同时sink数据到flume-33

# 1 agent

a2.sources = r1

a2.sinks = k1

a2.channels = c1

 

# 2 source

a2.sources.r1.type = netcat

a2.sources.r1.bind = dtstack_hdfs

a2.sources.r1.port = 44444

 

#3 sink

a2.sinks.k1.type = avro

a2.sinks.k1.hostname = dtstack_hdfs

a2.sinks.k1.port = 4141

 

# 4 channel

a2.channels.c1.type = memory

a2.channels.c1.capacity = 1000

a2.channels.c1.transactionCapacity = 100

 

# 5 Bind

a2.sources.r1.channels = c1

a2.sinks.k1.channel = c1

 3.  创建flume33.conf,用于接收flume11flume22发送过来的数据流,最终合并后sinkHDFS

# 1 agent

a3.sources = r1

a3.sinks = k1

a3.channels = c1

 

# 2 source

a3.sources.r1.type = avro

a3.sources.r1.bind = dtstack_hdfs

a3.sources.r1.port = 4141

 

# 3 sink

a3.sinks.k1.type = hdfs

a3.sinks.k1.hdfs.path = hdfs://dtstack_hdfs:9000/flume3/%H

#上传文件的前缀

a3.sinks.k1.hdfs.filePrefix = flume3-

#是否按照时间滚动文件夹

a3.sinks.k1.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k1.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k1.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k1.hdfs.useLocalTimeStamp = true

#积攒多少个EventflushHDFS一次

a3.sinks.k1.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k1.hdfs.fileType = DataStream

#多久生成一个新的文件

a3.sinks.k1.hdfs.rollInterval = 600

#设置每个文件的滚动大小大概是128M

a3.sinks.k1.hdfs.rollSize = 134217700

#文件的滚动与Event数量无关

a3.sinks.k1.hdfs.rollCount = 0

#最小冗余数

a3.sinks.k1.hdfs.minBlockReplicas = 1

 

# 4 channel

a3.channels.c1.type = memory

a3.channels.c1.capacity = 1000

a3.channels.c1.transactionCapacity = 100

 

# 5 Bind

a3.sources.r1.channels = c1

a3.sinks.k1.channel = c1

 4. 执行测试:分别开启对应flume-job(依次启动flume-33flume-22flume11),同时产生文件变动并观察结果

$ bin/flume-ng agent --conf conf/ --name a3 --conf-file jobconf/flume33.conf

$ bin/flume-ng agent --conf conf/ --name a2 --conf-file jobconf/flume22.conf

$ bin/flume-ng agent --conf conf/ --name a1 --conf-file jobconf/flume11.conf

数据发送

1) telnet dtstack_hdfs 44444    打开后发送java

2) /opt/Andy 中追加python

相关文章

磁盘存储和文件系统详解

磁盘存储和文件系统详解

1、磁盘结构设备文件:关联至一个设备驱动程序,进而能够与之对应硬件设备进行通信I/O Ports:I/O 设备地址一切皆文件:open(),read(),write(),close()设备类型:块设备...

MySQL 初始化推荐关注的参数

MySQL 初始化推荐关注的参数

前言新部署的 MySQL 实例如何配置?本 SOP 将提供一些 MySQL 关键参数及设置方法。必须设置的参数1. innodb_buffer_pool_size对于 innodb 表引擎来说,用户数...

MySQL DBA 常用工具 SQL

MySQL DBA 常用工具 SQL

【前言】本篇文章介绍一些 MySQL 管理的实用语句及适用的场景。SQL 基于 MySQL 5.7 版本。1. 长事务事务长时间未提交,即使状态为Sleep也可能造成一些锁等待的问题,使用该查询可以查...

scylladb集群如何添加新数据中心

1、信息收集· 收集现有集群信息cat /etc/scylla/scylla.yaml | grep cluster_namecat /etc/scylla/scylla.yaml | grep se...

oracle v$archive_log视图过期信息清理

      在使用RMAN命令删除归档后,查询v$archived_log视图会发现name列为空了,但其他列的信息还保留,时间长了会留下很多过期的信息,影响维护工作,需要将过期的信息删除。 出现这样...

PostgreSQL 锁等待排查

PostgreSQL 锁等待排查

说明在数据库中,常用 锁 和 MVCC 来保障事务的一致性及提高并发性。锁问题的定位和排查也是数据库运维人员必会的技能,本篇文章介绍 PostgreSQL 如何排查定位锁堵塞问题。1. Postgre...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。