Flume使用案例之实时读取目录到HDFS

楼高10个月前技术文章215

目标:使用flume监听整个目录的文件

分步实现

1. 创建配置文件flume-dir.conf

#1 Agent

a3.sources = r3

a3.sinks = k3

a3.channels = c3

 

#2 source

a3.sources.r3.type = spooldir

a3.sources.r3.spoolDir = /opt/module/flume1.8.0/upload

a3.sources.r3.fileSuffix = .COMPLETED

a3.sources.r3.fileHeader = true

#忽略所有以.tmp结尾的文件,不上传

a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

 

# 3 sink

a3.sinks.k3.type = hdfs

a3.sinks.k3.hdfs.path = hdfs://dtstack_hdfs:9000/flume/%H

#上传文件的前缀

a3.sinks.k3.hdfs.filePrefix = upload-

#是否按照时间滚动文件夹

a3.sinks.k3.hdfs.round = true

#多少时间单位创建一个新的文件夹

a3.sinks.k3.hdfs.roundValue = 1

#重新定义时间单位

a3.sinks.k3.hdfs.roundUnit = hour

#是否使用本地时间戳

a3.sinks.k3.hdfs.useLocalTimeStamp = true

#积攒多少个EventflushHDFS一次

a3.sinks.k3.hdfs.batchSize = 100

#设置文件类型,可支持压缩

a3.sinks.k3.hdfs.fileType = DataStream

#多久生成一个新的文件

a3.sinks.k3.hdfs.rollInterval = 600

#设置每个文件的滚动大小大概是128M

a3.sinks.k3.hdfs.rollSize = 134217700

#文件的滚动与Event数量无关

a3.sinks.k3.hdfs.rollCount = 0

#最小副本数

a3.sinks.k3.hdfs.minBlockReplicas = 1

 

# Use a channel which buffers events in memory

a3.channels.c3.type = memory

a3.channels.c3.capacity = 1000

a3.channels.c3.transactionCapacity = 100

 

# Bind the source and sink to the channel

a3.sources.r3.channels = c3

a3.sinks.k3.channel = c3

2. 执行测试:执行如下脚本后,请向upload文件夹中添加文件试试

/opt/module/flume1.8.0/bin/flume-ng agent \

--conf /opt/module/flume1.8.0/conf/ \

--name a3 \

--conf-file /opt/module/flume1.8.0/jobconf/flume-dir.conf

提示: 在使用Spooling Directory Source

1) 不要在监控目录中创建并持续修改文件

2) 上传完成的文件会以.COMPLETED结尾

3) 被监控文件夹每500毫秒扫描一次文件变动

相关文章

阿里金融云经典网络和线下某银行实现网络互通

阿里金融云经典网络和线下某银行实现网络互通

需求某银行需要和某阿里金融云账号下的经典网络实例内网打通。已知不考虑将该服务器从经典网络类型迁移至VPC类型。阿里金融云环境下,之前是支持拉线下到经典网络专线的,但是目前和阿里侧核查,确认已不支持,仅...

寻找CPU使用率高进程方法

寻找CPU使用率高进程方法

背景节点报CPU使用率高,需要定位是什么进程占用CPU使用率高。CPU使用率持续较高在对应节点使用 “top”命令,然后键盘输入“P”,即按照CPU使用率排序进程。执行ps -ef | grep &l...

网络策略NetworkPolicy

网络策略NetworkPolicy

目的:为了实现细粒度的容器间网络访问隔离策略。引用:1.3版本NetworkPolicy机制 -> 1.8版本networking.k8s.io/v1稳定版本功能:对pod、ns之间网络通信限制...

A集群导入B集群中的高可用版rancher

A集群导入B集群中的高可用版rancher

问题现象:已知在B集群中采用helm方式部署了一个高可用版本的rancher,该rancher中已经配置导入了三套集群,并且三套集群状态在rancher控制台处均显示正常,日常可借助该rancher管...

Kubernetes源码解读(四)--Lister&Watcher源码分析

Kubernetes源码解读(四)--Lister&Watcher源码分析

Lister&&Watcher是Reflector的一个主要能力提供者,我们来看看Lister&&Watcher是如何实现List()和watch()的过程的。List...

kafka启动失败

kafka启动失败

问题现象客户重启kafka服务,页面显示run fail        问题报错问题原因  &nbs...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。