
(3) Spark on K8S: Custom Pod Templates and Reading/Writing HDFS

Big Data · 西门飞冰

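1. Introduction

Spark supports defining the Driver and Executor Pod specs in Pod Template files. A Pod Template can use any syntax that Kubernetes supports natively, which not only greatly reduces the number of spark-submit parameters but also lets you add settings that spark-submit does not support.

Pod Template files can be stored on HDFS, S3, or an HTTP server and are fetched over the corresponding protocol.

2. Custom Pod Template

This example stores the Pod Template file on an nginx HTTP server.

1) Write the executor pod template and place it in the nginx directory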

[root@k8s-demo001 ~]# cat executor-wc.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi

2) Submit the Spark job

./spark-3.2.3/bin/spark-submit \
 --name BasicWordCount \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode client \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-wc.yaml \
 --class org.fblinux.BasicWordCount \
http://172.16.252.105:8080/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
http://172.16.252.105:8080/wc_input.txt

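Although the pod template already specifies an image, spark-submit must still set spark.kubernetes.container.image to the Spark image; otherwise the submission fails with an error.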

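3) After submitting, observe the driver and the executor pods

In client mode the driver runs outside K8s, so the node running the driver and the executor pods must be able to reach each other over the network.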
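
In client mode the executors connect back to the driver at the address given by spark.driver.host and spark.driver.port, so one way to check connectivity is a small TCP probe run from inside a pod in the cluster. The following is only a sketch: the class name DriverReachability and the 3-second timeout are assumptions, not part of the original setup.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port can be reached within a timeout. */
final class DriverReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: all treated as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: DriverReachability <driver-host> <driver-port>
        if (args.length != 2) {
            System.err.println("Usage: DriverReachability <host> <port>");
            System.exit(1);
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        // The 3000 ms timeout is an assumed value for illustration.
        System.out.println(host + ":" + port + " reachable = " + canConnect(host, port, 3000));
    }
}
```

Pinning spark.driver.port to a fixed value makes such a check easier, because otherwise the driver picks a random port.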

3. Reading and Writing HDFS in Practice

1) To read and write Hadoop HDFS, Spark needs Hadoop's core-site.xml and hdfs-site.xml configuration files, so first place these two files under the spark/conf path
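
The job submitted later passes paths without a scheme (such as /tmp/spark/input/wc_input.txt), and Hadoop resolves such paths against the fs.defaultFS property from core-site.xml. As a quick sanity check before submitting, a sketch like the one below can print that property from the copied file. It uses only the JDK's XML parser; the class name CoreSiteCheck is hypothetical, and the file path is whatever you pass on the command line.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper: reads one property from a Hadoop *-site.xml file. */
final class CoreSiteCheck {

    /** Returns the value of the named property, or null if it is not defined. */
    static String readProperty(InputStream xml, String name) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xml);
        NodeList props = doc.getElementsByTagName("property");
        for (int i = 0; i < props.getLength(); i++) {
            Element prop = (Element) props.item(i);
            if (name.equals(childText(prop, "name"))) {
                return childText(prop, "value");
            }
        }
        return null;
    }

    /** Returns the trimmed text of the first child element with the given tag, or null. */
    private static String childText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) throws Exception {
        // Usage: CoreSiteCheck <path-to-core-site.xml>
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            System.out.println("fs.defaultFS = " + readProperty(in, "fs.defaultFS"));
        }
    }
}
```

If the printed value is null or does not point at the HDFS NameNode, scheme-less paths will not resolve to HDFS.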

2) Create the required directories on HDFS and upload the test data file

[root@hadoop-test01 ~]# hadoop fs -mkdir /tmp/spark
[root@hadoop-test01 ~]# hadoop fs -mkdir /tmp/spark/input
[root@hadoop-test01 ~]# hadoop fs -mkdir /tmp/spark/output

Upload the test data file:

[root@hadoop-test01 ~]# cat wc_input.txt 
Apache Spark is a unified analytics engine for large-scale data processing It provides high-level APIs in Java Scala Python and R and an optimized
engine that supports general execution graphsIt also supports a rich set of higher-level tools including Spark SQL for SQL and structured data
processing pandas API on Spark for pandas workloads MLlib for machine learning GraphX for graph processing and Structured Streaming for incremental
computation and stream processing Get Spark from the downloads page of the project website This documentation is for Spark version 340 Spark uses Hadoop’s
client libraries for HDFS and YARN Downloads are pre-packagedfor a handful of popular Hadoop versions Users can also download a Hadoop free binary and run
Spark with any Hadoop version by augmenting Spark’s classpath Scala and Java users can
include Spark in their projects using its Maven coordinates and Python users can install Spark from PyPI
If you’d like to build Spark from source visit Building Spark Spark runs on both Windows and UNIX-like systems eg Linux Mac OS and it should
run on any platform that runs a supported version of Java This should include JVMs on x86_64 and ARM64 It’s easy to run locally
on one machine — all you need is to have java installed on your system PATH or the JAVA_HOME environment variable pointing to a Java installation
[root@hadoop-test01 ~]# hdfs dfs -put wc_input.txt /tmp/spark/input

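3) Write and package a sample program that reads and writes HDFS

The sample program is developed with Java + Maven; the Java version is 1.8, the Spark version is 3.2.3, and the Scala version is 2.12.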

// The package must match the --class value passed to spark-submit (org.fblinux.BasicWordCount2)
package org.fblinux;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class BasicWordCount2 {
    private static final Logger logger = LogManager.getLogger(BasicWordCount2.class);

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {

        // The source path and the target path are passed in as arguments
        if (args.length != 2) {
            logger.error("Usage: BasicWordCount2 <inFilePath> <outFilePath>");
            System.exit(1);
        }

        SparkSession spark = SparkSession
                .builder()
                .appName("BasicWordCount2")
                .getOrCreate();

        // 1 Read the input file
        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

        // 2 Split each line into words
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());

        // 3 Build a (word, 1) tuple for each word
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));

        // 4 Count the occurrences of each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // 5 Collect the results on the driver
        List<Tuple2<String, Integer>> output = counts.collect();

        // 6 Print the results
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
            logger.info(tuple._1() + ": " + tuple._2());
        }

        // 7 Save the results to the target path
        counts.saveAsTextFile(args[1]);

        spark.stop();
    }
}
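
Before packaging, the split-and-count logic can be sanity-checked locally without Spark or a cluster. The sketch below mirrors steps 2 to 4 of the program with plain Java collections; the class name LocalWordCount and the sample lines are made up for illustration. It uses the same single-space pattern, so consecutive spaces produce an empty-string token here just as they would in the Spark job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

/** Hypothetical local stand-in for the split / pair / reduce steps of the Spark job. */
final class LocalWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    /** Counts tokens per word using the same single-space split as the Spark job. */
    static Map<String, Integer> count(List<String> lines) {
        // TreeMap keeps the words sorted so the printed output is stable.
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : SPACE.split(line)) {
                // Equivalent of mapToPair(word -> (word, 1)) followed by reduceByKey(sum).
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample input invented for this sketch.
        List<String> lines = Arrays.asList("Spark on K8S", "Spark reads HDFS");
        count(lines).forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```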

4) Write the custom driver and executor pod template files

Driver configuration:

[root@k8s-demo001 nginx_down]# cat driver-hadoop.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: driver
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: driver
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar   # mount the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-historyserver   # mount the eventLog archive directory
          mountPath: /opt/spark/eventLog
        - name: spark-logs   # mount the log directory
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-historyserver
      persistentVolumeClaim:
        claimName: spark-historyserver-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc

Executor configuration:

[root@k8s-demo001 nginx_down]# cat executor-hadoop.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar    # mount the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-logs   # mount the log directory
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc

5) Submit the Spark job

./spark-3.2.3-hadoop/bin/spark-submit \
 --name BasicWordCount2 \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=1 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc-hadoop.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc-hadoop.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=file:///opt/spark/eventLog \
 --conf spark.history.fs.logDirectory=file:///opt/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.driver.podTemplateFile=http://172.16.252.105:8080/driver-hadoop.yaml \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-hadoop.yaml \
 --class org.fblinux.BasicWordCount2 \
 local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
 /tmp/spark/input/wc_input.txt \
 /tmp/spark/output/wc_ouput_1

6) Verify the result output in Hadoop
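
saveAsTextFile writes each element's toString(), and a Scala Tuple2 prints as (word,count), so the part files under the output directory should contain lines of that shape. If you want to check the counts programmatically after fetching a part file, a parser along these lines could be used. This is a sketch: the class name OutputLineParser and the sample line are invented for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

/** Hypothetical helper: parses one "(word,count)" line from the job's text output. */
final class OutputLineParser {

    /** Splits on the last comma, because the word itself may contain commas. */
    static Map.Entry<String, Integer> parse(String line) {
        if (line.length() < 2 || line.charAt(0) != '(' || line.charAt(line.length() - 1) != ')') {
            throw new IllegalArgumentException("Not a tuple line: " + line);
        }
        String body = line.substring(1, line.length() - 1);
        int comma = body.lastIndexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Missing comma: " + line);
        }
        String word = body.substring(0, comma);
        int count = Integer.parseInt(body.substring(comma + 1));
        return new SimpleEntry<>(word, count);
    }

    public static void main(String[] args) {
        // Sample line invented for this sketch; real lines come from the part files.
        Map.Entry<String, Integer> entry = parse("(Spark,12)");
        System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
}
```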


Please credit the source when reposting: 西门飞冰的博客 » (3) Spark on K8S: Custom Pod Templates and Reading/Writing HDFS
