1.介绍
Spark是基于内存
的分布式计算引擎,其内置强大的内存管理机制,保证数据优先内存
处理,并支持数据磁盘存储。
2.调整内存对性能的提升
增加了内存量以后,对性能的提升,有三点:
1、如果需要对RDD进行cache,那么更多的内存,就可以缓存更多的数据,将更少的数据写入磁盘,甚至不写入磁盘。减少了磁盘IO。
2、对于shuffle操作,reduce端,会需要内存来存放拉取的数据并进行聚合。如果内存不够,也会写入磁盘。如果给executor分配更多内存以后,就有更少的数据,需要写入磁盘,甚至不需要写入磁盘。减少了磁盘IO,提升了性能。
3、对于task的执行,可能会创建很多对象。如果内存比较小,可能会频繁导致JVM堆内存满了,然后频繁GC,垃圾回收,minor GC和full GC。(速度很慢)。内存加大以后,带来更少的GC,垃圾回收,避免了速度变慢,性能提升。
3.内存的划分及用途
内存区域划分 | 堆内内存 | 堆外内存 | 作用 |
---|---|---|---|
内存空间总大小 | spark.executor.memory | spark.memory.offHeap.size | |
Usable Memory | 等于总内存减去预留内存 | 无 | 可用内存=存储内存+执行内存+Other内存 |
保留内存 | 固定为300M | 无 | 作用和Other相同,可保障留出足够的空间 |
Other内存 | 占 Usable Memory 内存的40% | 无 | Other内存区域主要用来存储用户定义的数据结构、Spark内部元数据 |
存储内存 | spark.executor.memory *
spark.memory.fraction * spark.memory.storageFraction |
spark.memory.offHeap.size * spark.memory.storageFraction | 用来容纳 RDD 缓存和广播变量 |
执行内存 | spark.executor.memory *
spark.memory.fraction * (1 – spark.memory.storageFraction) |
spark.memory.offHeap.size * (1-spark.memory.storageFraction) | 用于分布式任务执行,如 Shuffle、Sort 和 Aggregate 等操作 |
4.堆内内存和堆外内存规划
Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
4.1.堆内内存
堆内内存的大小,由 Spark 应用程序启动时的 executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同。
4.2.堆外内存
为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。
堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
5.内存空间分配
Spark 1.6 之后引入的统一内存管理机制,存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。
统一内存管理的堆内内存结构如图
堆内内存整体划分为Usable Memory
(可用内存)和Reversed Memory
(预留内存)两大部分。其中预留内存作为OOM等异常情况的内存使用区域,默认被分配300M的空间。可用内存可进一步分为(Unified Memory)统一内存和Other内存其他两部分,默认占比为6:4。
统一内存管理的堆外内存结构如图:
堆外内存默认值为384M。整体分为Storage和Execution两部分,且启用动态内存占用
机制,其中默认的初始化占比值均为0.5。
其中最重要的优化在于动态占用机制,其规则如下:
1、设定基本的存储内存和执行内存区域(spark.storage.storageFraction 参数),该设定确定了双方各自拥有的空间的范围;
2、双方的空间都不足时,则存储到硬盘;若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是指不足以放下一个完整的 Block)
3、执行内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后”归还”借用的空间;
4、存储内存的空间被对方占用后,无法让对方”归还”,因为需要考虑 Shuffle 过程中的很多因素,实现起来较为复杂。
统一内存管理的动态占用机制如图:
如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能,因为缓存的 RDD 数据通常都是长期驻留内存的。
6.存储内存管理
6.1.RDD的持久化机制
Task 在启动之初读取一个分区时,会先判断这个分区是否已经被持久化,如果没有则需要检查 Checkpoint 或按照血缘重新计算。所以如果一个 RDD 上要执行多次行动,可以在第一次行动中使用 persist 或 cache 方法,在内存或磁盘中持久化或缓存这个 RDD,从而在后面的行动时提升计算速度。
事实上,cache 方法是使用默认的 MEMORY_ONLY 的存储级别将 RDD 持久化到内存,故缓存是一种特殊的持久化。 堆内和堆外存储内存的设计,便可以对缓存 RDD 时使用的内存做统一的规划和管理。
RDD 的持久化由 Spark 的 Storage 模块负责,实现了 RDD 与物理存储的解耦合。Storage 模块负责管理 Spark 在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。
Storage 模块在逻辑上以 Block 为基本存储单位,RDD 的每个 Partition 经过处理后唯一对应一个 Block
Driver 端和 Executor 端的 Storage 模块构成了主从式的架构,即 Driver 端的 BlockManager 为 Master,Executor 端的 BlockManager 为 Slave。Driver端的Master 负责整个 Spark 应用程序的 Block 的元数据信息的管理和维护,而Executor端的 Slave 需要将 Block 的更新等状态上报到 Master,同时接收 Master 的命令,例如新增或删除一个 RDD。
Storage模块示意图如下:
class StorageLevel private( private var _useDisk: Boolean, //磁盘 private var _useMemory: Boolean, //这里其实是指堆内内存 private var _useOffHeap: Boolean, //堆外内存 private var _deserialized: Boolean, //是否为非序列化 private var _replication: Int = 1 //副本个数 )
通过对数据结构的分析,可以看出存储级别从三个维度定义了 RDD 的 Partition(同时也就是 Block)的存储方式:
1、存储位置:磁盘/堆内内存/堆外内存。如 MEMORY_AND_DISK 是同时在磁盘和堆内内存上存储,实现了冗余备份。OFF_HEAP 则是只在堆外内存存储,目前选择堆外内存时不能同时存储到其他位置。
2、存储形式:Block 缓存到存储内存后,是否为非序列化的形式。如 MEMORY_ONLY 是非序列化方式存储,OFF_HEAP 是序列化方式存储。
3、副本数量:大于 1 时需要远程冗余备份到其他节点。如 DISK_ONLY_2 需要远程备份 1 个副本。
6.2.淘汰与落盘
由于同一个Executor的所有的计算任务共享有限的存储内存空间,当有新的 Block 需要缓存但是剩余空间不足且无法动态占用时,就要对LinkedHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果其存储级别中同时包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该Block。
存储内存的淘汰规则为:
- 被淘汰的旧Block要与新Block的MemoryMode相同,即同属于堆外或堆内内存;
- 新旧Block不能属于同一个RDD,避免循环淘汰;
- 旧Block所属RDD不能处于被读状态,避免引发一致性问题;
- 遍历LinkedHashMap中Block,按照最近最少使用(LRU)的顺序淘汰,直到满足新Block所需的空间。其中LRU是LinkedHashMap的特性。
useDisk为true的条件,再根据其deserialized判断是否是非序列化的形式,若是则对其进行序列化,最后将数据存储到磁盘,在Storage模块中更新其信息。
7.执行内存管理
执行内存主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,我们来看Shuffle的Write和Read两阶段对执行内存的使用:
1)Shuffle Write
若在map端选择快速排序方式,在内存中存储数据时主要占用堆内执行空间。
在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
2)Shuffle Read
在对reduce端的数据进行聚合时,要将数据交给Aggregator处理,在内存中存储数据时占用堆内执行空间。
转载请注明:西门飞冰的博客 » Spark 内存管理