
(4) Spark on K8S Checkpoint in Practice

Big Data | 西门飞冰

1. Introduction

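Metadata checkpointing is used mainly in Spark Streaming, so that the Driver can quickly rebuild the job's DAG and state after a failure. RDD checkpointing persists the data produced by stateful transformations, which cuts the dependency chain (lineage) and shortens the recovery time of a Spark program. In a Spark on K8s environment, checkpoint data can be stored on a PVC + PV, or on HDFS or S3.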
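The restore-or-create idea behind checkpointing can be illustrated without Spark. The following is only a minimal plain-Java sketch, not Spark's implementation: the class `CheckpointedCounter` and its one-number file format are hypothetical and stand in for a job whose state is persisted to a checkpoint path.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical stand-in for a checkpointed job: a single counter persisted to a file. */
final class CheckpointedCounter {
    private final Path checkpointFile;
    private long value;

    private CheckpointedCounter(Path checkpointFile, long value) {
        this.checkpointFile = checkpointFile;
        this.value = value;
    }

    /** Restores the counter if a checkpoint file exists, otherwise starts from zero. */
    static CheckpointedCounter getOrCreate(Path checkpointFile) throws IOException {
        if (Files.exists(checkpointFile)) {
            String saved = new String(Files.readAllBytes(checkpointFile), StandardCharsets.UTF_8).trim();
            return new CheckpointedCounter(checkpointFile, Long.parseLong(saved));
        }
        return new CheckpointedCounter(checkpointFile, 0L);
    }

    /** Adds to the counter and immediately persists the new value. */
    void add(long delta) throws IOException {
        value += delta;
        Files.write(checkpointFile, Long.toString(value).getBytes(StandardCharsets.UTF_8));
    }

    long value() {
        return value;
    }
}
```

A second call to `getOrCreate` with the same file plays the role of a restarted Driver: it picks up the persisted value instead of starting over, which is only possible if the file lives on storage that survives the restart.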

2. Developing the checkpoint program

To test Spark checkpointing under Spark on K8s, we develop a WordCount sample program. It reads text messages from Kafka, counts how many times each word occurs in them, and saves the results to MySQL.
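The counting logic itself can be sketched in plain Java, independent of Spark and Kafka. This is an illustration under assumptions, not the program used in this article: the class `RunningWordCount` is hypothetical, the `HashMap` plays the role of the per-word state that Spark would checkpoint, and empty tokens are skipped here as a simplification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of a running word count kept across batches. */
final class RunningWordCount {
    // Plays the role of the per-key state that Spark would checkpoint.
    private final Map<String, Integer> state = new HashMap<>();

    /** Processes one batch of lines and returns "word=total" for every word occurrence, in order. */
    List<String> processBatch(List<String> lines) {
        List<String> output = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (word.isEmpty()) {
                    continue; // skip empty tokens produced by repeated spaces
                }
                int sum = state.getOrDefault(word, 0) + 1;
                state.put(word, sum);
                output.add(word + "=" + sum);
            }
        }
        return output;
    }

    /** Returns the running total for a word, or 0 if it has not been seen. */
    int countOf(String word) {
        return state.getOrDefault(word, 0);
    }
}
```

Because the totals carry over from one batch to the next, losing the map means losing the counts. That is the state a checkpoint has to preserve across restarts.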

1) Develop the sample program with Java + Maven. The Java version is 1.8, the Spark version is 3.2.3, and the Scala version is 2.12.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.sql.*;
import java.util.*;
import java.util.regex.Pattern;

/**
 * Reads text messages from Kafka, splits each line into words, counts the occurrences of each word, and saves the results to MySQL
 */
public class StreamWordCount {

    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        // For local testing
        int batchDuration = 5;
        String brokers = "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092";
        String topics = "spark_test";
        String checkpoint = "file:///Users/willwang/spark-on-k8s-demo/checkpoint";
        String dbHost = "172.16.252.113";
        String dbPort = "3306";
        String dbName = "spark_test";
        String table = "wc";
        SparkConf sparkConf = new SparkConf().setAppName("StreamWordCount")
                .setMaster("local[1]");

//        Use the following code when running on K8s
//        if (args.length < 8) {
//            System.err.println("Usage: StreamWordCount <batchDuration> <brokers> <topics> <checkpoint> " +
//                    "<dbHost> <dbPort> <dbName> <table>");
//            System.exit(1);
//        }
//        int batchDuration = Integer.parseInt(args[0]);
//        String brokers = args[1];
//        String topics = args[2];
//        String checkpoint = args[3];
//        String dbHost = args[4];
//        String dbPort = args[5];
//        String dbName = args[6];
//        String table = args[7];
//        SparkConf sparkConf = new SparkConf().setAppName("StreamWordCount");

        // Create the Spark Streaming context: restore it from the checkpoint if the checkpoint path contains data, otherwise create a new one
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
            JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
            // Set the checkpoint path
            context.checkpoint(checkpoint);

            // Set the Kafka connection parameters
            Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-wc");
            kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            // Create the Kafka stream
            JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                    context,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

            // Extract the Kafka message values
            JavaDStream<String> lines = messages.map(ConsumerRecord::value);

            // Split each message string into words
            JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

            // Define the stateful mapping function that accumulates the count of each word
            Function3<String, org.apache.spark.api.java.Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                    (word, one, state) -> {
                        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                        state.update(sum);
                        return output;
                    };

            // Apply the stateful mapping function
            JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> wordCounts =
                    words.mapToPair(s -> new Tuple2<>(s, 1)).mapWithState(StateSpec.function(mappingFunc));

            // Print the results
            wordCounts.print();

            // Save the results to MySQL
            wordCounts.foreachRDD(rdd -> {
                // Process the RDD one partition at a time rather than one element at a time,
                // so that each partition opens a single connection
                rdd.foreachPartition(iter -> {

                    String url = "jdbc:mysql://" + dbHost + ":" + dbPort + "/" + dbName + "?characterEncoding=utf8&autoReconnect=true&useUnicode=true&useSSL=false";
                    String selectSql = "SELECT count(*) FROM " + table + " WHERE word = ?";
                    String updateSql = "UPDATE " + table + " SET cnt = ? WHERE word = ?";
                    String insertSql = "INSERT INTO " + table + " (`word`, `cnt`) VALUES(?,?)";

                    // Open the MySQL connection; try-with-resources closes the connection
                    // and the statements even if a write fails
                    try (Connection conn = DriverManager.getConnection(url, "root", "your-database-password");
                         PreparedStatement selectPS = conn.prepareStatement(selectSql);
                         PreparedStatement updatePS = conn.prepareStatement(updateSql);
                         PreparedStatement insertPS = conn.prepareStatement(insertSql)) {

                        // Iterate over the records of this partition and save them to MySQL
                        while (iter.hasNext()) {
                            Tuple2<String, Integer> t = iter.next();

                            // Check whether the word has already been saved to MySQL
                            // (bound as a parameter so that a quote inside a word cannot break the SQL)
                            selectPS.setString(1, t._1());
                            long cnt = 0;
                            try (ResultSet resultSet = selectPS.executeQuery()) {
                                while (resultSet.next()) {
                                    cnt = resultSet.getLong(1);
                                }
                            }

                            if (cnt > 0) { // Already saved: update
                                updatePS.setLong(1, t._2());
                                updatePS.setString(2, t._1());
                                updatePS.executeUpdate();
                            } else { // Not saved yet: insert
                                insertPS.setString(1, t._1());
                                insertPS.setLong(2, t._2());
                                insertPS.executeUpdate();
                            }
                        }
                    }
                });
            });

            return context;
        });

        // Start the Spark job
        jssc.start();
        jssc.awaitTermination();
    }
}
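The program above first checks whether a word already has a row and then issues either an UPDATE or an INSERT. If the table has a PRIMARY KEY or UNIQUE index on `word` (an assumption, since the table definition is not shown in this article), the same effect can probably be had with one MySQL statement per word. The helper class `WordCountUpsert` below is hypothetical and only builds the SQL text.

```java
import java.util.regex.Pattern;

/** Builds a single-statement MySQL upsert for the word-count table. */
final class WordCountUpsert {
    // Table names cannot be bound as JDBC parameters, so restrict them to a safe pattern.
    private static final Pattern SAFE_IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /** Returns the upsert statement for the given table, rejecting unsafe table names. */
    static String upsertSql(String table) {
        if (!SAFE_IDENTIFIER.matcher(table).matches()) {
            throw new IllegalArgumentException("Unsafe table name: " + table);
        }
        // Assumes a PRIMARY KEY or UNIQUE index on `word`; without one MySQL would insert duplicate rows.
        return "INSERT INTO `" + table + "` (`word`, `cnt`) VALUES (?, ?) "
                + "ON DUPLICATE KEY UPDATE `cnt` = VALUES(`cnt`)";
    }
}
```

Inside the partition loop, the statement would be prepared once and then executed per record with `setString(1, word)` and `setLong(2, count)`. This saves one round trip per word and avoids the gap between the existence check and the write.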

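2) Local test: after starting the program, send some random strings to the Kafka topic and check the output in the MySQL database. If the results in MySQL look correct, the program is working.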


3) Switch the code to K8S run mode and package it
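In K8S run mode the program takes its settings from eight positional arguments instead of hard-coded values. As a rough sketch of that switch, the parsing could be isolated in a small holder class. `JobArgs` is a hypothetical name, and it throws an exception instead of calling `System.exit` so that it can be tested.

```java
/** Hypothetical holder for the eight positional arguments of StreamWordCount. */
final class JobArgs {
    final int batchDuration;
    final String brokers;
    final String topics;
    final String checkpoint;
    final String dbHost;
    final String dbPort;
    final String dbName;
    final String table;

    private JobArgs(String[] args) {
        this.batchDuration = Integer.parseInt(args[0]);
        this.brokers = args[1];
        this.topics = args[2];
        this.checkpoint = args[3];
        this.dbHost = args[4];
        this.dbPort = args[5];
        this.dbName = args[6];
        this.table = args[7];
    }

    /** Parses the arguments in the order used on the spark-submit command line. */
    static JobArgs parse(String[] args) {
        if (args.length < 8) {
            throw new IllegalArgumentException(
                    "Usage: StreamWordCount <batchDuration> <brokers> <topics> <checkpoint> "
                            + "<dbHost> <dbPort> <dbName> <table>");
        }
        return new JobArgs(args);
    }
}
```

The argument order matches the trailing values passed after the JAR path in the spark-submit commands in this article.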

3. Writing Spark checkpoints to a PV

1) Create the checkpoint PVC

[root@k8s-demo001 ~]# cat spark-checkpoint-pvc.yaml 
# PVC for persisting Spark checkpoints
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-checkpoint-pvc  # name of the Spark checkpoint PVC
  namespace: apache-spark   # namespace the PVC belongs to
spec:
  storageClassName: nfs-storage   # StorageClass name; change it to your actual StorageClass
  accessModes:
    - ReadWriteMany   # use the ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # storage capacity; adjust as needed
[root@k8s-demo001 ~]# kubectl apply -f spark-checkpoint-pvc.yaml

2) Write custom driver and executor pod template files. Both the driver and the executor must mount the checkpoint PVC; otherwise Spark reports an error when it performs a checkpoint.

driver:

[root@k8s-demo001 ~]# cat /data/nginx_down/driver-cp-pv.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: driver
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: driver
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar   # mount the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-historyserver   # mount the eventLog archive directory
          mountPath: /opt/spark/eventLog
        - name: spark-logs   # mount the logs
          mountPath: /opt/spark/logs
        - name: spark-checkpoint   # mount the checkpoint path
          mountPath: /opt/spark/checkpoint-wc
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-historyserver
      persistentVolumeClaim:
        claimName: spark-historyserver-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
    - name: spark-checkpoint
      persistentVolumeClaim:
        claimName: spark-checkpoint-pvc

executor:

[root@k8s-demo001 ~]# cat /data/nginx_down/executor-cp-pv.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar    # mount the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-logs   # mount the logs
          mountPath: /opt/spark/logs
        - name: spark-checkpoint   # mount the checkpoint path
          mountPath: /opt/spark/checkpoint-wc
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
    - name: spark-checkpoint
      persistentVolumeClaim:
        claimName: spark-checkpoint-pvc
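Since both pods need the checkpoint volume mounted at the same path, a missing mount is easier to diagnose if the job checks the directory at startup. The following is a minimal sketch under assumptions: `CheckpointMountCheck` is a hypothetical helper, and it only applies to local (`file://`) checkpoint paths such as `/opt/spark/checkpoint-wc`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical startup check for a locally mounted checkpoint directory. */
final class CheckpointMountCheck {
    /** Returns true if the directory exists and a probe file can be created and deleted in it. */
    static boolean isWritableDirectory(String dir) {
        Path path = Paths.get(dir);
        if (!Files.isDirectory(path)) {
            return false; // not mounted, or mounted somewhere else
        }
        try {
            Path probe = Files.createTempFile(path, "probe-", ".tmp");
            Files.delete(probe);
            return true;
        } catch (IOException e) {
            return false; // mounted but not writable by this user
        }
    }
}
```

Calling `CheckpointMountCheck.isWritableDirectory("/opt/spark/checkpoint-wc")` at the beginning of `main` and failing with a clear message would point at the pod template rather than at the first failed checkpoint write.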

3) Submit the Spark job

./spark-3.2.3/bin/spark-submit \
 --name StreamWordCount \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=1 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc-cp-pv.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc-cp-pv.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=file:///opt/spark/eventLog \
 --conf spark.history.fs.logDirectory=file:///opt/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.driver.podTemplateFile=http://172.16.252.105:8080/driver-cp-pv.yaml \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-cp-pv.yaml \
 --class org.fblinux.StreamWordCount \
 local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar \
 5 \
172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092 \
 spark_test \
 file:///opt/spark/checkpoint-wc \
 172.16.252.113 \
 3306 \
 spark_test \
 wc

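4) Terminate the program (deleting the driver pod is enough; the executor pods are deleted and released along with it), then resubmit the job to verify that the data is restored from the checkpoint.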

4. Writing Spark checkpoints to HDFS

1) Create the checkpoint directory on HDFS

hdfs dfs -mkdir -p /tmp/spark/checkpoint-wc

2) Write custom driver and executor pod template files

driver:

[root@k8s-demo001 ~]# cat /data/nginx_down/driver-hadoop.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: driver
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: driver
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar   # mount the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-historyserver   # mount the eventLog archive directory
          mountPath: /opt/spark/eventLog
        - name: spark-logs   # mount the logs
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-historyserver
      persistentVolumeClaim:
        claimName: spark-historyserver-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc

executor:

[root@k8s-demo001 ~]# cat /data/nginx_down/executor-hadoop.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
        - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: root
        - name: SPARK_USER
          value: root
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-jar    # mount the application JARs
          mountPath: /opt/spark/appJars
        - name: spark-logs   # mount the logs
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-jar
      persistentVolumeClaim:
        claimName: spark-jar-pvc
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc

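3) Submit the job. The checkpoint path has no file:// prefix, so it is resolved against the default file system, which is HDFS here.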

./spark-3.2.3-hadoop/bin/spark-submit \
 --name StreamWordCount \
 --verbose \
  --master k8s://https://172.16.252.105:6443 \
 --deploy-mode cluster \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=1 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-wc-cp-hadoop.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-wc-cp-hadoop.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=file:///opt/spark/eventLog \
 --conf spark.history.fs.logDirectory=file:///opt/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-service-account \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.driver.podTemplateFile=http://172.16.252.105:8080/driver-hadoop.yaml \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-hadoop.yaml \
 --class org.fblinux.StreamWordCount \
 local:///opt/spark/appJars/spark-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar \
 5 \
172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092 \
 spark_test \
 /tmp/spark/checkpoint-wc \
 172.16.252.113 \
 3306 \
 spark_test \
 wc
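The only difference between the two checkpoint arguments in this article is the URI scheme: `file:///opt/spark/checkpoint-wc` names the local (mounted) file system explicitly, while `/tmp/spark/checkpoint-wc` has no scheme, and Hadoop resolves such paths against the configured default file system (`fs.defaultFS`). The sketch below only inspects the scheme with `java.net.URI`. The class `CheckpointLocation` is hypothetical and does not touch Hadoop at all.

```java
import java.net.URI;

/** Hypothetical helper that reports which file system a checkpoint path names. */
final class CheckpointLocation {
    /** Returns the URI scheme of a checkpoint path, or a marker when no scheme is given. */
    static String describe(String checkpoint) {
        String scheme = URI.create(checkpoint).getScheme();
        return scheme == null ? "(default file system)" : scheme;
    }

    public static void main(String[] args) {
        System.out.println(describe("file:///opt/spark/checkpoint-wc"));
        System.out.println(describe("/tmp/spark/checkpoint-wc"));
    }
}
```

Logging this at startup makes it obvious whether a job is about to write its checkpoint to the PVC mount or to whatever file system the Hadoop configuration defaults to.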

When reposting, please credit: 西门飞冰's blog » (4) Spark on K8S Checkpoint in Practice
