Hadoop集群小文件合并优化建议指导

南墨2年前技术文章1323

1 综述

1.1 综述

本指导书旨在指导大数据集群使用者在大数据集群小文件较多的情况下,针对小文件进行优化处理,有效减小集群文件对象数目,减缓namenode所承载的压力,减少mapreduce任务扫描文件数。

2 关于小文件

2.1 什么是小文件

小文件是指文件大小明显小于HDFS上块(block)大小(FI 默认128MB)的文件。这样的文件给hadoop的扩展性和性能带来严重问题。

首先,在HDFS中,任何一个文件,目录或者blockNameNode节点的内存中均以一个对象表示(元数据),而这受到NameNode物理内存容量的限制。每个元数据对象约占150byte,所以如果有1千万个小文件,每个文件占用一个block,则NameNode大约需要15G空间。如果存储1亿个文件,则NameNode需要150G空间,这毫无疑问1亿个小文件是不可取的。

其次,处理小文件并非Hadoop的设计目标。因而,在HDFS中存储大量小文件是很低效的。访问大量小文件经常会导致大量的寻找,以及不断的从一个DataNode跳到另一个DataNode去检索小文件,这都不是一个很有效的访问模式,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

2.2 如何界定小文件

当一个文件的大小明显小于HDFS的块大小(FI 默认128MB),就将认定为小文件否则就是大文件。建议存储在HDFS上的单个文件大小为块大小或块的整数倍大小

2.3 判断集群小文件情况

2.3.1 通过目录文件数和大小的对应关系判断

针对集群目录执行hdfs dfs –count –v –h /xxxx/xxx/xx/x,根据回显确认当前目录下文件数和大小,确定小文件情况。(此命令统计的大小为单副本文件大小)

例如:当前目录总大小5G,文件数上百万,此时即可认为此目录下小文件较多。

2.3.2 根据MR任务判断小文件情况

观察MR任务的mapreduce数量,如果某个任务的mapreduce数量较多,则该任务存储路径下极有可能存在大量小文件,分为以下两种情况

1.     没有reduce的任务,map数量较多。

2.     存在mapreduce的任务,reduce数量较多。

2.4 小文件对集群的影响

1.     HDFS不适合大量小文件的存储,因为namenode将文件系统的元数据存放在NameNode的内存中,因此存储的文件数目受限于 namenode的内存大小。HDFS中每个文件、目录、数据块占用150Bytes。如果存放的文件数目过多的话将直接导致NameNode的内存溢出

2.     HDFS适用于高吞吐量,而不适合低时间延迟的访问。如果同时存入大量的小文件会花费很长的时间,严重影响读写性能。

3.     流式读取的方式,不适合多用户写入,以及任意位置写入。如果访问小文件,则必须从一个datanode跳转到另外一个datanode,这样大大降低了读取性能。小文件合并操作指导

2.5 针对小文件进行优化处理的方式方法

2.5.1 Hive表产生或存储在Hive表中的小文件

1.     如果文件存储格式是orc格式的表

Orc的表合并小文件请使用:

Alter Table tableNmae [PARTITION (partition_key = 'partition_value'[,...])] CONCATENATE;

2.     其他格式表合并小文件

Create Table As Select (CTAS),即用 hive 把数据从源表(含大量小文件)查出并插入到一张临时表,所有数据插入到临时表后,源表和临时表的表名互换即可。

insert overwrite table tb_name partition(p=xx) select * from tb_name where p=xx;

注意,你需要给 hive 会话添加下面的配置来控制小文件合并的条件:

set hive.merge.mapfiles=true;
set hive.merge.mapredfiles = true;
set mapreduce.input.fileinputformat.split.maxsize=256000000;
set mapreduce.input.fileinputformat.split.minsize=256000000;
set hive.merge.smallfiles.avgsize = 128000000; 
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;

对于新业务,设置完参数直接运行sql即可。

上述参数都是客户端参数,在业务侧设置参数即可

3.     具体参数含义如下

set hive.merge.mapfiles = true                            
#在 map only 的任务结束时合并小文件

set mapreduce.input.fileinputformat.split.maxsize    
#map前每个分片数据量的最大值

set mapreduce.input.fileinputformat.split.minsize     
#map前每个分片数据量的最大值

set hive.merge.mapredfiles = false                       
#在 MapReduce 的任务结束时合并小文件

set hive.merge.smallfiles.avgsize                         
#当输出文件的平均大小小于xx值时,启动一个独立的map-reduce任务进行文件merge

2.5.2 直接写入HDFS的小文件

1.     数据导出合并后重新导入

2.     针对数据创建hive表,并通过hive表进行处理

3.     确认数据已可老化,直接删除数据

4.     hadoop3及以上版本,单纯的HDFS文件合并小文件在文件是文本格式(txt)的时候可以使用一下命令(注意:仅限于文本格式文件,其他格式会导致文件数据错误,出现脏数据甚至到时数据损坏)

hdfs dfs -getmerge xxx xxx

5.     其他格式文件建议通过写MR程序读出来再写入的方式处理

6.     使用parquet-tools工具对parquet文件进行合并(此方式为开源方式,未经验证,使用前请自行验证)

通过parquet官方途径获取parquet-tools工具,对多个 parquet 文件进行合并,使用方法:

# 合并 HDFS 上的 parquet 文件

hadoop  jar  parquet-tools-1.9.0.jar  merge  /tmp/a.parquet  /tmp/b.parquet

# 合并本地的 parquet 文件

java  -jar  parquet-tools-1.9.0.jar  merge  /tmp/a.parquet  /tmp/b.parquet

2.5.3 HBase小文件

HBase组件默认开启compaction功能,理论上不存在小文件,如果部份表存在小文件,需要执行compaction对表中的小文件进行合并

1.     判断HBase表是否存在大量小文件

根据2.3.1章节内容对HBase表目录进行count操作,进行判断

2.     针对已存在大量小文件的HBase表进行compaction操作合并小文件(影响性能,请在业务低峰期操作)

确认需要进行compaction操作的表名,执行如下命令:

major_compact '表名'

例:major_compact 't1'

3.     开启HBase compaction功能(hbase-site.xml

配置HBase参数hbase.hregion.majorcompaction不为0

2.5.4 Sparksql合并小文件

Spark SQL表中,经常会存在很多小文件(大小远小于HDFS的块大小),每个小文件默认对应Spark中的一个Partition,即一个Task。在有很多小文件时,Spark会启动很多Task,此时当SQL逻辑中存在Shuffle操作时,会大大增加hash分桶数,严重影响系统性能。

针对小文件很多的场景,DataSource在创建RDD时,先将Table中的split生成PartitionedFile,再将这些PartitionedFile进行合并。即将多个PartitionedFile组成一个partition,从而减少partition数量,避免在Shuffle操作时生成过多的hash分桶,如图所示:

image.png

要启动小文件优化,在Spark客户端的“spark-defaults.conf”配置文件中进行设置。

spark.sql.files.maxPartitionBytes

·       在读取文件时,将单个分区打包的最大字节数。

·       单位:byte

spark.files.openCostInBytes

·       打开文件的预估成本,按照同一时间能够扫描的字节数来测量。当一个分区写入多个文件时使用。高估更好,这样小文件分区将比大文件分区更先被调度。

或者在执行sparksql任务前在客户端设置如下参数

set spark.sql.shuffle.partition = xxx(待设置参数值);
#在设置之前可先执行如下命令查询当前配置
set spark.sql.shuffle.partition;

此参数默认值200,在数据量可控的情况下可以适当调小此参数值,减少partition数量来减少小文件。调小参数值可能导致任务执行速度变慢

需要注意的是,当使用insert overwrite table select * from tmp的时候,设置的spark.sql.shuffle.partition 不生效,是由于spark DAG有宽依赖和窄依赖之分。select * 走的是窄依赖,不经过shuffer。

可以使用group by 等语法,强制使用宽依赖,进行buffer。

合并小文件三种方法

1、通过spark的coalesce()方法和repartition()方法

#shell,或者代码方式可以启用
val rdd2 = rdd1.coalesce(8, true) (true表示是否shuffle)
val rdd3 = rdd1.repartition(8)

说明:

coalesce:coalesce()方法的作用是返回指定一个新的指定分区的Rdd,

如果是生成一个窄依赖的结果,那么可以不发生shuffle,

分区的数量发生激烈的变化,计算节点不足,不设置true可能会出错。

repartition:coalesce()方法shuffle为true的情况。

2.5.5 使用HAR,针对存量文件进行处理。

Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。

对某个目录/foo/bar下的所有小文件存档成/outputdir/ zoo.har:也可以指定HAR的大小(使用-Dhar.block.size)

hadoop archive -archiveName zoo.har -p /foo/bar /outputdir

可以这样查看HAR文件存档中的文件:

hadoop dfs -ls har:///user/zoo/foo.har

输出:

har:///user/zoo/foo.har/hadoop/dir1

har:///user/zoo/foo.har/hadoop/dir2

使用HAR时需要两点,第一,对小文件进行存档后,原文件并不会自动被删除,需要用户自己删除;第二,创建HAR文件的过程实际上是在运行一个mapreduce作业

此外,HAR还有一些缺陷:第一,一旦创建,Archives便不可改变。要增加或移除里面的文件,必须重新创建归档文件。第二,要归档的文件名中不能有空格,否则会抛出异常,可以将空格用其他符号替换(使用-Dhar.space.replacement.enable=true -Dhar.space.replacement参数)

2.5.6 其他组件管控的小文件

1.     KafkaESSolr推荐单独挂盘,数据不推荐存储在HDFS

2.     其他组件若存在小文件的可能,建议优先确认文件使用方式后参考2.5.2章节进行合并操


相关文章

远程DEBUG HADOOP源码方法

远程DEBUG HADOOP源码方法

1. 安装IDEA2. 下载hadoop源码,必须与集群服务代码版本一致,否则会导致有的类无法找到3. 将源码导入IDEA工程并完成build4. 点击 菜单栏--运行--编辑配置 进行相关debug...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。