1. Introduction
Spark supports using a Pod Template file to define the Pod specification of the Driver and Executors. Native Kubernetes syntax can be used in the Pod Template, which not only greatly reduces the number of spark-submit parameters, but also makes it possible to add configuration items in the Pod Template that spark-submit does not support.
The Pod Template file can be stored on HDFS, S3, or an HTTP server, and is fetched over the corresponding protocol.
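Concretely, the template files are handed to spark-submit through two configuration properties; the URLs below are only placeholders (an HDFS or S3 location is referenced the same way):

--conf spark.kubernetes.driver.podTemplateFile=http://<file-server>/driver.yaml \
--conf spark.kubernetes.executor.podTemplateFile=http://<file-server>/executor.yaml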
2. Custom Pod Template
In this example, an nginx HTTP server is used to host the Pod Template file. The executor template is as follows:
[root@k8s-demo001 ~]# cat executor-wc.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
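Before submitting, it is worth confirming that nginx actually serves the template. Assuming the file was copied to the nginx web root and the server listens on port 8080 (as the URLs used below suggest), a simple fetch should print the YAML:

curl http://172.16.252.105:8080/executor-wc.yaml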
2) Submit the Spark job
./spark-3.2.3/bin/spark-submit \
--name BasicWordCount \
--verbose \
--master k8s://https://172.16.252.105:6443 \
--deploy-mode client \
--conf spark.network.timeout=300 \
--conf spark.executor.instances=3 \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.driver.memory=1024m \
--conf spark.executor.memory=1024m \
--conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
--conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
--conf spark.kubernetes.namespace=apache-spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
--conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-wc.yaml \
--class org.fblinux.BasicWordCount \
http://172.16.252.105:8080/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
http://172.16.252.105:8080/wc_input.txt
Even though the image is specified in the Pod Template file, spark-submit must still specify the apache-spark image (spark.kubernetes.container.image); otherwise the submission fails with an error.
3) After the job is submitted, observe the Driver and the Executor Pods.
In client mode, the Driver runs outside of Kubernetes, so we need to make sure the node running the Driver has network connectivity to the Executor Pods.
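The Pods created for the job can be listed with kubectl; in client mode only the Executors appear as Pods, since the Driver process runs on the submitting machine:

kubectl get pods -n apache-spark -o wide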
3. Reading and Writing HDFS in Practice
1) For Spark to read from and write to Hadoop HDFS, it needs Hadoop's core-site.xml and hdfs-site.xml configuration files, so first place these two files under the spark/conf directory.
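A minimal sketch of this step, assuming the Hadoop client configuration lives under /etc/hadoop/conf (adjust the source path to your environment):

cp /etc/hadoop/conf/core-site.xml /etc/hadoop/conf/hdfs-site.xml ./spark-3.2.3-hadoop/conf/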
2) Create the relevant directories in HDFS and upload the test data file
[root@hadoop-test01 ~]# hadoop fs -mkdir /tmp/spark
[root@hadoop-test01 ~]# hadoop fs -mkdir /tmp/spark/input
[root@hadoop-test01 ~]# hadoop fs -mkdir /tmp/spark/output
Upload the test data file:
[root@hadoop-test01 ~]# cat wc_input.txt
Apache Spark is a unified analytics engine for large-scale data processing It provides high-level APIs in Java Scala Python and R and an optimized engine that supports general execution graphs It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing pandas API on Spark for pandas workloads MLlib for machine learning GraphX for graph processing and Structured Streaming for incremental computation and stream processing
Get Spark from the downloads page of the project website This documentation is for Spark version 340 Spark uses Hadoop’s client libraries for HDFS and YARN Downloads are pre-packaged for a handful of popular Hadoop versions Users can also download a Hadoop free binary and run Spark with any Hadoop version by augmenting Spark’s classpath Scala and Java users can include Spark in their projects using its Maven coordinates and Python users can install Spark from PyPI If you’d like to build Spark from source visit Building Spark
Spark runs on both Windows and UNIX-like systems eg Linux Mac OS and it should run on any platform that runs a supported version of Java This should include JVMs on x86_64 and ARM64 It’s easy to run locally on one machine — all you need is to have java installed on your system PATH or the JAVA_HOME environment variable pointing to a Java installation
[root@hadoop-test01 ~]# hdfs dfs -put wc_input.txt /tmp/spark/input
3) Write the example program that reads and writes HDFS, and package it
The example program is developed with Java and Maven; the Java version is 1.8, the Spark version is 3.2.3, and the Scala version is 2.12.
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public final class BasicWordCount2 {
    protected static final Logger parentLogger = LogManager.getLogger(BasicWordCount2.class);
    private static final Logger logger = parentLogger;
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // The source path and the target path are passed in as arguments
        if (args.length != 2) {
            logger.error("Usage: BasicWordCount2 <inFilePath> <outFilePath>");
            System.exit(1);
        }

        SparkSession spark = SparkSession
                .builder()
                .appName("BasicWordCount2")
                .getOrCreate();

        // 1. Read the input file
        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
        // 2. Split each line into words
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
        // 3. Build (word, 1) tuples
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));
        // 4. Count the occurrences of each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
        // 5. Collect the results to the driver
        List<Tuple2<String, Integer>> output = counts.collect();
        // 6. Print the results
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
            logger.info(tuple._1() + ": " + tuple._2());
        }
        // 7. Save the results to the target path
        counts.saveAsTextFile(args[1]);

        spark.stop();
    }
}
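To package the program, a standard Maven build is sufficient; this assumes the project's pom.xml sets the artifactId and version so that the jar name matches the one referenced by the spark-submit command later:

mvn clean package
# produces target/spark-on-k8s-demo-1.0-SNAPSHOT.jar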
4) Write custom Driver and Executor Pod template files
Driver configuration:
[root@k8s-demo001 nginx_down]# cat driver-hadoop.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: driver
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: driver
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar              # mount for the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-historyserver    # mount for the eventLog archive directory
          mountPath: /opt/spark/eventLog
        - name: spark-logs             # mount for the logs
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-historyserver
      persistentVolumeClaim:
        claimName: spark-historyserver-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
Executor configuration:
[root@k8s-demo001 nginx_down]# cat executor-hadoop.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar    # mount for the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-logs   # mount for the logs
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
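Both templates mount PersistentVolumeClaims (spark-jar-pvc, spark-historyserver-pvc, spark-logs-pvc). These PVCs are assumed to already exist in the apache-spark namespace; their presence can be checked with:

kubectl get pvc -n apache-spark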
5) Submit the Spark job
./spark-3.2.3-hadoop/bin/spark-submit \
--name BasicWordCount2 \
--verbose \
--master k8s://https://172.16.252.105:6443 \
--deploy-mode cluster \
--conf spark.network.timeout=300 \
--conf spark.executor.instances=1 \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.driver.memory=1024m \
--conf spark.executor.memory=1024m \
--conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc-hadoop.log" \
--conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc-hadoop.log" \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=file:///opt/spark/eventLog \
--conf spark.history.fs.logDirectory=file:///opt/spark/eventLog \
--conf spark.kubernetes.namespace=apache-spark \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
--conf spark.kubernetes.driver.podTemplateFile=http://172.16.252.105:8080/driver-hadoop.yaml \
--conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-hadoop.yaml \
--class org.fblinux.BasicWordCount2 \
local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
/tmp/spark/input/wc_input.txt \
/tmp/spark/output/wc_ouput_1
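After the Driver completes, the word-count result can be verified directly in HDFS; the output path matches the second program argument above:

hadoop fs -ls /tmp/spark/output/wc_ouput_1
hadoop fs -cat /tmp/spark/output/wc_ouput_1/part-*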