由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Spark RDD 共享变量

大数据 西门飞冰 1370℃
[隐藏]

1.介绍

一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上 的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)

2.累加器

累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)。

累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

2.1.累加器原理

未使用累加器的场合:

在Spark中,如果想要在计算过程中共享变量,直接在driver中声明是不起作用的,因为Spark不会将这个变量在计算节点中传递。

image-20221028191442855

使用累加器的场合:

累加器是Spark提供的一种共享数据的处理方式,也就意味着如果在Spark计算中共享数据,必须遵循Spark的规则,然后由Spark在计算节点中进行传递

image-20221028192220372

2.2.累加器使用

(1)累加器定义(SparkContext.accumulator(initialValue)方法)

LongAccumulator acc = JavaSparkContext.toSparkContext(sc).longAccumulator();

(2)累加器添加数据(累加器.add方法)

cc.add(v1);

(3)累加器获取数据(累加器.value)

acc.value();

注意:Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。

累加器要放在行动算子中

因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。

2.3.累加器使用示例代码

使用累加器实现基础的求和操作

public class Acc {
    public static void main(String[] args) {
        // 1 创建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // 创建一个累加器,不同的数据类型选择不同类型的累加器
        LongAccumulator acc = JavaSparkContext.toSparkContext(sc).longAccumulator();

        // 3 编写代码
        // 从集合创建一组数据,并配置两个以上分区
        JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
        // 通过map 算子遍历所有数据,并通过累加器add方法求和
        JavaRDD<Integer> map = javaRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                acc.add(v1);
                //System.out.println(acc.value());
                return v1;
            }
        });
        map.collect();

        System.out.println("输出累加器计算后的结果");
        System.out.println(acc.value());

        // 4 关闭资源
        sc.stop();
    }
}

累加器结合foreach

累加器最好的使用方式就是结合foreach使用,因为别的行动算子没有办法自定义逻辑,foreach就相当于一个自定义的行动算子,可以在里面自己写逻辑,累加器加foreach可以实现解耦的效果,把累加器拆解开来使用。

public class Acc_foreach {
    public static void main(String[] args) {
        // 1 创建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        LongAccumulator acc = JavaSparkContext.toSparkContext(sc).longAccumulator();
        LongAccumulator acc2 = JavaSparkContext.toSparkContext(sc).longAccumulator();

        // 3 编写代码
        JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

        JavaRDD<Integer> map = javaRDD.map(x -> x);

        map.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                acc.add(integer);
            }
        });

        map.foreach(new VoidFunction<Integer>() {
            @Override
            public void call(Integer integer) throws Exception {
                acc2.add(integer);
            }
        });

        System.out.println(acc.value());
        System.out.println(acc2.value());

        // 4 关闭资源
        sc.stop();
    }
}

3.广播变量

广播变量:分布式共享只读变量。

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来会很顺手。在多个Task并行操作中使用同一个变量,但是Spark会为每个Task任务分别发送。

在可接受的范围内,变量越大,越需要广播! 在能广播的前提下,待广播的数据量越大,提升效率越明显。内存资源占用的减小很明显

如果待广播特别小,就没有广播的必要性了。

3.1.广播变量原理

如果使用广播变量的工作机制,则一个worker中启动的某个executor中的多个Task 就可以共用一份数据,这个广播数据就是存储在 存储内存中,这个内存有可能是堆内内存,也有可能是堆外内存

image-20221028190325908

image-20221028190422279

3.2.广播变量使用

(1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。

(2)通过广播变量.value,访问该对象的值。

(3)广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。

3.3.代码示例

使用Spark Core模拟一个抽奖的小程序:

public class Broadcost {
    public static void main(String[] args) {
        // 1 创建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // 3 编写代码
        // 定义中奖的数据
        List<Integer> list = Arrays.asList(5, 6, 7);
        // 定义广播变量(分布式只读变量)
        Broadcast<List<Integer>> broadcast = sc.broadcast(list);

        // 定义抽奖的奖池
        JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(2, 6, 4, 9, 10, 3, 7, 8), 4);

        JavaRDD<Integer> filter = javaRDD.filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer v1) throws Exception {
                List<Integer> value = broadcast.value();
                // 判断list集合是否包含v1,包含则返回
                return value.contains(v1);
            }
        });

        filter.collect().forEach(System.out::println);

        // 4 关闭资源
        sc.stop();
    }
}

Broadcast与map进行无shuffle join代码示例:

// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。

转载请注明:西门飞冰的博客 » Spark RDD 共享变量

喜欢 (0)or分享 (0)