由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Spark RDD 分区规则和分区器

大数据 西门飞冰 2757℃
[隐藏]

1.前言

默认情况下,Spark可以将一个作业切分多个任务后,发送给Executor节点并行计算,而能够并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。注意:这里的并行执行的任务数量,并不是指的切分任务的数量。

Spark分区的目的是为了并行计算,因为一个分区就是一个task进行计算,多个task可以同时在不同Executor执行。注意一个Executor只能运行一个task,属于并发执行,而不是并行执行。

一个任务需要多少个分区,是可以进行手动设置的。

spark可以进行分区的位置有两个:

一个是最初始从文件和集合读取数据的时候,根据分区规则给数据进行分区

一个是在使用Shuffle的时候,使用分区器给数据进行重分区

分区的好处:

1、创建足够的分区,在足够的资源下可以并行计算。

2、面对数据倾斜的场景,可以通过增加分区数量和自定义分区规则,进行规避。

2.RDD 分区规则

2.1.从集合中创建RDD

public class list_partition {
    public static void main(String[] args) {

        // 1 创建Spark配置
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // 从集合当中创建RDD的分区规则:
      	// parallelize 方法传递的第二个参数表示分区的数量,不传递则使用默认值
      	// 分区默认值:是当前运行环境的最大可用核数
        JavaRDD<Integer> javaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);

        // 数据分区的情况
        // 利用整数除机制  左闭右开
        // 分区0 => start 0*5/3=0  end 1*5/3=1 => 存储数据1
        // 分区1 => start 1*5/3=1  end 2*5/3=3 => 存储数据2,3
        // 分区2 => start 2*5/3=3  end 3*5/3=5 => 存储数据4,5
        javaRDD.saveAsTextFile("output");

        // 4 关闭资源
        sc.stop();
    }
}

2.2.从文件中创建RDD

1.txt 文件内容如下

image-20221026100920101

public class file_partition {
    public static void main(String[] args) {
        // 1 创建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // 3 编写代码

        // textFile可以将文件作为数据处理的数据源,也可以设定分区
        // minPartitions:最小分区数量
        // 这里设置了2个分区,output实际输出了三个分区
      	// 分区规则:具体的分区个数需要通过公式计算 
      	// 首先获取文件的总长度:totalSize = 7Byte  文件总字节数包含 回车换行等字符
      	// 计算平均长度:7 / 2 = 3Byte   goalSize = totalSize / numSplits
      	// 获取块大小 128M
        // minSize = 1Byte
        // 计算切分大小  splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
        // splitSize = 3byte  Math.max(1Byte, Math.min(3Byte, 128M));
        // 判断剩余的totalSize大小, 是否大于 splitSize的1.1倍   大于 切片, 小于不切
        // 7 / 3 = 2... 1(1.1) + 1 = 3(分区)
      	// 如果数据源为多个文件,那么计算分区时以文件为单位进行分区
        JavaRDD<String> javaRDD = sc.textFile("input/1.txt",  2);
      
        // 数据分区规则:
        // 1、数据以行为单位进行读取
        // spark读取文件,采用的是Hadoop的方式一行一行读取,和字节数没有关系
        // 2、数据读取时以偏移量为单位,偏移量不会被重复读取
        // (文件数据)1@@   =>   (偏移量) 012
        // (文件数据)2@@   =>   (偏移量) 345
        // (文件数据)3		 =>   (偏移量) 6
        // 3、数据分区的偏移量范围的计算,和分区的计算规则一致,分区0和分区1偏移量长度是3是因为分区平均长度是3Byte,分区2偏移量长度是1,是因为计算分区最后余数是1Byte
        // 分区0   =>  偏移量[0,3] ,这里的偏移量3包含3  => 存储数据1,2
        // 分区1   =>  偏移量[3,6], 这里的偏移量6包含6 =>  存储数据3
        // 分区2   =>  偏移量[6,7]  => 没有数据存储
      	// 4、如果切分的位置位于一行的中间  会在当前分区读完一整行数据
        javaRDD.saveAsTextFile("output");      

        // 4 关闭资源
        sc.stop();
    }
}

3.分区器介绍

Spark分区器目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

什么时候会用到分区器?

Shuffle的算子才会用到分区器,给我们的数据进行重分区

注意:

1、分区器是pairRDD专属的,并且只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None

2、Java RDD没有分区器,他只是有一些分区和分区的规则而已

3、每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

3.1.Hash分区器原理

Hash 分区原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

Hash 分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据,从而造成数据倾斜。

3.2.Range分区器原理

RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

实现过程为:

第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;

第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

1)我们假设有100万条数据要分4个区

2)从100万条中抽100个数(1,2,3, ….. 100)

3)对100个数进行排序,然后均匀的分为4段

4)获取100万条数据,每个值与4个分区的范围比较,放入合适分区

Range 分区弊端:因为每一个数据集数据都不一样,也可能导致出现数据倾斜问题

Range 分区器在实际工作中基本没有使用场景

3.3.自定义分区器实践

无论是Hash分区器还是Range分区器都没有办法从根源上解决数据倾斜的问题。要是想从根本上解决数据倾斜问题,一般需要使用自定义分区器来根据数据特征自定义数据分区规则。

如下案例是一个通过自定义分区器解决数据倾斜问题的场景

RDD 代码:

public class Test_CustomPartitioner {
    public static void main(String[] args) {
        // 1 创建SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[2]");

        // 2 创建SparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // 3 编写代码
        // 手动模拟的数据倾斜数据集
        ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<>();
        list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));
        list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));
        list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));list.add(new Tuple2<>(1, 1));
        list.add(new Tuple2<>(3, 5));
        list.add(new Tuple2<>(5, 5));
        list.add(new Tuple2<>(4, 5));
        list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));
        list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));
        list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));list.add(new Tuple2<>(2, 2));

        // 创建一个RDD,指定两个分区
        JavaPairRDD<Integer, Integer> pairRDD = sc.parallelizePairs(list, 2);

        // 对数据进行排序,1和2两个大key进入一个分区,这里就出现了数据倾斜
        JavaPairRDD<Integer, Integer> sortByKey = pairRDD.sortByKey();
        // 使用自定义分区器
        JavaPairRDD<Integer, Integer> partitionBy = sortByKey.partitionBy(new CustomPartitioner(2));

        // 查看自定义分区器后的结果
        JavaRDD<String> partitionsWithIndex = partitionBy.mapPartitionsWithIndex(new Function2<Integer, Iterator<Tuple2<Integer, Integer>>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer v1, Iterator<Tuple2<Integer, Integer>> v2) throws Exception {

                ArrayList<String> result = new ArrayList<>();
                while (v2.hasNext()) {
                    Tuple2<Integer, Integer> next = v2.next();
                    result.add(v1 + "----" + next._1);
                }
                return result.iterator();
            }
        }, true);

        partitionsWithIndex.collect().forEach(System.out::println);

        // 4 关闭资源
        sc.stop();
    }
}

自定义分区器代码:

// 自定义分区器需要继承Partitioner类
public class CustomPartitioner extends Partitioner {

    private int numPartitions;

    public CustomPartitioner() {
    }

    // 带参构造器,自定义分区数量
    public CustomPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    //return 的结果就是分区号
    @Override
    public int getPartition(Object key) {
        // Object类型转换成int类型,这里传递进来是什么类型,就转换成什么类型
        int newKey = (int) key;

        // 判断key等于1放到0号分区,等于2放到1号分区,其他数据根据传入的分区数量取模决定
        if (newKey == 1) {
            return 0;
        } else if  (newKey == 2) {
            return 1;
        } else {
            return newKey % numPartitions();
        }
    }

    public int getNumPartitions() {
        return numPartitions;
    }

    public void setNumPartitions(int numPartitions) {
        this.numPartitions = numPartitions;
    }
}

转载请注明:西门飞冰的博客 » Spark RDD 分区规则和分区器

喜欢 (0)or分享 (0)