(1) A Simple Implementation of Spark on K8S

Big Data | 西门飞冰

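1. Introduction

Containerization is the mainstream way of provisioning computing resources today and will likely remain so for some time, as the rise of cloud native shows. Big data is following the same trend, and containerizing big data workloads implies separating storage from compute.

Big data platforms provide two broad capabilities: storing massive amounts of data and computing over it. Put simply, storage-compute separation means deploying the storage components and the compute components on physically separate machines and letting the storage and compute services talk to each other over the network, so that each side can scale out or in according to its own needs without affecting the other.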

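Advantages of the coupled storage-compute architecture:

1) It is relatively easy to deploy; CDH, for example, automates deployment through a web console.

2) It benefits from data locality: Spark tries to schedule tasks on the nodes that hold the data, which reduces network transfer overhead.

Disadvantages of the coupled storage-compute architecture:

1) Resources are underutilized. As the business grows, the cluster's resource demands upset the original ratio between storage and compute resources, so one kind of resource stays over-provisioned and its utilization never improves.

2) Storage and compute components are installed on the same physical machine and inevitably compete for memory and CPU. This degrades the components involved and, in severe cases, can bring the node down and affect the whole cluster.

3) Individual components are hard to scale out or in, for the same reason as point 1.

4) Operations are harder. With many components on one server, troubleshooting takes longer because you also have to check whether the components interfere with each other.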

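Spark on K8S is a typical implementation of storage-compute separation for big data: it combines big data with containerization and offers a new way of using big data technology.

Spark on K8S official documentation: https://spark.apache.org/docs/3.2.3/running-on-kubernetes.html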

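2. Spark deployment modes

1) Local mode: runs the Spark program directly on the local machine; typically used for development and debugging

2) Standalone mode

3) Mesos (deprecated as of v3.2.0)

4) YARN mode

5) Kubernetes mode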

3. Environment

K8S: v1.23.17

Spark: 3.2.3

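4. Spark on K8S environment initialization

Initialize the working directories and create a dedicated namespace and ServiceAccount. Later Spark jobs are scheduled into this namespace and run under this ServiceAccount.

0) Create the directories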

mkdir /root/apache-spark-k8s
mkdir /root/apache-spark-k8s/examples
mkdir /root/apache-spark-k8s/init-yamls

1) Create the spark namespace

kubectl create namespace apache-spark

2) Create the ServiceAccount

kubectl create serviceaccount spark-service-account -n apache-spark

3) Create the Role and RoleBinding

[root@k8s-demo001 init-yamls]# cat spark-role.yaml 
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: spark-role
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "watch", "list", "create", "delete"]
  - apiGroups: ["extensions", "apps"]
    resources: ["deployments"]
    verbs: ["get", "watch", "list", "create", "delete"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "create", "update", "delete"]
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["get"]
  - apiGroups: [""]
    resources: ["services"]
    verbs: ["get", "list", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  name: spark-role-binding
  namespace: apache-spark
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: spark-role
subjects:
  - kind: ServiceAccount
    name: spark-service-account
    namespace: apache-spark
[root@k8s-demo001 init-yamls]# kubectl apply -f spark-role.yaml

4) Create the ClusterRole and ClusterRoleBinding

[root@k8s-demo001 init-yamls]# cat cluster-role.yaml 
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  name: apache-spark-clusterrole
rules:
  - apiGroups:
      - ''
    resources:
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
      - namespaces
    verbs:
      - list
      - watch
      - get
  - apiGroups:
      - ''
    resources:
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ''
    resources:
      - events
    verbs:
      - create
      - patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  name: apache-spark-clusterrole-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: apache-spark-clusterrole
subjects:
  - kind: ServiceAccount
    name: spark-service-account
    namespace: apache-spark
[root@k8s-demo001 init-yamls]# kubectl apply -f cluster-role.yaml
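
Before submitting any job it can be worth confirming that the Role actually took effect. One way is to ask the API server whether the ServiceAccount may manage pods in the namespace. The sketch below wraps kubectl auth can-i. The function name check_spark_rbac and the KUBECTL override (useful for a dry run with KUBECTL=echo) are my own assumptions and not part of the original setup, and the --as impersonation only works from a kubeconfig that is allowed to impersonate, such as a cluster admin.

```shell
#!/bin/sh
# Sketch: check that the Spark ServiceAccount can manage pods in its namespace.
# check_spark_rbac is a hypothetical helper; set KUBECTL=echo for a dry run.
check_spark_rbac() {
  ns=${1:-apache-spark}
  sa=${2:-spark-service-account}
  # ServiceAccounts are impersonated as system:serviceaccount:<namespace>:<name>
  subject="system:serviceaccount:${ns}:${sa}"
  for verb in create list watch delete; do
    ${KUBECTL:-kubectl} auth can-i "$verb" pods --as="$subject" -n "$ns"
  done
}

# Usage (needs a configured kubectl):
#   check_spark_rbac
#   check_spark_rbac apache-spark spark-service-account
```

If the Role and RoleBinding above were applied, each of the four checks should answer yes; a no points at a typo in the namespace or ServiceAccount name.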

5. Testing the Spark on K8S environment

Run SparkPi, an example program bundled with Spark, in cluster mode

1) Download the Spark binary package from the official site

wget https://archive.apache.org/dist/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
tar -zxvf spark-3.2.3-bin-hadoop3.2.tgz
mv spark-3.2.3-bin-hadoop3.2 spark-3.2.3

2) Look up the URL of the K8S master. For a highly available K8S cluster, you must use the Kubernetes control plane URL returned by kubectl cluster-info

[Screenshot: output of kubectl cluster-info]
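
The --master value for spark-submit is the control plane URL prefixed with k8s://. A minimal sketch of deriving it from the cluster-info text follows. The helper name k8s_master_url is hypothetical, and the parsing assumes the "Kubernetes control plane is running at ..." wording that recent kubectl versions print.

```shell
#!/bin/sh
# Sketch: turn `kubectl cluster-info` output into a spark-submit --master value.
# k8s_master_url is a hypothetical helper, not part of kubectl or Spark.
k8s_master_url() {
  # Read cluster-info text on stdin, keep the control plane line, and extract the
  # URL; the character class stops before the ANSI colour codes kubectl may append.
  url=$(grep 'control plane' | grep -Eo 'https?://[A-Za-z0-9.:_/-]+' | head -n 1)
  [ -n "$url" ] || return 1
  printf 'k8s://%s\n' "$url"
}

# Usage (needs a configured kubectl):
#   kubectl cluster-info | k8s_master_url
```

The result can be passed straight to spark-submit, for example --master "$(kubectl cluster-info | k8s_master_url)".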

3) Submit the job to K8S

# --master is the address of the K8S cluster
# spark.kubernetes.container.image is the image the job runs in
./spark-3.2.3/bin/spark-submit \
 --name SparkPi \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --class org.apache.spark.examples.SparkPi \
 local:///opt/spark/examples/jars/spark-examples_2.12-3.2.3.jar \
 3000

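Watch the driver pod and executor pods

[Screenshot: driver and executor pods]

After the spark-submit client exits, the driver pod on K8S is not removed automatically and has to be dealt with separately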
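
One possible way to handle leftover drivers is to delete the driver pods that have already finished successfully. The sketch below relies on the spark-role=driver label that Spark attaches to driver pods and on the status.phase field selector. The function name cleanup_finished_drivers and the KUBECTL override are my own assumptions.

```shell
#!/bin/sh
# Sketch: remove driver pods that completed successfully in a namespace.
# cleanup_finished_drivers is a hypothetical helper; set KUBECTL=echo for a dry run.
cleanup_finished_drivers() {
  ns=${1:-apache-spark}
  # Only pods labelled as Spark drivers and already in phase Succeeded are selected,
  # so running or failed drivers are left alone for inspection.
  ${KUBECTL:-kubectl} delete pod -n "$ns" \
    -l spark-role=driver \
    --field-selector=status.phase=Succeeded
}

# Usage (needs a configured kubectl):
#   cleanup_finished_drivers apache-spark
```

Failed drivers are deliberately kept, since their logs are usually what you want to read next.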

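6. Developing the example program

For the tests that follow, we develop a WordCount example program. It reads a text file from the local filesystem or over the network, counts the occurrences of each word, and writes the result to the console and the log.

1) pom configuration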

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.fblinux</groupId>
    <artifactId>spark-on-k8s-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>3.2.3</spark.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <!-- Packaging configuration -->
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

2) Main program

package org.fblinux;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;


public final class BasicWordCount {
    private static final Logger logger = LogManager.getLogger(BasicWordCount.class);

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // By default, read wc_input.txt from the current directory
        String fileUrl = "wc_input.txt";

        if (args.length == 1) {
            fileUrl = args[0];
        }

        // 1 Create the SparkSession.
        // Fall back to a local master only when none was supplied (for example when
        // run from the IDE), so the --master passed to spark-submit is not overridden.
        SparkConf conf = new SparkConf()
                .setAppName("BasicWordCount")
                .setIfMissing("spark.master", "local");
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();

        JavaRDD<String> lines;

        // 2 Read the input file
        if (fileUrl.startsWith("http")) {
            // 2.1 Fetch the file over HTTP
            URL url = new URL(fileUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            // Set the connect timeout to 3 seconds
            conn.setConnectTimeout(3 * 1000);

            // Read the whole response body and decode it as UTF-8
            String content;
            try (InputStream inputStream = conn.getInputStream()) {
                content = new String(readInputStream(inputStream), StandardCharsets.UTF_8);
            }

            JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
            // Split on both Unix (\n) and Windows (\r\n) line endings
            lines = sc.parallelize(Arrays.asList(content.split("\\r?\\n")), 1);
        } else {
            // 2.2 Read the file from the local filesystem
            lines = spark.read().textFile(fileUrl).javaRDD();
        }

        // 3 Split each line into words
        JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());

        // 4 Map each word to a (word, 1) tuple
        JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));

        // 5 Count the occurrences of each word
        JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

        // 6 Collect the results to the driver
        List<Tuple2<String, Integer>> output = counts.collect();

        // 7 Print the results
        for (Tuple2<?,?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
            logger.info(tuple._1() + ": " + tuple._2());
        }

        // Sleep for 10 s so the pod changes can be observed
        logger.info("sleep 10s");
        Thread.sleep(10*1000);

        spark.stop();
    }

    /**
     * Reads all bytes from an input stream.
     * @param inputStream the stream to read
     * @return the bytes read from the stream
     * @throws IOException if reading fails
     */
    public static byte[] readInputStream(InputStream inputStream) throws IOException {
        byte[] buffer = new byte[1024];
        int len = 0;
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while((len = inputStream.read(buffer)) != -1) {
            bos.write(buffer, 0, len);
        }
        bos.close();
        return bos.toByteArray();
    }
}

3) Create wc_input.txt in the project directory with the following content

Apache Spark is a unified analytics engine for large-scale data processing It provides high-level APIs in Java Scala Python and R and an optimized 
engine that supports general execution graphsIt also supports a rich set of higher-level tools including Spark SQL for SQL and structured data 
processing pandas API on Spark for pandas workloads MLlib for machine learning GraphX for graph processing and Structured Streaming for incremental 
computation and stream processing Get Spark from the downloads page of the project website This documentation is for Spark version 340 Spark uses Hadoop’s 
client libraries for HDFS and YARN Downloads are pre-packagedfor a handful of popular Hadoop versions Users can also download a Hadoop free binary and run 
Spark with any Hadoop version by augmenting Spark’s classpath Scala and Java users can
include Spark in their projects using its Maven coordinates and Python users can install Spark from PyPI
If you’d like to build Spark from source visit Building Spark Spark runs on both Windows and UNIX-like systems eg Linux Mac OS and it should 
run on any platform that runs a supported version of Java This should include JVMs on x86_64 and ARM64 It’s easy to run locally 
on one machine — all you need is to have java installed on your system PATH or the JAVA_HOME environment variable pointing to a Java installation

Run the program; it produces the following output

[Screenshot: word count output]

Then package the program (for example with mvn clean package)

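7. Ways to deliver the Spark application jar

Method 1: Bake the jar into the image

Every job needs its own image, which easily leads to too many images that are hard to manage. Not recommended.

Method 2: Mount the jar through a PV

The jar does not go into the image, so no image build is needed. You only maintain a few base Spark images, which saves space. This is the preferred method.

Method 3: Download over HTTP/FTP

Job jars can be published to an HTTP file server such as Tomcat, Apache, or Nginx, so that they are stored in one place. Recommended.

Method 4: S3/HDFS

Job jars are published to S3 object storage or HDFS and accessed through the s3a or hdfs scheme. This is similar to method 3 but adds a dependency on a third-party cluster. Not recommended.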
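
From the point of view of spark-submit, these methods differ mainly in the URI scheme of the application jar: local:// for a file that already exists inside the container (baked into the image or mounted from a PV), http:// or ftp:// for a download, and s3a:// or hdfs:// for remote storage. The small sketch below just maps a jar URI to the method it corresponds to. The function name jar_delivery_method is hypothetical and only meant as a memory aid.

```shell
#!/bin/sh
# Sketch: classify an application jar URI by the delivery method it implies.
# jar_delivery_method is a hypothetical helper for illustration only.
jar_delivery_method() {
  case "$1" in
    local://*)                  echo "in-container file (image or PV mount)" ;;
    http://*|https://*|ftp://*) echo "HTTP/FTP download" ;;
    s3a://*|hdfs://*)           echo "S3/HDFS" ;;
    *)                          echo "unknown" ;;
  esac
}

# Usage:
#   jar_delivery_method local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT.jar
#   jar_delivery_method http://172.16.252.105:8080/spark-on-k8s-demo-1.0-SNAPSHOT.jar
```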

8. Baking the job jar into the image

1) Upload the Spark job jar and wc_input.txt to /root/spark-wc/

2) Write a Dockerfile, build the image, and push it to Harbor

Dockerfile

FROM apache/spark:v3.2.3
COPY spark-on-k8s-demo-1.0-SNAPSHOT.jar /opt/spark/spark-on-k8s-demo-1.0.jar
COPY wc_input.txt /opt/spark/wc_input.txt
WORKDIR /opt/spark/work-dir
ENTRYPOINT ["/opt/entrypoint.sh"]
USER 185

Build the image and push it to Harbor

docker build -f Dockerfile -t spark-wc:v3.2.3 .
docker tag spark-wc:v3.2.3 172.16.252.110:8011/sparkonk8s/spark-wc:v3.2.3
docker push 172.16.252.110:8011/sparkonk8s/spark-wc:v3.2.3

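3) Submit the job to K8S

The input file argument must start with file://. Otherwise Spark resolves the path against the default Hadoop filesystem, which may be HDFS rather than the local disk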

./spark-3.2.3/bin/spark-submit  \
 --name BasicWordCount \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=172.16.252.110:8011/sparkonk8s/spark-wc:v3.2.3 \
 --class org.fblinux.BasicWordCount \
 local:///opt/spark/spark-on-k8s-demo-1.0.jar \
 file:///opt/spark/wc_input.txt

4) Watch the driver pod and executor pods

watch -n 1 kubectl get all -owide -n apache-spark

Check the word count result in the driver log

kubectl logs basicwordcount-50fa3c886a85a123-driver -n apache-spark

[Screenshot: word count result in the driver log]
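
The driver pod name contains a generated suffix, so it changes on every submission. Rather than copying the name by hand, one option is to look up the most recently created driver pod by label. This is a sketch under the assumption that driver pods carry the spark-role=driver label. The function name latest_driver_pod and the KUBECTL override are my own.

```shell
#!/bin/sh
# Sketch: print the newest Spark driver pod (as pod/<name>) in a namespace.
# latest_driver_pod is a hypothetical helper; set KUBECTL=echo for a dry run.
latest_driver_pod() {
  ns=${1:-apache-spark}
  # Sort ascending by creation time and keep the last entry, i.e. the newest pod.
  ${KUBECTL:-kubectl} get pods -n "$ns" -l spark-role=driver \
    --sort-by=.metadata.creationTimestamp -o name | tail -n 1
}

# Usage (needs a configured kubectl):
#   kubectl logs "$(latest_driver_pod)" -n apache-spark
#   kubectl delete "$(latest_driver_pod)" -n apache-spark
```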

5) Delete the driver pod

kubectl delete pod basicwordcount-50fa3c886a85a123-driver -n apache-spark

9. Mounting the job jar through a PV

1) Create the PVC for the jar

[root@k8s-demo001 spark-wc]# cat spark-jar-pvc.yaml 
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-jar-pvc  # name of the jar PVC
  namespace: apache-spark
spec:
  storageClassName: nfs-storage   # StorageClass name
  accessModes:
    - ReadOnlyMany   # use the ReadOnlyMany access mode
  resources:
    requests:
      storage: 1Gi    # storage capacity; adjust as needed
[root@k8s-demo001 spark-wc]# kubectl apply -f spark-jar-pvc.yaml 

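2) Upload the jar and wc_input.txt to the actual path backing the PV

[Screenshot: jar and wc_input.txt in the PV directory]

3) Submit the Spark job

This uses the stock apache/spark image, which does not contain spark-on-k8s-demo-1.0.jar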

./spark-3.2.3/bin/spark-submit \
 --name BasicWordCount \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.jars-pvc.mount.path=/opt/spark/appJars \
 --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.jars-pvc.options.claimName=spark-jar-pvc \
 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.jars-pvc.mount.path=/opt/spark/appJars \
 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.jars-pvc.options.claimName=spark-jar-pvc \
 --class org.fblinux.BasicWordCount \
 local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
 file:///opt/spark/appJars/wc_input.txt

4) Verify

watch -n 1 kubectl get all -owide -n apache-spark
kubectl logs pod/basicwordcount-a70b5e886a9f1849-driver -n apache-spark

Watch the driver pod and executor pods

5) Delete the driver pod

kubectl delete pod basicwordcount-a70b5e886a9f1849-driver -n apache-spark

10. Delivering the job jar over HTTP

1) Upload the Spark job jar and wc_input.txt to the Nginx download directory

2) Submit the Spark job

./spark-3.2.3/bin/spark-submit \
 --name BasicWordCount \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --class org.fblinux.BasicWordCount \
http://172.16.252.105:8080/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
http://172.16.252.105:8080/wc_input.txt

Verification is the same as for the PV and image methods

11. Persisting job log files

1) Create the PVC for the logs

[root@k8s-demo001 ~]# cat spark-logs-pvc.yaml 
#  PVC for persisting Spark logs
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-logs-pvc  # name of the log PVC
  namespace: apache-spark
spec:
  storageClassName: nfs-storage   # StorageClass name
  accessModes:
    - ReadWriteMany   # use the ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # storage capacity; adjust as needed
[root@k8s-demo001 ~]# kubectl apply -f spark-logs-pvc.yaml

2) Write log4j.properties

[root@k8s-demo001 ~]# cat spark-3.2.3/conf/log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, logRollingFile
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# For deploying Spark ThriftServer
# SPARK-34128:Suppress undesirable TTransportException warnings involved in THRIFT-4805
log4j.appender.console.filter.1=org.apache.log4j.varia.StringMatchFilter
log4j.appender.console.filter.1.StringToMatch=Thrift error occurred during processing of message
log4j.appender.console.filter.1.AcceptOnMatch=false

log4j.appender.logRollingFile = org.apache.log4j.RollingFileAppender
log4j.appender.logRollingFile.layout = org.apache.log4j.PatternLayout
log4j.appender.logRollingFile.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.appender.logRollingFile.Threshold = INFO
log4j.appender.logRollingFile.ImmediateFlush = TRUE
log4j.appender.logRollingFile.Append = TRUE
log4j.appender.logRollingFile.File = ${log.file}
log4j.appender.logRollingFile.MaxFileSize = 100MB
log4j.appender.logRollingFile.MaxBackupIndex = 10
log4j.appender.logRollingFile.Encoding = UTF-8

3) Submit the Spark job

./spark-3.2.3/bin/spark-submit \
 --name BasicWordCount \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=1 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc.log" \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-logs-pvc.mount.path=/opt/spark/logs \
 --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.spark-logs-pvc.options.claimName=spark-logs-pvc \
 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-logs-pvc.mount.path=/opt/spark/logs \
 --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-logs-pvc.options.claimName=spark-logs-pvc \
 --conf spark.kubernetes.driverEnv.TZ=Asia/Shanghai \
 --conf spark.kubernetes.executorEnv.TZ=Asia/Shanghai \
 --class org.fblinux.BasicWordCount \
http://172.16.252.105:8080/spark-on-k8s-demo-1.0-SNAPSHOT.jar \
http://172.16.252.105:8080/wc_input.txt

4) After the job finishes, check the PVC directory to confirm that the logs were persisted

[Screenshot: log files in the PVC directory]

When reposting, please credit: 西门飞冰's blog » (1) A Simple Implementation of Spark on K8S