大数据集群二次开发及调优使用指导(三)-Hive
1. 业务调优:
Hive业务的业务主要以批量处理作业为主,批处理主要特点是耗时时间长,消耗的资源比较多,主要的调优和设计推荐如下:
1. 尽量使用ORC File, 配上合适的压缩算法, 主要可选的压缩算法为Zlib和Snappy。其中Zlib压缩比高,但压缩解压时间比Snappy长,消耗资源比如Snappy多。Snappy平衡了的压缩比和压缩解压的性能。推荐使用Snappy。
2. 尽量使用Map Join减少Shuffle的次数,大幅提升性能
3. 不同SQL语句,完成同一个功能,生成Map Reduce的数量越少越好
4. Hive系统默认是典型的配置场景,结合业务实际情况,可以做一些参数的调整,如文件块的大小,Map个数与Reduce的个数,压缩算法等。
5. 合理的使用分区,分区数量不要太多,查询的SQL尽量指定具体的分区值;
2. 二次开发业务应用指导
通常数据进入Hadoop的处理流程大概如下:
1. 导入原始数据到HDFS:
原始业务的数据可能是在FTP服务器上,可能在本地磁盘上,可能在网络共享存储上, 可以通过sqoop组件,HDFS API,HDFS 命令行工具将原始数据导入HDFS,或者通过Kafka,Flume等直接写入HDFS。
2. 格式转换(可选):
如果HDFS的数据格式不是列格式,建议通过Hive的SQL语句转换成列存储格式,如ORC格式,这样可以优化IO访问,合并小文件。
3. 针对性分析数据:
调用JDBC接口或者Beeline执行SQL语句对数据进行处理和转换,这里就是业务自己编写SQL语句,进行分析数据,生成结果。
4. 输出结果数据:
一般将结果数据导出到Relation Database,供其它的报表工具进一步使用,可以通过JDBC或直接从HDFS导出将数据转移到其它地方。
3. Hive的HQL调优
这一节讲解SQL语句优化,进行性能调优的开发人员,需要知道一些通常的原理,比如一个group by是一个MapReduce,一个join是一个MapReduce,一个order by是一个MapReduce。所以当一个SQL慢的时候,需要查看执行计划进行相应的调优。
3.1 HQL执行计划
Hive作为处理SQL的层,具体自己独特的SQL执行计划,查看SQL的执行计划能够快速了解SQL的执行过程,对于性能的调优会有很大的帮助。
Hive提供了Explain命令显示查询计划,语法如下:
EXPLAIN [EXTEENDED] query
EXPLAIN语句使用EXTENDED提供执行计划关于操作的的额外信息,如文件名。
EXPLAIN输出包括三个部分:
1. 查询的抽象语法构
2. 执行计划和计划的不同阶段之间的依赖关系
3. 每个操作的描述,如FilterOperator,SelectOperator,FileSinkOperator
下面以例子的形式来看看Hive的执行计划的情况:
hive> explain select s.id, s.name from student s left outer join student_tmp st on s.name = st.name;
OK
ABSTRACT SYNTAX TREE: #抽象语法树
(TOK_QUERY (TOK_FROM (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME student) s) (TOK_TABREF (TOK_TABNAME student_tmp) st) (= (. (TOK_TABLE_OR_COL s) name) (. (TOK_TABLE_OR_COL st) name)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) name)))))
STAGE DEPENDENCIES: #依赖图
#这个sql将被分成两个阶段执行。基本上每个阶段会对应一个mapreduce job,Stage-0除外。因为Stage-0只是fetch结果集,不需要mapreduce job
Stage-1 is a root stage
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree: #map job开始
s
TableScan
alias: s #扫描表student
Reduce Output Operator #这里描述map的输出,也就是reduce的输入。比如key,partition,sort等信息。
key expressions: #reduce job的key
expr: name
type: string
sort order: + #这里表示按一个字段排序,如果是按两个字段排序,那么就会有两个+(++),更多以此类推
Map-reduce partition columns:
#partition的信息,由此也可以看出hive在join的时候会以join on后的列作为partition的列,以#保证具有相同此列的值的行被分到同一个reduce中去
expr: name
type: string
tag: 0 #用于标示这个扫描的结果,后面的join会用到它
value expressions: #表示select 后面的列
expr: id
type: int
expr: name
type: string
st
TableScan #开始扫描第二张表
alias: st
Reduce Output Operator
key expressions:
expr: name
type: string
sort order: +
Map-reduce partition columns:
expr: name
type: string
tag: 1
Reduce Operator Tree: #reduce job开始
Join Operator
condition map:
Left Outer Join0 to 1 #tag 0 out join tag 1
condition expressions: #这里也是描述select 后的列。这里我们的select后的列是 s.id 和 s.name, #所以0后面有两个字段, 1后面没有
0 {VALUE._col0} {VALUE._col2}
1
handleSkewJoin: false
outputColumnNames: _col0, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col2
type: string
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Stage: Stage-0
Fetch Operator
limit: -1
Time taken: 0.216 seconds
3.2 除去多余的操作
CREATE TABLE clicks
( timestamp date,
sessionID string,
url string,
source_ip string )
STORED as ORC
tblproperties (“orc.compress" = "SNAPPY");
比如上边这个表,如果要获取每一个sessionID最新的访问记录,可以这样写:
SELECT clicks.*
FROM clicks inner join ( select sessionID, max(timestamp) as max_ts
from clicks
group by sessionID) latest
ON clicks.sessionID = latest.sessionID and clicks.timestamp = latest.max_ts;
语句解释:
查看执行计划会发现该语句会产生两个MapReduce,第一个MapReduce生成最新的sessionID信息表,第二MapReduce做一个Join,前面输出的结果与原表进行Join得到最后的结果。
再来看看下边的写法:
SELECT *
FROM ( SELECT *,
RANK() over (partition by sessionID, order by timestamp desc) as rank
FROM clicks
) ranked_clicks
WHERE ranked_clicks.rank=1;
语句解释:
查看执行计划会发现只会有一个MapReduce,该语句将数据读出来对sesssionID进行分组并按时间进行倒序排序,最后过滤出排序后的第一条记录。
比较第一个语句和第二个语句,发现第二个语句减少了一次MapReduce操作,性能自然大幅提升。很明显第一个语句多余执行了一次Join操作,第二语句以分组排序替代了Join,除去了不必要的Join操作,带来了性能的提升。
3.3 Distinct聚合优化
操作场景
SELECT COUNT( DISTINCT id ) FROM TABLE_NAME ;
优化前的问题:只有一个reduce处理全量数据,并发度不够,存在单点瓶颈。
换种写法,reduce就会有多个,性能提升很多。
SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME ) t;
3.4 Order By
操作场景
默认情况下,Order By会生成一个Reduce进行全局排序,所以,一般要求Order By要加上Limit来使用,或者修改为Distribute By和Sort By 。
对于必须进行全量,全局排序的,可以考虑对数据进行抽样后,通过Distribute By和Sort By进行并行排序。
修改参数
参数名 | 描述 |
hive.optimize.sampling.orderby | 默认是false |
hive.optimize.sampling.orderby.number | 默认1000,抽取多少个样本 |
hive.optimize.sampling.orderby.percent | 默认0.1,每条记录生成一个随机数,当随机数小于这个值时,选中该记录 |
3.5 Multi Insert
对于从同一份源表往不同的表中插数据,可以采用multi insert语法,减少job个数
FROM (SELECT a.status, b.school, b.gender
FROM status_updates a JOIN profiles b
ON (a.userid = b.userid anda.ds='2009-03-20' )
) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school
上述查询语句使用了Multi-Insert特性连续insert了两张不同的表,减少了一轮MapReduce操作。
3.6 动态分区+distribute by
一般在从text转orc的时候,会用到动态分区,对于
INSERT OVERWRITE TABLE T1 PARTITION(P1)
SELECT C1,C2,P1FROM T2
最好对分区字段加上distribute by,否则最坏的情况就是,一个Task包含每个分区的数据,往每个分区插入数据,导致Task的内存溢出。
INSERT OVERWRITE TABLE T1 PARTITION(P1)
SELECT C1, C2, P1 FROM T2 DISTRIBUTE BY P1
4. 性能调优常用方法
4.1 表模型优化
避免使用复杂的MAP,STRUCT等类型。
字段类型要精确,例如是INTEGER型的,不要定义成STRING。精确的类型可以让计算机CPU更快的处理。
将数据分区,有助于减少查询时的数据量读取。例如按照天分区,那么按照时间跨度的查询,就可以通过排除掉一些分区,减少需要读取的文件数。
分区支持多层,即对多个字段分区,支持动态分区,使用动态分区请参考10.4.2.5 Multi Insert章节的注意事项;
单表的分区要尽量控制在1万以内,分区太多,会导致产生很多小文件,获取分区信息耗时等问题;
修改参数
参数名 | 优化描述 |
hive.exec.dynamic.partition.mode | 默认值strict,修改值nonstrict; |
hive.exec.max.dynamic.partitions | 默认值1000,最大动态partition的数量,可以根据分区的大小进行修改; |
hive.exec.max.created.files | 默认值100000,一个MR job中最多可以创建文件的数量; |
4.2 文件格式
表存储的文件格式有文本格式,有列存储类型的格式,对于Hive,建议使用列存储类型ORC File,虽然转化成ORC文件类型,需要多一些时间,但是,会让后边查询操作节省大量的时间。
修改参数
参数名 | 优化描述 |
orc.compress | 默认值ZLIB |
4.3 小文件
现在社区对于输入是小文件是没有自动合并功能,提供的合并命令,不够实用, 比如一个表有很多分区,需要对每个分区执行合并操作,我们暂时不建议用户用。
现在如果输入有小文件要合并,我们是建议用户先原始数据加载到hive表中,再启动一个MR从小文件临时表插入最终表,这个过程即解决小文件也解决文件存储格式(ORC + Snappy)。
对于输出,现在hive有参数控制多小的文件是小文件,对于输出的小文件是否要进行合并的参数,如下。
对于顺序执行的作业链,只有最后一张表的数据需要持久化,中间临时结果用完就删除的情况,可以在最后生成结果表之前开启下面参数,防止之前的作业也会生成合并任务使作业变慢。
修改参数
参数名 | 描述 |
hive.merge.mapfiles | 默认为 True,是否合并 Map 输出文件 |
hive.merge.mapredfiles | 默认为 False,是否合并 Reduce 输出文件 |
hive.merge.size.per.task | 默认256*1000*1000,合并后单个文件的大小 |
hive.merge.smallfiles.avgsize | 默认16 * 1000 * 1000,文件平均大小小于该值时,认为需要合并 |
4.4 压缩格式
通常来说压缩会对性能有提升,虽然消耗了一点CPU,但是节省了磁盘IO,节省了网络带宽。
对于ORC File文件,在定义表的时候,就指定了压缩类型。
对于中间结果,一般是Sequence File类型,因此可以指定中间文件的压缩类型和压缩算法;
修改参数
参数名 | 描述 |
hive.exec.compress.output | 默认是false;对于文本文件格式,可以指定修改为true;orc文件类型不受此参数控制; |
mapred.output.compression.codec | 默认是org.apache.hadoop.io.compress.DeflateCodec,建议改成org.apache.hadoop.io.compress.SnappyCodec |
hive.exec.compress.intermediate | 中间结果是否进行压缩,默认是false |
hive.intermediate.compression.codec | 默认为空,建议改成org.apache.hadoop.io.compress.SnappyCodec |
4.5 并行度控制
SQL会转换成Map和Reduce,Map的数量由总数据量除以Map处理的数据量来定,Reduce的数量是总数据量除以Reduce处理的数量,不超过最大值。
修改参数
参数名 | 描述 |
mapreduce.input.fileinputformat.split.maxsize | 默认256000000,map处理的最大数据量,一般不用改 |
hive.exec.reducers.bytes.per.reducer | 默认256000000,reduce处理的最大数据量,一般不用改 |
hive.exec.reducers.max | 默认1099,对于集群比较大的情况,可以适当改大。 |
4.6 Task内存
默认情况下,每个Map和Reduce使用的最大内存都是4GB,堆内存是3GB。
原因如下: (说明格式:虽然分配了4G,但有时候任务实际可能只会到2G)
hive当前每个map处理的数据量是这个参数mapreduce.input.fileinputformat.split.maxsize控制的, 默认是256MB, 通常情况下,这个256MB是压缩文件,由于压缩文件压缩率不定,解压后数据量不同,对内存就会有不同的需求。
Hive有Mapjoin,Mapjoin是把小表加载到Map的内存中, 需要估计好加载后的内存,防止加载到内存过大导致内存溢出。
数据可能有些微的倾斜情况,导致Reduce等处理的数据量变大,如果内存小会导致失败。
对于大内存机器,可以尽量使用大的内存降低从内存刷磁盘的频率,这样可以减少IO。正确的做法应该是基于job的情况设置合适的内存,但是考虑到这样业务就需要花费更多的时间去调优, 因此配置成一个更大一点,更通用的值,保证job运行的稳定性。