
(7) Spark SQL on K8s: Shuffle Spill to Disk

Big Data · 西门飞冰

1. Overview

A Spark shuffle typically occurs when an RDD has a wide dependency; it is the mechanism by which data is passed from an upstream stage to a downstream stage. The shuffle phase is usually accompanied by spilling intermediate data to disk (when the data volume is small, it can instead be kept entirely in memory). Shuffle performance directly determines the overall performance and throughput of a Spark program, and its impact is especially pronounced when processing large amounts of data.
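To make the stage-to-stage handoff concrete, here is a minimal Python sketch (not Spark's actual implementation) of the core idea behind shuffle routing: each map-side record is assigned to a reduce-side partition by hashing its key, mirroring Spark's default hash partitioning. The per-partition buckets are what get written to local disk between stages.

```python
def partition_for(key, num_partitions):
    # Assign a record to a reducer partition by key hash,
    # analogous to Spark's default HashPartitioner.
    return hash(key) % num_partitions

def shuffle_write(records, num_partitions):
    """Group map-side (key, value) output into per-partition buckets,
    i.e. the data that would be spilled as shuffle files."""
    buckets = {p: [] for p in range(num_partitions)}
    for key, value in records:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return buckets

records = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
buckets = shuffle_write(records, 2)
# All records with the same key land in the same bucket, so the downstream
# reducer that fetches that bucket can aggregate the key locally.
```

Because every record with the same key hashes to the same partition, a downstream aggregation (such as the GROUP BY in the SQL examples later in this post) only needs to read its own bucket from each upstream task.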

When a large volume of intermediate results needs to be shuffled, spilling that data to disk is unavoidable.

In a Spark on K8s environment, Spark by default uses the Pod container's filesystem to hold shuffle spill data, which consumes storage on the K8s node. This is fine for small data volumes, but when a Spark job processes hundreds of GB or several TB of data, the amount of shuffle data to spill becomes very large. To handle that scenario, production deployments typically use high-performance storage (usually SSD-backed cloud disks) for the shuffle spill, mounted into the Spark pods via a PV and PVC rather than written to the container's own filesystem.

2. Spilling Shuffle Data to a PVC

1. Create the shuffle PVC

[root@k8s-demo001 ~]# cat spark-shuffle-pvc.yaml 
# PVC for persisting Spark shuffle data
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-shuffle-pvc  # shuffle PVC name
  namespace: apache-spark
spec:
  storageClassName: nfs-storage   # StorageClass name
  accessModes:
    - ReadWriteMany   # use the ReadWriteMany access mode
  resources:
    requests:
      storage: 5Gi    # storage capacity; adjust to actual needs
[root@k8s-demo001 ~]# kubectl apply -f spark-shuffle-pvc.yaml 

2. Create the executor pod template

[root@k8s-demo001 ~]# cat /data/nginx_down/executor-sql-hadoop-shuffle.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
       - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: hadoop 
        - name: SPARK_USER
          value: hadoop
        - name: SPARK_LOCAL_DIRS # path for shuffle spill data
          value: "/opt/spark/shuffle-data"
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-logs
          mountPath: /opt/spark/logs
        - name: spark-shuffle   # mount the shuffle spill path
          mountPath: /opt/spark/shuffle-data
  volumes:
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
    - name: spark-shuffle
      persistentVolumeClaim:
        claimName: spark-shuffle-pvc
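As an aside: instead of exporting SPARK_LOCAL_DIRS in a pod template, Spark on K8s can also mount a PVC as local (shuffle) storage directly through submit-time configuration. Volumes whose name starts with `spark-local-dir-` are treated as Spark local directories. The fragment below is a sketch reusing the PVC created above, not part of the original setup:

```shell
# Mount spark-shuffle-pvc as a Spark local directory on every executor;
# the spark-local-dir- name prefix tells Spark to use it for shuffle data.
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=spark-shuffle-pvc \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/opt/spark/shuffle-data \
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false \
```

This keeps the whole shuffle-storage setup in one spark-submit command, at the cost of losing the other customizations a pod template allows.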

3. Submit the Spark SQL job

./spark-3.2.3-hadoop/bin/spark-submit \
 --name spark-sql-shuffle-test-2 \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode client \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.host=k8s-demo001 \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-sql-shuffle-2.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-sql-shuffle-2.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.history.fs.logDirectory=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-sql-hadoop-shuffle.yaml \
 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver \
 -e \
 "
 SELECT d.deptno, d.dname, SUM(sal) sum_sal, AVG(sal) avg_sal, MAX(sal) max_sal, MIN(sal) min_sal
 FROM spark_emp e,
      spark_dept d
 WHERE e.deptno = d.deptno
 GROUP BY d.deptno, d.dname;
 "

3. Keeping Shuffle Data in Memory

This approach only suits scenarios where the data volume is modest and K8s can allocate enough memory for the Spark program to run.

It is enabled by setting spark.kubernetes.local.dirs.tmpfs=true, which backs Spark's local directories with RAM-backed emptyDir volumes.
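Because tmpfs-backed local directories live in RAM, shuffle data written to them counts against the pod's memory limit, so it is prudent to reserve extra headroom when enabling this. The overhead value below is illustrative, not taken from the original job:

```shell
# RAM-backed local dirs consume pod memory; leave headroom beyond the heap
--conf spark.kubernetes.local.dirs.tmpfs=true \
--conf spark.executor.memoryOverhead=1g \
```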

./spark-3.2.3-hadoop/bin/spark-submit \
 --name spark-sql-shuffle-test-3 \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode client \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.host=k8s-demo001 \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-sql-shuffle-3.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-sql-shuffle-3.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.history.fs.logDirectory=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-sql-hadoop.yaml \
 --conf spark.kubernetes.local.dirs.tmpfs=true \
 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver \
 -e \
 "
 SELECT d.deptno, d.dname, SUM(sal) sum_sal, AVG(sal) avg_sal, MAX(sal) max_sal, MIN(sal) min_sal
 FROM spark_emp e,
      spark_dept d
 WHERE e.deptno = d.deptno
 GROUP BY d.deptno, d.dname;
 "

