由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Spark 内存管理

大数据 西门飞冰 5555℃
[隐藏]

1.介绍

Spark是基于内存的分布式计算引擎,其内置强大的内存管理机制,保证数据优先内存处理,并支持数据磁盘存储。

在执行Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能。由于 Driver 的内存管理相对来说较为简单,本节主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。

2.调整内存对性能的提升

增加了内存量以后,对性能的提升,有三点:

1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。

2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。

3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升。

3.内存的划分及用途

内存区域划分 堆内内存 堆外内存 作用
内存空间总大小 spark.executor.memory spark.memory.offHeap.size
Usable Memory 等于总内存减去预留内存 可用内存=存储内存+执行内存+Other内存
保留内存 固定为300M 作用和Other相同,可保障留出足够的空间
Other内存 占 Usable Memory 内存的40% Other内存区域主要用来存储用户定义的数据结构、Spark内部元数据
存储内存 spark.executor.memory *

spark.memory.fraction *

spark.memory.storageFraction

spark.memory.offHeap.size * spark.memory.storageFraction 用来容纳 RDD 缓存和广播变量
执行内存 spark.executor.memory *

spark.memory.fraction * (1 – spark.memory.storageFraction)

spark.memory.offHeap.size * (1-spark.memory.storageFraction) 用于分布式任务执行,如 Shuffle、Sort 和 Aggregate 等操作

4.堆内内存和堆外内存规划

Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

wps20

4.1.堆内内存

堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。

4.2.堆外内存

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。

堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

5.内存空间分配

Spark 1.6 之后引入的统一内存管理机制,存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域

统一内存管理的堆内内存结构如图

wps21

堆内内存整体划分为Usable Memory(可用内存)和Reversed Memory(预留内存)两大部分。其中预留内存作为OOM等异常情况的内存使用区域,默认被分配300M的空间。可用内存可进一步分为(Unified Memory)统一内存和Other内存其他两部分,默认占比为6:4。

统一内存管理的堆外内存结构如图:

堆外内存默认值为384M。整体分为Storage和Execution两部分,且启用动态内存占用机制,其中默认的初始化占比值均为0.5。

wps22

其中最重要的优化在于动态占用机制,其规则如下:

1、设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;

2、双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)

3、执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间;

4、存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。

统一内存管理的动态占用机制如图:

wps23

凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的所以要想充分发挥 Spark 的性能,需要开发者进一步了解存储内存和执行内存各自的管理方式和实现原理。

6.存储内存管理

6.1.RDD的持久化机制

Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血缘重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。

事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理。

RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。

Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block

Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Driver端的Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而Executor端的 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。

Storage模块示意图如下:

wps24

在对 RDD 持久化时,Spark 规定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 种不同的存储级别 ,而存储级别是以下 5 个变量的组合:

class StorageLevel private(
	private var _useDisk: Boolean, //磁盘
	private var _useMemory: Boolean, //这里其实是指堆内内存	
	private var _useOffHeap: Boolean, //堆外内存
	private var _deserialized: Boolean, //是否为非序列化
	private var _replication: Int = 1 //副本个数
)

Spark中7种存储级别如下:

image-20221028224006575

通过对数据结构的分析,可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:

1、存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。

2、存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。

3、副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本。

6.2.淘汰与落盘

由于同一个Executor的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对LinkedHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该Block。

存储内存的淘汰规则为:

  • 被淘汰的旧Block要与新Block的MemoryMode相同,即同属于堆外或堆内内存;
  • 新旧Block不能属于同一个RDD,避免循环淘汰;
  • 旧Block所属RDD不能处于被读状态,避免引发一致性问题;
  • 遍历LinkedHashMap中Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新Block所需的空间。其中LRU是LinkedHashMap的特性。

落盘的流程则比较简单,如果其存储级别符合useDisk为true的条件,再根据其deserialized判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在Storage模块中更新其信息。

7.执行内存管理

执行内存主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,我们来看Shuffle的Write和Read两阶段对执行内存的使用:

1)Shuffle Write

若在map端选择快速排序方式,在内存中存储数据时主要占用堆内执行空间。

在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。

2)Shuffle Read

在对reduce端的数据进行聚合时,要将数据交给Aggregator处理,在内存中存储数据时占用堆内执行空间。

转载请注明:西门飞冰的博客 » Spark 内存管理

喜欢 (0)or分享 (0)