Spark优化之配置参数

九月1年前技术文章688

一、资源参数优化

所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各 个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对 应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集 群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少 量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可 以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总 内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的 总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。

  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致 Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的 Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90% 的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num- executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够 缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适 当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor 内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了 这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作 的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着 task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。


资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

二、扩展spark driver

动态资源分配

在 Facebook,Spark 集群启用了动态资源分配(Dynamic Executor Allocation),以便更好的使用集群资源,而且在 Facebook 内部,Spark 是运行在多租户的集群上,所以这个也是非常合适的。比如典型的配置如下:

  • spark.dynamicAllocation.enabled = true

  • spark.dynamicAllocation.executorIdleTimeout = 2m

  • spark.dynamicAllocation.minExecutors = 1

  • spark.dynamicAllocation.maxExecutors = 2000

图1.png

多线程事件处理

在 Spark 2.3 版本之前,事件处理是单线程的架构,也就是说,事件队列里面的事件得一个一个处理。如果你的作业很大,并且有很多 tasks,很可能会导致事件处理出现延迟,进一步导致作业性能出现问题,甚至使当前作业失败。为了解决这个问题,SPARK-18838 这个 ISSUE 引入了多线程事件处理架构,每个事件都有其单独的单线程 executor service 去处理,这样就可以大大减少事件处理延时的问题。另外,由于每类事件都有单独的事件队列,所以会增加 Driver 端的内存占用。

图2.png




更好的 Fetch 失败处理

在 Spark 2.3 版本之前,如果 Spark 探测到 fetch failure,那么它会把产生这个 shuffle 文件的 Executor 移除掉。但是如果这个 Executor 所在的机器有很多 Executor,而且是因为这台机器挂掉导致 fetch failure,那么会导致很多的 fetch 重试,这种处理机制很低下。SPARK-19753 这个 ISSUE 使得 Spark 可以把上述场景所有 Executor 的 shuffle 文件移除,也就是不再去重试就知道 shuffle 文件不可用。


图3.png

另外,Spark 最大 Fetch 重试次数也可以通过 spark.max.fetch.failures.per.stage 参数配置。

FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

RPC 服务线程调优

当 Spark 同时运行大量的 tasks 时,Driver 很容易出现 OOM,这是因为在 Driver 端的 Netty 服务器上产生大量 RPC 的请求积压,我们可以通过加大 RPC 服务的线程数解决 OOM 问题,比如 spark.rpc.io.serverThreads = 64


三、spark shuffle相关的参数调优

spark.shuffle.file.buffer

默认值:32k

参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

 

spark.reducer.maxSizeInFlight

默认值:48m

参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

 

spark.shuffle.io.maxRetries

默认值:3

参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

 

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

 

spark.shuffle.memoryFraction

默认值:0.2

参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值:sort

参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

默认值:false

参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

spark.reducer.maxBlocksInFlightPerAddress

默认值:Int.MaxValue(2的31次方-1)

限制了每个主机每次reduce可以被多少台远程主机拉取文件块,调低这个参数可以有效减轻node manager的负载。

spark.reducer.maxReqsInFlight

默认值:Int.MaxValue(2的31次方-1)

限制远程机器拉取本机器文件块的请求数,随着集群增大,需要对此做出限制。否则可能会使本机负载过大而挂掉。。

spark.reducer.maxReqSizeShuffleToMem

默认值:Long.MaxValue

shuffle请求的文件块大小 超过这个参数值,就会被强行落盘,防止一大堆并发请求把内存占满。

spark.shuffle.compress

默认压缩 true

是否压缩map输出文件

spark.shuffle.spill.compress

默认:true

shuffle过程中溢出的文件是否压缩,使用spark.io.compression.codec压缩。



相关文章

 企业级大数据安全架构(七)

企业级大数据安全架构(七)

在企业级大数据安全方案中,本节主要介绍服务安全问题,引入kerberos认证机制,目前直接对接kerberos使用较多,这里我们使用FreeIPA来集成kerberosFreeIPA官网下载地址:ht...

SQL隐式转换导致索引失效_函数

SQL隐式转换导致索引失效_函数

一、隐式转换分类1.函数2.数据类型3.字符集4.校验规则二、常见案例本节将会针对第一部分提到的四种隐式转换内容,举例说明。1.索引列使用函数导致索引失效示例 SQL 如下,该 SQL 的 where...

数据湖技术之iceberg(四)iceberg的数据类型

1  Iceberg数据类型类型描述笔记boolean布尔类型,true或者falseint32 位有符号整数可以转换成long类型long64 位有符号整数float单精度浮点型可以转换成...

netca报错UnsatisfiedLinkError exception loading native library

1、netca报错:UnsatisfiedLinkError exception loading native library: njni11报错:[oracle@test-db ~]$ netca...

SonarQube 代码质量平台

SonarQube 代码质量平台

官网:https://www.sonarqube.org/SonarQube 是一个开源的代码质量管理系统。可以对代码进行自动审查,检测代码中的错误、漏洞和代码味道。它可以与您现有的工作流程集成,以实...

Oracle上云找云掣

背景介绍:随着公有云技术成熟、稳定,越来被大中企业接受。自建IDC机房大成本投入终被云的高效方便稳定所替代。企业基于0racle核心的业务系统势必上云,企业面临0racle如何上云,上云后如何保障数据...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。