大数据集群二次开发及调优使用指导(三)-Hive

南墨10个月前技术文章286

1.   业务调优:

Hive业务的业务主要以批量处理作业为主,批处理主要特点是耗时时间长,消耗的资源比较多,主要的调优和设计推荐如下:

1.       尽量使用ORC File 配上合适的压缩算法, 主要可选的压缩算法为ZlibSnappy。其中Zlib压缩比高,但压缩解压时间比Snappy长,消耗资源比如Snappy多。Snappy平衡了的压缩比和压缩解压的性能。推荐使用Snappy

2.       尽量使用Map Join减少Shuffle的次数,大幅提升性能

3.       不同SQL语句,完成同一个功能,生成Map Reduce的数量越少越好

4.       Hive系统默认是典型的配置场景,结合业务实际情况,可以做一些参数的调整,如文件块的大小,Map个数与Reduce的个数,压缩算法等。

5.       合理的使用分区,分区数量不要太多,查询的SQL尽量指定具体的分区值;

2.   二次开发业务应用指导

通常数据进入Hadoop的处理流程大概如下:

1.       导入原始数据到HDFS

原始业务的数据可能是在FTP服务器上,可能在本地磁盘上,可能在网络共享存储上, 可以通过sqoop组件,HDFS APIHDFS 命令行工具将原始数据导入HDFS,或者通过KafkaFlume等直接写入HDFS

2.       格式转换(可选):

如果HDFS的数据格式不是列格式,建议通过HiveSQL语句转换成列存储格式,如ORC格式,这样可以优化IO访问,合并小文件。

3.       针对性分析数据:

调用JDBC接口或者Beeline执行SQL语句对数据进行处理和转换,这里就是业务自己编写SQL语句,进行分析数据,生成结果。

4.       输出结果数据:

一般将结果数据导出到Relation Database,供其它的报表工具进一步使用,可以通过JDBC或直接从HDFS导出将数据转移到其它地方。

3.   Hive的HQL调优

这一节讲解SQL语句优化,进行性能调优的开发人员,需要知道一些通常的原理,比如一个group by是一个MapReduce,一个join是一个MapReduce,一个order by是一个MapReduce。所以当一个SQL慢的时候,需要查看执行计划进行相应的调优。

3.1     HQL执行计划

Hive作为处理SQL的层,具体自己独特的SQL执行计划,查看SQL的执行计划能够快速了解SQL的执行过程,对于性能的调优会有很大的帮助。

Hive提供了Explain命令显示查询计划,语法如下:

EXPLAIN [EXTEENDED] query

EXPLAIN语句使用EXTENDED提供执行计划关于操作的的额外信息,如文件名。

EXPLAIN输出包括三个部分:

1.    查询的抽象语法构

2.    执行计划和计划的不同阶段之间的依赖关系

3.    每个操作的描述,如FilterOperatorSelectOperatorFileSinkOperator

下面以例子的形式来看看Hive的执行计划的情况:

hive> explain select s.id, s.name from student s left outer join student_tmp st on s.name = st.name;
OK
ABSTRACT SYNTAX TREE:      #
抽象语法树
(TOK_QUERY (TOK_FROM (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME student) s) (TOK_TABREF (TOK_TABNAME student_tmp) st) (= (. (TOK_TABLE_OR_COL s) name) (. (TOK_TABLE_OR_COL st) name)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) name)))))
 
STAGE DEPENDENCIES:       #依赖图
#这个sql将被分成两个阶段执行。基本上每个阶段会对应一个mapreduce jobStage-0除外。因为Stage-0只是fetch结果集,不需要mapreduce job
Stage-1 is a root stage
Stage-0 is a root stage
 
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:  #map job开始
s
TableScan
alias: s  #扫描表student
Reduce Output Operator #这里描述map的输出,也就是reduce的输入。比如keypartitionsort等信息。
key expressions:   #reduce jobkey
expr: name
type: string
sort order: +  #这里表示按一个字段排序,如果是按两个字段排序,那么就会有两个+(++),更多以此类推
Map-reduce partition columns:
#partition的信息,由此也可以看出hivejoin的时候会以join on后的列作为partition的列,以#保证具有相同此列的值的行被分到同一个reduce中去
expr: name
type: string
tag: 0  #用于标示这个扫描的结果,后面的join会用到它
value expressions: #表示select 后面的列
expr: id
type: int
expr: name
type: string
st
TableScan  #开始扫描第二张表
alias: st
Reduce Output Operator
key expressions:
expr: name
type: string
sort order: +
Map-reduce partition columns:
expr: name
type: string
tag: 1
Reduce Operator Tree:  #reduce job开始
Join Operator
condition map:
Left Outer Join0 to 1  #tag 0 out join tag 1
condition expressions: #这里也是描述select 后的列。这里我们的select后的列是 s.id   s.name, #所以0后面有两个字段, 1后面没有
0 {VALUE._col0} {VALUE._col2}
1
handleSkewJoin: false
outputColumnNames: _col0, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col2
type: string
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 
Stage: Stage-0
Fetch Operator
limit: -1
 
Time taken: 0.216 seconds

3.2     除去多余的操作

CREATE TABLE clicks
(  timestamp date,
sessionID string,
url string,
source_ip string  )
STORED as ORC
tblproperties (“orc.compress" = "SNAPPY");

比如上边这个表,如果要获取每一个sessionID最新的访问记录,可以这样写:

SELECT clicks.*
FROM clicks inner join ( select sessionID, max(timestamp) as max_ts
from clicks
group by sessionID) latest
ON clicks.sessionID = latest.sessionID and clicks.timestamp = latest.max_ts;

语句解释:

查看执行计划会发现该语句会产生两个MapReduce,第一个MapReduce生成最新的sessionID信息表,第二MapReduce做一个Join,前面输出的结果与原表进行Join得到最后的结果。

再来看看下边的写法:

SELECT  *
FROM  (   SELECT  *,
RANK() over (partition by sessionID, order by timestamp desc) as rank
FROM clicks
)  ranked_clicks
WHERE ranked_clicks.rank=1;

语句解释:

查看执行计划会发现只会有一个MapReduce,该语句将数据读出来对sesssionID进行分组并按时间进行倒序排序,最后过滤出排序后的第一条记录。

比较第一个语句和第二个语句,发现第二个语句减少了一次MapReduce操作,性能自然大幅提升。很明显第一个语句多余执行了一次Join操作,第二语句以分组排序替代了Join,除去了不必要的Join操作,带来了性能的提升。

3.3     Distinct聚合优化

操作场景

SELECT COUNT( DISTINCT id ) FROM TABLE_NAME ;

优化前的问题:只有一个reduce处理全量数据,并发度不够,存在单点瓶颈。

换种写法,reduce就会有多个,性能提升很多。

SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME ) t;

3.4     Order By

操作场景

默认情况下,Order By会生成一个Reduce进行全局排序,所以,一般要求Order By要加上Limit来使用,或者修改为Distribute BySort By

对于必须进行全量,全局排序的,可以考虑对数据进行抽样后,通过Distribute BySort By进行并行排序。

修改参数

参数名

描述

hive.optimize.sampling.orderby

默认是false

hive.optimize.sampling.orderby.number

默认1000,抽取多少个样本

hive.optimize.sampling.orderby.percent

默认0.1,每条记录生成一个随机数,当随机数小于这个值时,选中该记录

 

3.5     Multi Insert

对于从同一份源表往不同的表中插数据,可以采用multi insert语法,减少job个数

FROM (SELECT a.status, b.school, b.gender
        FROM status_updates a JOIN profiles b
 
 
             ON (a.userid = b.userid anda.ds='2009-03-20' )
      ) subq1
 
 
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
    SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
 
 
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
    SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查询语句使用了Multi-Insert特性连续insert了两张不同的表,减少了一轮MapReduce操作。

3.6     动态分区+distribute by

一般在从textorc的时候,会用到动态分区,对于

INSERT OVERWRITE TABLE T1 PARTITION(P1)
 
      SELECT C1,C2,P1FROM T2

最好对分区字段加上distribute by,否则最坏的情况就是,一个Task包含每个分区的数据,往每个分区插入数据,导致Task的内存溢出。

INSERT OVERWRITE TABLE T1 PARTITION(P1)

     SELECT C1, C2, P1 FROM T2 DISTRIBUTE BY P1

4.   性能调优常用方法

4.1     表模型优化

a)       数据类型选择

避免使用复杂的MAPSTRUCT等类型。

字段类型要精确,例如是INTEGER型的,不要定义成STRING。精确的类型可以让计算机CPU更快的处理。

b)      表分区

将数据分区,有助于减少查询时的数据量读取。例如按照天分区,那么按照时间跨度的查询,就可以通过排除掉一些分区,减少需要读取的文件数。

分区支持多层,即对多个字段分区,支持动态分区,使用动态分区请参考10.4.2.5 Multi Insert章节的注意事项;

单表的分区要尽量控制在1万以内,分区太多,会导致产生很多小文件,获取分区信息耗时等问题;

修改参数

参数名

优化描述

hive.exec.dynamic.partition.mode

默认值strict,修改值nonstrict

hive.exec.max.dynamic.partitions

默认值1000,最大动态partition的数量,可以根据分区的大小进行修改;

hive.exec.max.created.files

默认值100000,一个MR job中最多可以创建文件的数量;

 

4.2     文件格式

表存储的文件格式有文本格式,有列存储类型的格式,对于Hive,建议使用列存储类型ORC File,虽然转化成ORC文件类型,需要多一些时间,但是,会让后边查询操作节省大量的时间。

修改参数

参数名

优化描述

orc.compress

默认值ZLIB

 

4.3     小文件

现在社区对于输入是小文件是没有自动合并功能,提供的合并命令,不够实用, 比如一个表有很多分区,需要对每个分区执行合并操作,我们暂时不建议用户用。

现在如果输入有小文件要合并,我们是建议用户先原始数据加载到hive表中,再启动一个MR从小文件临时表插入最终表,这个过程即解决小文件也解决文件存储格式(ORC + Snappy)。

对于输出,现在hive有参数控制多小的文件是小文件,对于输出的小文件是否要进行合并的参数,如下。

对于顺序执行的作业链,只有最后一张表的数据需要持久化,中间临时结果用完就删除的情况,可以在最后生成结果表之前开启下面参数,防止之前的作业也会生成合并任务使作业变慢。

修改参数

参数名

描述

hive.merge.mapfiles

默认为 True,是否合并 Map 输出文件

hive.merge.mapredfiles

默认为 False,是否合并 Reduce 输出文件

hive.merge.size.per.task

默认256*1000*1000,合并后单个文件的大小

hive.merge.smallfiles.avgsize

默认16 * 1000 * 1000,文件平均大小小于该值时,认为需要合并

 

4.4     压缩格式

通常来说压缩会对性能有提升,虽然消耗了一点CPU,但是节省了磁盘IO,节省了网络带宽。

对于ORC File文件,在定义表的时候,就指定了压缩类型。

对于中间结果,一般是Sequence File类型,因此可以指定中间文件的压缩类型和压缩算法;

修改参数

参数名

描述

hive.exec.compress.output

默认是false;对于文本文件格式,可以指定修改为trueorc文件类型不受此参数控制;

mapred.output.compression.codec

默认是org.apache.hadoop.io.compress.DeflateCodec,建议改成org.apache.hadoop.io.compress.SnappyCodec

hive.exec.compress.intermediate

中间结果是否进行压缩,默认是false

hive.intermediate.compression.codec

默认为空,建议改成org.apache.hadoop.io.compress.SnappyCodec

 

4.5     并行度控制

SQL会转换成MapReduceMap的数量由总数据量除以Map处理的数据量来定,Reduce的数量是总数据量除以Reduce处理的数量,不超过最大值。

修改参数

参数名

描述

mapreduce.input.fileinputformat.split.maxsize

默认256000000map处理的最大数据量,一般不用改

hive.exec.reducers.bytes.per.reducer

默认256000000reduce处理的最大数据量,一般不用改

hive.exec.reducers.max

默认1099,对于集群比较大的情况,可以适当改大。

 

4.6     Task内存

默认情况下,每个MapReduce使用的最大内存都是4GB,堆内存是3GB

原因如下: (说明格式:虽然分配了4G,但有时候任务实际可能只会到2G

hive当前每个map处理的数据量是这个参数mapreduce.input.fileinputformat.split.maxsize控制的, 默认是256MB 通常情况下,这个256MB是压缩文件,由于压缩文件压缩率不定,解压后数据量不同,对内存就会有不同的需求。

HiveMapjoinMapjoin是把小表加载到Map的内存中, 需要估计好加载后的内存,防止加载到内存过大导致内存溢出。

数据可能有些微的倾斜情况,导致Reduce等处理的数据量变大,如果内存小会导致失败。

对于大内存机器,可以尽量使用大的内存降低从内存刷磁盘的频率,这样可以减少IO。正确的做法应该是基于job的情况设置合适的内存,但是考虑到这样业务就需要花费更多的时间去调优, 因此配置成一个更大一点,更通用的值,保证job运行的稳定性。


相关文章

Go 链表的实现

Go 链表的实现

链表是一种物理存储单元上非连续、非顺序的存储结构,数据元素的逻辑顺序是通过链表中的指针链接次序实现的。链表由一系列结点(链表中每一个元素称为结点)组成,结点可以在运行时动态生成。每个结点包括两个部分:...

prometheus黑盒监控

prometheus黑盒监控

黑盒监控即以用户的身份测试服务的外部可见性,常见的黑盒监控包括 HTTP探针、TCP探针、Dns、Icmp等用于检测站点、服务的可访问性、服务的连通性,以及访问效率等。prometheus提供了bla...

企业级大数据安全架构(十)

企业级大数据安全架构(十)

一、DBeaver连接Kerberos认证下的hive1.配置本地hosts因为Kerberos认证过程及集群服务中,很多是以主机名的形式进行访问的,所以工作机要设置hosts. 域名映射,我们通过部...

MySQL性能优化(三)函数运算导致无法使用索引

MySQL性能优化(三)函数运算导致无法使用索引

有时侯我们会遇到这样的情况:明明字段上已经建立了索引,但是查询还是无法使用索引。其中有一种情况是因为SQL中对索引字段进行了运算。一个例子select * from us...

Redis 慢查询相关配置

Redis 慢查询相关配置

一、查询生命周期一条查询的生命周期:发送命令命令排队执行命令返回结果Redis 慢日志只统计 “执行命令” 步骤 3 的耗时,所以没有慢查询并不代表客户端没有超时问题。二、慢日志配置参数慢日志相关的参...

MySQL 使用开源审计插件

MySQL 使用开源审计插件

前言MySQL 只有企业版有审计插件,开源社区版没有审计插件。企业要通过等保需要开通审计,这里记录使用 MariaDB 开源审计插件,让 MySQL 社区版拥有审计功能。1. 审计插件下载审计插件是包...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。