由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Spark RDD 持久化(Cache和CheckPoint)

大数据 西门飞冰 5758℃
[隐藏]

1.介绍

持久化的作用,供RDD的重复使用,针对计算耗时比较长,可以提高计算的效率,针对数据比较重要的数据保存到持久化中,数据的安全性也可以得到保障。

image-20221027121011945

image-20221027121033959

持久化操作是在行动算子执行时完成的。

注意:RDD中不存储数据,如果一个RDD需要重复使用,那么需要从头再次执行来获取数据,RDD对象可以重用的,但是数据无法重用

2.Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。

如果使用完了缓存,可以通过unpersist()方法释放缓存。

但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

代码实现:

public class Cache {

    public static void main(String[] args) throws InterruptedException {
        // 1 创建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // 3 编写代码
        JavaRDD<String> lineRDD = sc.textFile("input/1.txt");

        JavaRDD<String> flatMap = lineRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.stream(words).iterator();
            }
        });

        JavaRDD<String> filter = flatMap.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String word) throws Exception {
                return !"".equals(word) && word != null;
            }
        });

        JavaPairRDD<String, Integer> mapToPair = filter.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
      	// 数据缓存。
        filter.cache();
				// 数据缓存。
        mapToPair.cache();
        // persist方法可以更改存储级别
      	// cache底层调用的就是persist方法,缓存级别默认用的是MEMORY_ONLY
        mapToPair.persist(StorageLevel.MEMORY_ONLY());

        JavaPairRDD<String, Integer> reduceByKey = mapToPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer sum, Integer elem) throws Exception {
                return sum + elem;
            }
        });
      	// 触发执行逻辑
        reduceByKey.collect();

        JavaPairRDD<String, Iterable<Integer>> groupByKey = mapToPair.groupByKey();
        // 再次触发执行逻辑
        groupByKey.collect();
        System.out.println(groupByKey.toDebugString());
        Thread.sleep(999999);

        // 4 关闭资源
        sc.stop();
    }
}

缓存结果验证:

DAG中要是出现有下图的小绿点,就证明缓存成功

image-20221024204952044

Spark Web Ui Storage页面查看:

image-20221024210141927

查看血缘关系:

(2) MapPartitionsRDD[7] at groupByKey at Test01_cache.java:63 []
 |  ShuffledRDD[6] at groupByKey at Test01_cache.java:63 []
 +-(2) MapPartitionsRDD[4] at mapToPair at Test01_cache.java:44 []
    |      CachedPartitions: 2; MemorySize: 145.9 MiB; DiskSize: 0.0 B	// 表示缓存在内存中
    |  MapPartitionsRDD[3] at filter at Test01_cache.java:37 []
    |      CachedPartitions: 2; MemorySize: 103.6 MiB; DiskSize: 0.0 B
    |  MapPartitionsRDD[2] at flatMap at Test01_cache.java:29 []
    |  input/1.txt MapPartitionsRDD[1] at textFile at Test01_cache.java:27 []
    |  input/1.txt HadoopRDD[0] at textFile at Test01_cache.java:27 []

2.1.缓存的级别

注意:只有persist方法可以更改存储级别

默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。

存储级别源码:

object StorageLevel extends scala.AnyRef with scala.Serializable {
  val NONE : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val DISK_ONLY : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val DISK_ONLY_2 : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val DISK_ONLY_3 : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_ONLY : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_ONLY_2 : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_ONLY_SER : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_ONLY_SER_2 : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_AND_DISK : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_AND_DISK_2 : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_AND_DISK_SER : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val MEMORY_AND_DISK_SER_2 : org.apache.spark.storage.StorageLevel = { /* compiled code */ }
  val OFF_HEAP : org.apache.spark.storage.StorageLevel = { /* compiled code */ }

image-20221024225646185

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

2.2.自带缓存算子

一些带shuffle的算子,会自带缓存

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

如何确定算子自带缓存?

举例:查看下面第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的,所以Stage2 执行显示灰色

image-20221027122954799

3.CheckPoint检查点

为什么要做检查点?

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

检查点数据存储格式为:二进制的文件

检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。

建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir(“./checkpoint1”)

(2)调用检查点方法:wordToOneRdd.checkpoint()

代码实现:

public class CheckPoint {
    public static void main(String[] args) {

        // 1.创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");

        // 2. 创建sparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 设置检查点存储路径
        sc.setCheckpointDir("ck");

        // 3. 编写代码
        JavaRDD<String> lineRDD = sc.textFile("input/2.txt");

        JavaPairRDD<String, Long> tupleRDD = lineRDD.mapToPair(new PairFunction<String, String, Long>() {
            @Override
            public Tuple2<String, Long> call(String s) throws Exception {
                return new Tuple2<String, Long>(s, System.currentTimeMillis());
            }
        });

        // 查看血缘关系
        System.out.println(tupleRDD.toDebugString());

        // 增加检查点避免计算两次
        tupleRDD.cache();

        // 进行检查点
        tupleRDD.checkpoint();

        tupleRDD. collect().forEach(System.out::println);

        System.out.println(tupleRDD.toDebugString());
        // 第二次计算
        tupleRDD. collect().forEach(System.out::println);
        // 第三次计算
        tupleRDD. collect().forEach(System.out::println);

        // 4. 关闭sc
        sc.stop();
    }
}

4.缓存和检查点区别

cache:

  • 将数据临时存储在内存中进行数据重用
  • 会在血缘关系中添加新的依赖。一旦出现问题,可以从头读取数据

persist:

  • 将数据临时存储在磁盘文件中进行数据重用
  • 涉及到磁盘IO,性能较低,但是数据安全
  • 如果作业执行完成,临时保存的数据文件就会丢失

checkpoint:

  • 将数据长久地保存在磁盘文件中进行数据重用
  • 涉及到磁盘IO,性能较低,但是数据是安全的
  • 为了保证数据安全,所以一般情况下,会独立执行作业
  • 执行过程中,会切断血缘关系。重新建立新的血缘关系。因为读取数据的数据源发生了改变。
  • checkpoint等同于新的数据源。

 

转载请注明:西门飞冰的博客 » Spark RDD 持久化(Cache和CheckPoint)

喜欢 (0)or分享 (0)