Spark在降本增效中的一些思考

背景

在大环境不好的情况下,本司也开始了“降本增效”,本文探讨一下,在这种背景下 Spark怎么做的降本增效。
Yarn 基于 EMR CPU 是xlarge,也就是内存和核的比例在7:1左右的 ,磁盘是基于 NVMe SSD
Spark 3.5.0(也是刚由3.1 升级而来 )
JDK 8
这里为什么强调 NVMe ,因为相比于 HDD 来说,他的磁盘IO有更高的读写速度。 导致我们在 Spark上 做的一些常规优化是不起效果的

注意:如没特别说明 P99 P95 avg等时间单位是秒

优化手段

调整JVM GC策略

因为我们内部存在于类似 Apache kyuubi这种 long running的服务,而且内存都是20GB起步,所以第一步就想到调整 CMS 策略为 G1 或者 Parallel.
测试结果:

GC类型 p100 p99 p95 p90 p50 avg task_count
CMS 9569 2085.3800000000047 688 208 3 43.85805120688368 87363
G1 9380 2164.9100000000035 853 378 6 52.22754511406197 88110
Parallel 10919 2192.970000000001 888 393 6 52.06006478898587 88904

从结果上来看,CMS在 吞吐上比 G1和Parallel 都要好的,附上 JVM的一些关键参数如下
Spark driver端的配置都是一样:

"spark.driver.extraJavaOptions": "-verbose:gc -XX:+UseParNewGC -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5  -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump",

CMS的 Executor 配置如下:

"spark.executor.extraJavaOptions": " -verbose:gc -XX:+UseParNewGC -XX:MaxTenuringThreshold=15 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=5  -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"

G1的 Executor 配置如下:

"spark.executor.extraJavaOptions": "-verbose:gc -XX:+UseG1GC -XX:ParallelGCThreads=10 -XX:ConcGCThreads=5 -XX:G1ReservePercent=15 -XX:MaxTenuringThreshold=15 -XX:+ParallelRefProcEnabled -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"

Parellel 的 Executor 配置如下:

"spark.executor.extraJavaOptions": "-Dcarbon.properties.filepath=carbon.properties -Dlog4j.configuration=xql-log4j.properties -verbose:gc -XX:++UseParallelGC -XX:+UseParallelOldGC -XX:ParallelGCThreads=10 -XX:+UseAdaptiveSizePolicy -XX:+ParallelRefProcEnabled  -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=heapdump"

增发并发度

在对任务的运行时长进行统计的时候我们发现,绝大部分任务都是运行时间在5分钟左右,而与此同时在任务高峰期的时候,整体任务的等待数量一直在1800左右,这种情况下,增大任务的并发度能够很好的增加整体的吞吐,实践中,起码后20%的提速,具体的运行时间如下:

p99 p95 p90 p50 avg
2391 946 562 38 193.19207605335595

引入ESS或者RSS

对于 ESS 可以参考:push-based shuffle to improve shuffle efficiency,但是这里请注意一点,该问题的提出点是基于 HDD 类型的磁盘的,因为我们现在是基于 NVMe SSD 的,所以几乎没有什么提升。
对于 RSS 可以参考:Apache celeborn,结果也是差不多。

引入向量化插件

对于向量化加速这块建议参考: Gluten,这也是笔者一直在关注的项目,根据 TPC-H 测试结果显示起码有2倍的性能提升,但是实际效果还是得看SQL的pattern。但是由于目前我们的Spark 是基于 3.5.0的,是比较新的版本,而社区这块的融合还在继续,所以这块今年应该可以行动起来,可以参考Add spark35 scala213

设置压缩格式为ZSTD

众所周知,ZSTD在压缩率和压缩解压缩上的性能都是很突出,但是在 NVMe SSD 光环下,改成了ZSTD的压缩格式以后,效果却不如人意,而且我们还合并了Parallel Compression Support for ZSTD到我们内部的Spark版本中,具体的效果如下:
具体的配置项为:“spark.io.compression.codec”:"zstd"

压缩格式 p99 p95 p90 p50 avg task_count
lz4 2806.5100000000034 121 60 5 85.62898089171975 628
zstd 1814.3700000000076 118.3499999999998 78.30000000000001 11 117.2063492063492 378

注意: 我们批集群的CPU利用率在60%以上,引入zstd以后会增加CPU的使用率,而且在这种 long running的服务下,得增加driver的core数,要不然会导致在进行广播的时候卡住,具体的可以加如下配置:

"spark.driver.cores":"8", 
"spark.io.compression.zstd.workers":"2",

至于效果为什么不理想,主要原因还是因为 shuffle数据量不大,可能对于单个shuffle数据量大的应用来说,可能效果比较明显。

增大可广播join的阈值大小

因为对于这种 long running的服务,driver端的内存一般都是挺大的,所以可以适当增大spark.sql.autoBroadcastJoinThreshold的配置,笔者这里设置了 50MB

引入动态资源分配

对于这种long running服务来说,每个时段的任务数肯定是不一样的,那这个时候引入动态资源分配的话,效果就是很好的,具体的配置可以参考如下:

"spark.shuffle.service.enabled":"false"
"spark.dynamicAllocation.enabled": "true",
"spark.dynamicAllocation.shuffleTracking.enabled": "true",
"spark.dynamicAllocation.minExecutors": "20",
"spark.dynamicAllocation.maxExecutors": "40",
"spark.dynamicAllocation.executorIdleTimeout":"3600"

相关推荐

  1. Spark增效一些思考

    2024-01-22 13:46:02       29 阅读
  2. 开源软件企业应用:增效利器(AI)

    2024-01-22 13:46:02       29 阅读
  3. 网站安全攻防:增效解决之道

    2024-01-22 13:46:02       39 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-22 13:46:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-22 13:46:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-22 13:46:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-22 13:46:02       18 阅读

热门阅读

  1. brpc负载均衡load balance和服务发现name servicing

    2024-01-22 13:46:02       29 阅读
  2. 计算机通信:HTTP协议

    2024-01-22 13:46:02       34 阅读
  3. 编程笔记 html5&css&js 050 CSS表格2-1

    2024-01-22 13:46:02       26 阅读
  4. Golang爬虫技术

    2024-01-22 13:46:02       34 阅读
  5. 用go语言删除重复文件

    2024-01-22 13:46:02       35 阅读
  6. Vue.js:构建用户界面的渐进式框架

    2024-01-22 13:46:02       26 阅读