Spark优化之配置参数

九月9个月前技术文章304

一、资源参数优化

所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各 个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对 应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集 群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少 量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。

  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可 以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作业申请到的总 内存量(也就是所有Executor进程的内存总和),这个量是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的 总内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。

  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致 Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的 Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90% 的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num- executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够 缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适 当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor 内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了 这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作 的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着 task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。


资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

二、扩展spark driver

动态资源分配

在 Facebook,Spark 集群启用了动态资源分配(Dynamic Executor Allocation),以便更好的使用集群资源,而且在 Facebook 内部,Spark 是运行在多租户的集群上,所以这个也是非常合适的。比如典型的配置如下:

  • spark.dynamicAllocation.enabled = true

  • spark.dynamicAllocation.executorIdleTimeout = 2m

  • spark.dynamicAllocation.minExecutors = 1

  • spark.dynamicAllocation.maxExecutors = 2000

图1.png

多线程事件处理

在 Spark 2.3 版本之前,事件处理是单线程的架构,也就是说,事件队列里面的事件得一个一个处理。如果你的作业很大,并且有很多 tasks,很可能会导致事件处理出现延迟,进一步导致作业性能出现问题,甚至使当前作业失败。为了解决这个问题,SPARK-18838 这个 ISSUE 引入了多线程事件处理架构,每个事件都有其单独的单线程 executor service 去处理,这样就可以大大减少事件处理延时的问题。另外,由于每类事件都有单独的事件队列,所以会增加 Driver 端的内存占用。

图2.png




更好的 Fetch 失败处理

在 Spark 2.3 版本之前,如果 Spark 探测到 fetch failure,那么它会把产生这个 shuffle 文件的 Executor 移除掉。但是如果这个 Executor 所在的机器有很多 Executor,而且是因为这台机器挂掉导致 fetch failure,那么会导致很多的 fetch 重试,这种处理机制很低下。SPARK-19753 这个 ISSUE 使得 Spark 可以把上述场景所有 Executor 的 shuffle 文件移除,也就是不再去重试就知道 shuffle 文件不可用。


图3.png

另外,Spark 最大 Fetch 重试次数也可以通过 spark.max.fetch.failures.per.stage 参数配置。

FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

RPC 服务线程调优

当 Spark 同时运行大量的 tasks 时,Driver 很容易出现 OOM,这是因为在 Driver 端的 Netty 服务器上产生大量 RPC 的请求积压,我们可以通过加大 RPC 服务的线程数解决 OOM 问题,比如 spark.rpc.io.serverThreads = 64


三、spark shuffle相关的参数调优

spark.shuffle.file.buffer

默认值:32k

参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

 

spark.reducer.maxSizeInFlight

默认值:48m

参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。

调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

 

spark.shuffle.io.maxRetries

默认值:3

参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。

调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

 

spark.shuffle.io.retryWait

默认值:5s

参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。

调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

 

spark.shuffle.memoryFraction

默认值:0.2

参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。

调优建议:如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager

默认值:sort

参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。

调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold

默认值:200

参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

spark.shuffle.consolidateFiles

默认值:false

参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

spark.reducer.maxBlocksInFlightPerAddress

默认值:Int.MaxValue(2的31次方-1)

限制了每个主机每次reduce可以被多少台远程主机拉取文件块,调低这个参数可以有效减轻node manager的负载。

spark.reducer.maxReqsInFlight

默认值:Int.MaxValue(2的31次方-1)

限制远程机器拉取本机器文件块的请求数,随着集群增大,需要对此做出限制。否则可能会使本机负载过大而挂掉。。

spark.reducer.maxReqSizeShuffleToMem

默认值:Long.MaxValue

shuffle请求的文件块大小 超过这个参数值,就会被强行落盘,防止一大堆并发请求把内存占满。

spark.shuffle.compress

默认压缩 true

是否压缩map输出文件

spark.shuffle.spill.compress

默认:true

shuffle过程中溢出的文件是否压缩,使用spark.io.compression.codec压缩。



相关文章

mysql部署

安装前准备创建用户和目录创建用户:groupadd mysqluseradd -g mysql mysql 创建目录mkdir /data/mysqlmkdir /log/mysql添加环境变量vim...

数据湖技术之iceberg(三)Iceberg数据存储格式

数据湖技术之iceberg(三)Iceberg数据存储格式

1  Iceberg数据存储格式1.1.  Iceberg术语l   data files(数据文件):数据文件是Apache Iceberg表真实存储数据的文...

MySQL运维实战之Clone插件(10.2)Clone插件原理

MySQL运维实战之Clone插件(10.2)Clone插件原理

clone插件实现clone操作主要分为几个阶段:1、初始阶段。初始阶段,会开启页面跟踪(Page Tracking)。开启页面跟踪后,修改过的页面的编号会被记录下来。页面的修改可分为两个阶段:首先在...

MySQL 异常:max key length is 767 bytes

MySQL 异常:max key length is 767 bytes

前言最近迁移几张表,又遇到 767 异常,迁移前只检查了 sql_mode 忽略对比了这个参数,导致几张表创建失败,其实解决方法也很简单,开启 innodb_large_prefix 参数重新导入即可...

开源大数据集群部署(二十)Trino部署

开源大数据集群部署(二十)Trino部署

2.9.1 解压trino的包到opt目录cd /root/bigdata tar -xzvf trino-server-389.tar.gz -C /opt/ ln -s /opt/trino-...

Linux命令traceroute—追踪网络路由利器

说明:通过traceroute我们可以知道信息从你的计算机到互联网另一端的主机是走的什么路径。当然每次数据包由某一同样的出发点(source)到达某一同样的目的地(destination)走的路径可能...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。