大数据集群二次开发及调优使用指导(三)-Hive

南墨2年前技术文章933

1.   业务调优:

Hive业务的业务主要以批量处理作业为主,批处理主要特点是耗时时间长,消耗的资源比较多,主要的调优和设计推荐如下:

1.       尽量使用ORC File 配上合适的压缩算法, 主要可选的压缩算法为ZlibSnappy。其中Zlib压缩比高,但压缩解压时间比Snappy长,消耗资源比如Snappy多。Snappy平衡了的压缩比和压缩解压的性能。推荐使用Snappy

2.       尽量使用Map Join减少Shuffle的次数,大幅提升性能

3.       不同SQL语句,完成同一个功能,生成Map Reduce的数量越少越好

4.       Hive系统默认是典型的配置场景,结合业务实际情况,可以做一些参数的调整,如文件块的大小,Map个数与Reduce的个数,压缩算法等。

5.       合理的使用分区,分区数量不要太多,查询的SQL尽量指定具体的分区值;

2.   二次开发业务应用指导

通常数据进入Hadoop的处理流程大概如下:

1.       导入原始数据到HDFS

原始业务的数据可能是在FTP服务器上,可能在本地磁盘上,可能在网络共享存储上, 可以通过sqoop组件,HDFS APIHDFS 命令行工具将原始数据导入HDFS,或者通过KafkaFlume等直接写入HDFS

2.       格式转换(可选):

如果HDFS的数据格式不是列格式,建议通过HiveSQL语句转换成列存储格式,如ORC格式,这样可以优化IO访问,合并小文件。

3.       针对性分析数据:

调用JDBC接口或者Beeline执行SQL语句对数据进行处理和转换,这里就是业务自己编写SQL语句,进行分析数据,生成结果。

4.       输出结果数据:

一般将结果数据导出到Relation Database,供其它的报表工具进一步使用,可以通过JDBC或直接从HDFS导出将数据转移到其它地方。

3.   Hive的HQL调优

这一节讲解SQL语句优化,进行性能调优的开发人员,需要知道一些通常的原理,比如一个group by是一个MapReduce,一个join是一个MapReduce,一个order by是一个MapReduce。所以当一个SQL慢的时候,需要查看执行计划进行相应的调优。

3.1     HQL执行计划

Hive作为处理SQL的层,具体自己独特的SQL执行计划,查看SQL的执行计划能够快速了解SQL的执行过程,对于性能的调优会有很大的帮助。

Hive提供了Explain命令显示查询计划,语法如下:

EXPLAIN [EXTEENDED] query

EXPLAIN语句使用EXTENDED提供执行计划关于操作的的额外信息,如文件名。

EXPLAIN输出包括三个部分:

1.    查询的抽象语法构

2.    执行计划和计划的不同阶段之间的依赖关系

3.    每个操作的描述,如FilterOperatorSelectOperatorFileSinkOperator

下面以例子的形式来看看Hive的执行计划的情况:

hive> explain select s.id, s.name from student s left outer join student_tmp st on s.name = st.name;
OK
ABSTRACT SYNTAX TREE:      #
抽象语法树
(TOK_QUERY (TOK_FROM (TOK_LEFTOUTERJOIN (TOK_TABREF (TOK_TABNAME student) s) (TOK_TABREF (TOK_TABNAME student_tmp) st) (= (. (TOK_TABLE_OR_COL s) name) (. (TOK_TABLE_OR_COL st) name)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) id)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL s) name)))))
 
STAGE DEPENDENCIES:       #依赖图
#这个sql将被分成两个阶段执行。基本上每个阶段会对应一个mapreduce jobStage-0除外。因为Stage-0只是fetch结果集,不需要mapreduce job
Stage-1 is a root stage
Stage-0 is a root stage
 
STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree:  #map job开始
s
TableScan
alias: s  #扫描表student
Reduce Output Operator #这里描述map的输出,也就是reduce的输入。比如keypartitionsort等信息。
key expressions:   #reduce jobkey
expr: name
type: string
sort order: +  #这里表示按一个字段排序,如果是按两个字段排序,那么就会有两个+(++),更多以此类推
Map-reduce partition columns:
#partition的信息,由此也可以看出hivejoin的时候会以join on后的列作为partition的列,以#保证具有相同此列的值的行被分到同一个reduce中去
expr: name
type: string
tag: 0  #用于标示这个扫描的结果,后面的join会用到它
value expressions: #表示select 后面的列
expr: id
type: int
expr: name
type: string
st
TableScan  #开始扫描第二张表
alias: st
Reduce Output Operator
key expressions:
expr: name
type: string
sort order: +
Map-reduce partition columns:
expr: name
type: string
tag: 1
Reduce Operator Tree:  #reduce job开始
Join Operator
condition map:
Left Outer Join0 to 1  #tag 0 out join tag 1
condition expressions: #这里也是描述select 后的列。这里我们的select后的列是 s.id   s.name, #所以0后面有两个字段, 1后面没有
0 {VALUE._col0} {VALUE._col2}
1
handleSkewJoin: false
outputColumnNames: _col0, _col2
Select Operator
expressions:
expr: _col0
type: int
expr: _col2
type: string
outputColumnNames: _col0, _col1
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 
Stage: Stage-0
Fetch Operator
limit: -1
 
Time taken: 0.216 seconds

3.2     除去多余的操作

CREATE TABLE clicks
(  timestamp date,
sessionID string,
url string,
source_ip string  )
STORED as ORC
tblproperties (“orc.compress" = "SNAPPY");

比如上边这个表,如果要获取每一个sessionID最新的访问记录,可以这样写:

SELECT clicks.*
FROM clicks inner join ( select sessionID, max(timestamp) as max_ts
from clicks
group by sessionID) latest
ON clicks.sessionID = latest.sessionID and clicks.timestamp = latest.max_ts;

语句解释:

查看执行计划会发现该语句会产生两个MapReduce,第一个MapReduce生成最新的sessionID信息表,第二MapReduce做一个Join,前面输出的结果与原表进行Join得到最后的结果。

再来看看下边的写法:

SELECT  *
FROM  (   SELECT  *,
RANK() over (partition by sessionID, order by timestamp desc) as rank
FROM clicks
)  ranked_clicks
WHERE ranked_clicks.rank=1;

语句解释:

查看执行计划会发现只会有一个MapReduce,该语句将数据读出来对sesssionID进行分组并按时间进行倒序排序,最后过滤出排序后的第一条记录。

比较第一个语句和第二个语句,发现第二个语句减少了一次MapReduce操作,性能自然大幅提升。很明显第一个语句多余执行了一次Join操作,第二语句以分组排序替代了Join,除去了不必要的Join操作,带来了性能的提升。

3.3     Distinct聚合优化

操作场景

SELECT COUNT( DISTINCT id ) FROM TABLE_NAME ;

优化前的问题:只有一个reduce处理全量数据,并发度不够,存在单点瓶颈。

换种写法,reduce就会有多个,性能提升很多。

SELECT COUNT(*) FROM (SELECT DISTINCT id FROM TABLE_NAME ) t;

3.4     Order By

操作场景

默认情况下,Order By会生成一个Reduce进行全局排序,所以,一般要求Order By要加上Limit来使用,或者修改为Distribute BySort By

对于必须进行全量,全局排序的,可以考虑对数据进行抽样后,通过Distribute BySort By进行并行排序。

修改参数

参数名

描述

hive.optimize.sampling.orderby

默认是false

hive.optimize.sampling.orderby.number

默认1000,抽取多少个样本

hive.optimize.sampling.orderby.percent

默认0.1,每条记录生成一个随机数,当随机数小于这个值时,选中该记录

 

3.5     Multi Insert

对于从同一份源表往不同的表中插数据,可以采用multi insert语法,减少job个数

FROM (SELECT a.status, b.school, b.gender
        FROM status_updates a JOIN profiles b
 
 
             ON (a.userid = b.userid anda.ds='2009-03-20' )
      ) subq1
 
 
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
    SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
 
 
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
    SELECT subq1.school, COUNT(1) GROUP BY subq1.school

上述查询语句使用了Multi-Insert特性连续insert了两张不同的表,减少了一轮MapReduce操作。

3.6     动态分区+distribute by

一般在从textorc的时候,会用到动态分区,对于

INSERT OVERWRITE TABLE T1 PARTITION(P1)
 
      SELECT C1,C2,P1FROM T2

最好对分区字段加上distribute by,否则最坏的情况就是,一个Task包含每个分区的数据,往每个分区插入数据,导致Task的内存溢出。

INSERT OVERWRITE TABLE T1 PARTITION(P1)

     SELECT C1, C2, P1 FROM T2 DISTRIBUTE BY P1

4.   性能调优常用方法

4.1     表模型优化

a)       数据类型选择

避免使用复杂的MAPSTRUCT等类型。

字段类型要精确,例如是INTEGER型的,不要定义成STRING。精确的类型可以让计算机CPU更快的处理。

b)      表分区

将数据分区,有助于减少查询时的数据量读取。例如按照天分区,那么按照时间跨度的查询,就可以通过排除掉一些分区,减少需要读取的文件数。

分区支持多层,即对多个字段分区,支持动态分区,使用动态分区请参考10.4.2.5 Multi Insert章节的注意事项;

单表的分区要尽量控制在1万以内,分区太多,会导致产生很多小文件,获取分区信息耗时等问题;

修改参数

参数名

优化描述

hive.exec.dynamic.partition.mode

默认值strict,修改值nonstrict

hive.exec.max.dynamic.partitions

默认值1000,最大动态partition的数量,可以根据分区的大小进行修改;

hive.exec.max.created.files

默认值100000,一个MR job中最多可以创建文件的数量;

 

4.2     文件格式

表存储的文件格式有文本格式,有列存储类型的格式,对于Hive,建议使用列存储类型ORC File,虽然转化成ORC文件类型,需要多一些时间,但是,会让后边查询操作节省大量的时间。

修改参数

参数名

优化描述

orc.compress

默认值ZLIB

 

4.3     小文件

现在社区对于输入是小文件是没有自动合并功能,提供的合并命令,不够实用, 比如一个表有很多分区,需要对每个分区执行合并操作,我们暂时不建议用户用。

现在如果输入有小文件要合并,我们是建议用户先原始数据加载到hive表中,再启动一个MR从小文件临时表插入最终表,这个过程即解决小文件也解决文件存储格式(ORC + Snappy)。

对于输出,现在hive有参数控制多小的文件是小文件,对于输出的小文件是否要进行合并的参数,如下。

对于顺序执行的作业链,只有最后一张表的数据需要持久化,中间临时结果用完就删除的情况,可以在最后生成结果表之前开启下面参数,防止之前的作业也会生成合并任务使作业变慢。

修改参数

参数名

描述

hive.merge.mapfiles

默认为 True,是否合并 Map 输出文件

hive.merge.mapredfiles

默认为 False,是否合并 Reduce 输出文件

hive.merge.size.per.task

默认256*1000*1000,合并后单个文件的大小

hive.merge.smallfiles.avgsize

默认16 * 1000 * 1000,文件平均大小小于该值时,认为需要合并

 

4.4     压缩格式

通常来说压缩会对性能有提升,虽然消耗了一点CPU,但是节省了磁盘IO,节省了网络带宽。

对于ORC File文件,在定义表的时候,就指定了压缩类型。

对于中间结果,一般是Sequence File类型,因此可以指定中间文件的压缩类型和压缩算法;

修改参数

参数名

描述

hive.exec.compress.output

默认是false;对于文本文件格式,可以指定修改为trueorc文件类型不受此参数控制;

mapred.output.compression.codec

默认是org.apache.hadoop.io.compress.DeflateCodec,建议改成org.apache.hadoop.io.compress.SnappyCodec

hive.exec.compress.intermediate

中间结果是否进行压缩,默认是false

hive.intermediate.compression.codec

默认为空,建议改成org.apache.hadoop.io.compress.SnappyCodec

 

4.5     并行度控制

SQL会转换成MapReduceMap的数量由总数据量除以Map处理的数据量来定,Reduce的数量是总数据量除以Reduce处理的数量,不超过最大值。

修改参数

参数名

描述

mapreduce.input.fileinputformat.split.maxsize

默认256000000map处理的最大数据量,一般不用改

hive.exec.reducers.bytes.per.reducer

默认256000000reduce处理的最大数据量,一般不用改

hive.exec.reducers.max

默认1099,对于集群比较大的情况,可以适当改大。

 

4.6     Task内存

默认情况下,每个MapReduce使用的最大内存都是4GB,堆内存是3GB

原因如下: (说明格式:虽然分配了4G,但有时候任务实际可能只会到2G

hive当前每个map处理的数据量是这个参数mapreduce.input.fileinputformat.split.maxsize控制的, 默认是256MB 通常情况下,这个256MB是压缩文件,由于压缩文件压缩率不定,解压后数据量不同,对内存就会有不同的需求。

HiveMapjoinMapjoin是把小表加载到Map的内存中, 需要估计好加载后的内存,防止加载到内存过大导致内存溢出。

数据可能有些微的倾斜情况,导致Reduce等处理的数据量变大,如果内存小会导致失败。

对于大内存机器,可以尽量使用大的内存降低从内存刷磁盘的频率,这样可以减少IO。正确的做法应该是基于job的情况设置合适的内存,但是考虑到这样业务就需要花费更多的时间去调优, 因此配置成一个更大一点,更通用的值,保证job运行的稳定性。


相关文章

Clickhouse表引擎介绍

Clickhouse表引擎介绍

引擎分类ClickHouse表引擎一共分为四个系列,分别是Log、MergeTree、Integration、Special。其中包含了两种特殊的表引擎Replicated、Distributed,功...

Serverless 技术选型

Serverless 技术选型

在 Serverless 这个大领域中,不只有函数计算这一种产品形态和应用类型,而是面向不同的用户群体和使用习惯,都有其各自适用的 Serverless 产品。例如面向函数的函数计算、面向应用的 Se...

CDH-集群节点下线

CDH-集群节点下线

1、前期准备确认下线节点确认节点组件信息确认下线节点数据存储大小确定剩余节点存储大小如果下线节点数据存储大小大于剩余节点存储大小,则不能进行下线,可能存在数据丢失的情况2、操作首先确认待下线节点中是否...

hive元数据迁移

hive元数据迁移

一、在新集群中创建hive数据库,作为新集群中的元数据库。注意点:创建hive数据库时注意用户和用户的权限及使用的编码格式一致。查看旧集群中角色权限和编码格式,在新的hive元数据库中设置相同的角色权...

mysql双主更改为主从架构分析

mysql双主更改为主从架构分析

客户需求客户业务运行的在mysql双主架构上,因为客户经常误操作触发双写,导致数据不一致,对业务的稳定运行造成加大的影响。客户现有数据库架构图解决方案基于客户业务和底层数据库架构实际情况,云掣科技提供...

keycloak高可用部署

keycloak高可用部署

添加keycloak应用rancher应用商店模式添加keycloak仓库地址rancher应用商店添加bitnami的helm仓库地址https://charts.bitnami.com/bitna...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。