(6) Spark SQL on K8s Implementation


1. Introduction

In real-world practice, the vast majority of offline data warehouse and batch-processing jobs are developed as SQL scripts, with the processing logic expressed in SQL statements. Thanks to Spark's excellent performance, and especially the large Spark SQL performance gains since Spark 3.0, most data warehouse teams today develop their batch jobs with Spark SQL. So in the broader context of storage-compute separation for big data, containerizing Spark SQL is a necessary step as well.

2. Spark SQL on K8s Architecture

As shown in the diagram below, Spark SQL runs in client mode: the driver runs outside of K8s, while the executors run inside K8s. For Spark SQL to operate on databases and tables, it must obtain Hive metadata from the Hive Metastore.

In a storage-compute-separated architecture, Spark's computation is done inside K8s, while the data lives in external HDFS or object storage.

[Figure: Spark SQL on K8s architecture diagram]

3. Environment Preparation

1. Install Hadoop (3.1.3), Hive (3.1.3), and Spark (3.2.3) in advance, and start the Hive Metastore service.

2. Configure the Spark on Hive runtime environment (a sketch follows below).
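
The Spark on Hive setup itself is not shown in this post. A minimal sketch is to point Spark at the Hive Metastore by placing a hive-site.xml in Spark's conf directory; the Metastore address thrift://hadoop-test01:9083 below is an assumption (9083 is the default Metastore port), so adjust it to your own environment.

# Minimal hive-site.xml in Spark's conf dir so Spark SQL can reach the Hive Metastore
# (thrift://hadoop-test01:9083 is an assumed Metastore address)
cat > ./spark-3.2.3-hadoop/conf/hive-site.xml <<'EOF'
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop-test01:9083</value>
    </property>
</configuration>
EOF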

4. Special Notes

1. Submitting jobs to K8s with spark-submit is done on the K8s-side server, so a copy of the entire Hive binary directory and its related dependencies must be placed on the node where spark-submit runs; otherwise, submitting jobs to the K8s cluster with spark-submit will fail with errors about missing Hive dependencies (a copy sketch follows after the commands below).

2. K8s and Hadoop are two independent clusters: the K8s cluster is operated with the root account, while the Hadoop cluster is operated with the hadoop account. Because jobs are submitted to K8s with spark-submit on the K8s server, they run as root, so Hadoop has to grant root the necessary permissions on the relevant directories.

hdfs dfs -chown -R hadoop:root /user/
hdfs dfs -chmod -R 775 /user
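
For note 1 above, a hedged sketch of copying the Hive installation onto the spark-submit node could look like the following; the hostname hadoop-test01 and the /opt/module/hive path are assumptions based on paths that appear elsewhere in this post.

# Copy the whole Hive installation (including lib jars) from a Hadoop node
# to the spark-submit node; hostname and paths are assumed, adjust as needed
rsync -av hadoop@hadoop-test01:/opt/module/hive/ /opt/module/hive/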

5. History Server Configuration

Because Spark SQL runs in client mode, the driver runs outside K8s while the executors run inside K8s. If we want to use the Spark History Server, it requires the driver's and the executors' eventLog to point to the same storage path. Using a PV is awkward in this scenario, because the real path behind a PV usually differs from the mount path inside the Pod container. Therefore, when integrating Spark SQL on K8s with Hadoop, it is recommended to write the eventLog to HDFS.

1. Create a directory on HDFS to hold the eventLog

hadoop fs -mkdir /spark
hadoop fs -mkdir /spark/eventLog
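
Since spark-submit runs as root on the K8s node (see section 4) while the history server pod accesses HDFS as the hadoop user, the eventLog directory may also need relaxed permissions. The adjustment below mirrors what was done for /user above and is an assumption, not something shown in the original setup.

# Optional: let both the root-run driver and the hadoop-run history server use the eventLog dir
hdfs dfs -chown -R hadoop:root /spark
hdfs dfs -chmod -R 775 /spark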

2. Create a ConfigMap holding the Hadoop configuration files

[root@k8s-demo001 ~]# cat spark-conf-hadoop.yaml 
kind: ConfigMap
apiVersion: v1
metadata:
  name: spark-conf-hadoop
  namespace: apache-spark
  annotations:
    kubesphere.io/creator: admin
data:
  core-site.xml: |
        <configuration>
            <!-- NameNode address -->
            <property>
                <name>fs.defaultFS</name>
                <value>hdfs://hadoop-test01:8020</value>
            </property>
        
            <!-- Hadoop data storage directory -->
            <property>
                <name>hadoop.tmp.dir</name>
                <value>/data/hadoop/data</value>
            </property>
        
            <!-- Static user used by the HDFS web UI: hadoop -->
            <property>
                <name>hadoop.http.staticuser.user</name>
                <value>hadoop</value>
            </property>
        
            <property>
                <name>hadoop.proxyuser.hadoop.hosts</name>
                <value>*</value>
            </property>
        
            <property>
                <name>hadoop.proxyuser.hadoop.groups</name>
                <value>*</value>
            </property>
        
            <property>
                <name>hadoop.proxyuser.hadoop.users</name>
                <value>*</value>
            </property>
        </configuration>
  hdfs-site.xml: |
        <configuration>
            <!-- NameNode web UI address -->
            <property>
                <name>dfs.namenode.http-address</name>
                <value>hadoop-test01:9870</value>
            </property>
            <!-- Secondary NameNode web UI address -->
            <property>
                <name>dfs.namenode.secondary.http-address</name>
                <value>hadoop-test02:9868</value>
            </property>
        </configuration>
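
The ConfigMap (and the resources in the following steps) are created with kubectl in the usual way; for example:

kubectl apply -f spark-conf-hadoop.yaml
kubectl -n apache-spark get configmap spark-conf-hadoop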

3. Deploy the history server, using a Deployment + Service + Ingress

[root@k8s-demo001 ~]# cat spark-historyserver-hadoop.yaml 
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: apache-spark
  labels:
    app: spark-historyserver
    name: spark-historyserver
  name: spark-historyserver
spec:
  replicas: 1
  selector:
    matchLabels:
      name: spark-historyserver
  template:
    metadata:
      namespace: apache-spark
      labels:
        app: spark-historyserver
        name: spark-historyserver
    spec:
      hostAliases:  # host entries
        - ip: "172.16.252.105"
          hostnames:
            - "k8s-demo001"
        - ip: "172.16.252.134"
          hostnames:
            - "k8s-demo002"
        - ip: "172.16.252.135"
          hostnames:
            - "k8s-demo003"
        - ip: "172.16.252.136"
          hostnames:
            - "k8s-demo004"
      containers:
        - name: spark-historyserver
          image: apache/spark:v3.2.3
          imagePullPolicy: IfNotPresent
          args: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.history.HistoryServer"]
          env:
            - name: TZ
              value: Asia/Shanghai
            - name: HADOOP_USER_NAME
              value: hadoop 
            - name: SPARK_USER
              value: hadoop
            # If not using the ConfigMap, configure via SPARK_HISTORY_OPTS instead
            - name: SPARK_HISTORY_OPTS
              value: "-Dspark.eventLog.enabled=true -Dspark.eventLog.dir=hdfs://hadoop-test01:8020/spark/eventLog -Dspark.history.fs.logDirectory=hdfs://hadoop-test01:8020/spark/eventLog"
          ports:
            - containerPort: 18080
          volumeMounts:
            - name: spark-conf
              mountPath: /opt/spark/conf/core-site.xml
              subPath: core-site.xml
            - name: spark-conf
              mountPath: /opt/spark/conf/hdfs-site.xml
              subPath: hdfs-site.xml
      volumes:  # volume configuration
        - name: spark-conf
          configMap:
            name: spark-conf-hadoop
#---
#kind: Service
#apiVersion: v1
#metadata:
#  namespace: apache-spark
#  name: spark-historyserver
#spec:
#  type: NodePort
#  ports:
#    - port: 18080
#      nodePort: 31082
#  selector:
#    name: spark-historyserver

# configure the Ingress according to your own environment
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: spark-historyserver
    name: spark-historyserver
  name: spark-historyserver
  namespace: apache-spark
spec:
  selector:
    app: spark-historyserver
  ports:
    - port: 18080
      protocol: TCP
      targetPort: 18080
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  namespace: apache-spark
  name: spark-historyserver
  annotations:
    nginx.ingress.kubernetes.io/default-backend: ingress-nginx-controller
    nginx.ingress.kubernetes.io/use-regex: 'true'
spec:
  ingressClassName: nginx
  rules:
    - host: "spark.k8s.io"
      http:
        paths:
          - pathType: Prefix
            path: "/"
            backend:
              service:
                name: spark-historyserver
                port:
                  number: 18080
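
Apply the manifest and check that the history server pod is up; assuming the Ingress host spark.k8s.io is resolved to the ingress-nginx entry point (here assumed to be the node 172.16.252.105, e.g. via a hosts entry on the client machine), the UI can then be opened in a browser:

kubectl apply -f spark-historyserver-hadoop.yaml
kubectl -n apache-spark get pods -l app=spark-historyserver
# assumed: map the Ingress host to the ingress-nginx entry IP on the client machine
echo "172.16.252.105 spark.k8s.io" >> /etc/hosts
curl -s -o /dev/null -w "%{http_code}\n" http://spark.k8s.io/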

6. Spark SQL on K8s Test

1. Create the pod template

[root@k8s-demo001 ~]# cat /data/nginx_down/executor-sql-hadoop.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
       - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: apache/spark:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: hadoop 
        - name: SPARK_USER
          value: hadoop
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-logs   # log mount
          mountPath: /opt/spark/logs
  volumes:
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
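
The spark-submit command in step 3 references this pod template over HTTP (http://172.16.252.105:8080/executor-sql-hadoop.yaml); the file sits under /data/nginx_down, which is presumably served by an Nginx instance on the submit node (that Nginx setup is not covered in this post). A quick check that the template is reachable:

# Verify the executor pod template can be fetched over HTTP before submitting
curl -s http://172.16.252.105:8080/executor-sql-hadoop.yaml | head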

2. On the node where the driver runs (the spark-submit node), create the directory for saving logs:

/opt/spark/logs
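
For example:

mkdir -p /opt/spark/logs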

3. Run SQL statements with -e

./spark-3.2.3-hadoop/bin/spark-submit \
 --name spark-sql-test \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode client \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.host=k8s-demo001 \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-sql-hadoop-1.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-sql-hadoop-1.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.history.fs.logDirectory=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-sql-hadoop.yaml \
 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver \
 -e \
 "
 DROP TABLE IF EXISTS spark_dept;
 CREATE TABLE spark_dept(deptno int, dname string, loc string);
 INSERT INTO spark_dept VALUES (10, 'ACCOUNTING', 'NEW YORK');
 select * from spark_dept;
 select count(*) from spark_dept where deptno=10
 "

Note: during execution only the executor pods run in the cluster, and once the job finishes the executor pods are deleted. To review the job's execution record afterwards, use the history server or the driver log.
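
While the job is running, the executor pods can be watched being created and torn down, for example:

kubectl -n apache-spark get pods -w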


4. Run a SQL script with -f
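
The full command is not repeated in the original post; a trimmed-down sketch is to reuse the submission from step 3 and swap the inline -e SQL for the script file (the remaining --conf options from step 3 apply here as well, and the job name is arbitrary):

./spark-3.2.3-hadoop/bin/spark-submit \
 --name spark-sql-file-test \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode client \
 --conf spark.executor.instances=3 \
 --conf spark.driver.host=k8s-demo001 \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.container.image=apache/spark:v3.2.3 \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-sql-hadoop.yaml \
 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver \
 -f spark-test-sql.sql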

After the run completes, check the log output:

[Screenshot: execution log]

Contents of the spark-test-sql.sql script:

USE default;

DROP TABLE IF EXISTS spark_dept;
CREATE TABLE spark_dept
(
    deptno int,
    dname  string,
    loc    string
);

INSERT INTO spark_dept
VALUES (10, 'ACCOUNTING', 'NEW YORK'),
       (20, 'RESEARCH', 'DALLAS'),
       (30, 'SALES', 'CHICAGO'),
       (40, 'OPERATIONS', 'BOSTON'),
       (50, 'DEVELOP', 'Washington');

SELECT * FROM spark_dept;

DROP TABLE IF EXISTS spark_emp;
CREATE TABLE spark_emp
(
    EMPNO    int,
    ENAME    string,
    JOB      string,
    MGR      double,
    HIREDATE string,
    SAL      double,
    COMM     double,
    DEPTNO   int
);

INSERT INTO spark_emp
VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800, NULL, 20),
       (7499, 'ALLEN', 'SALESMAN', 7698, '1981-2-20', 1600, 300, 30),
       (7521, 'WARD', 'SALESMAN', 7698, '1981-2-22', 1250, 500, 30),
       (7566, 'JONES', 'MANAGER', 7839, '1981-4-2', 2975, NULL, 20),
       (7654, 'MARTIN', 'SALESMAN', 7698, '1981-9-28', 1250, 1400, 30),
       (7698, 'BLAKE', 'MANAGER', 7839, '1981-5-1', 2850, NULL, 30),
       (7782, 'CLARK', 'MANAGER', 7839, '1981-6-9', 2450, NULL, 10),
       (7788, 'SCOTT', 'ANALYST', 7566, '1987-4-19', 3000, NULL, 20),
       (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000, NULL, 10),
       (7844, 'TURNER', 'SALESMAN', 7698, '1981-9-8', 1500, 0, 30),
       (7876, 'ADAMS', 'CLERK', 7788, '1987-5-23', 1100, NULL, 20),
       (7900, 'JAMES', 'CLERK', 7698, '1981-12-3', 950, NULL, 30),
       (7902, 'FORD', 'ANALYST', 7566, '1981-12-3', 3000, NULL, 20),
       (7934, 'MILLER', 'CLERK', 7782, '1982-1-23', 1300, NULL, 10);

SELECT * FROM spark_emp;

SELECT a.*, b.*
FROM spark_dept a,
     spark_emp b
WHERE a.deptno = b.deptno
  AND b.sal > 2100;

SELECT e.deptno, d.dname, SUM(sal) sum_sal, AVG(sal) avg_sal, MAX(sal) max_sal, MIN(sal) min_sal
FROM spark_emp e,
     spark_dept d
WHERE e.deptno = d.deptno
GROUP BY e.deptno, d.dname;

SELECT deptno, SUM(sal)
FROM spark_emp
WHERE deptno > 10
GROUP BY deptno;

7. Spark SQL Hive Version Alignment

In the tests above we used a Hive 3.1.3 Metastore, while the Hive client jars bundled with Spark v3.2.3 are version 2.3.9. Although Spark SQL ran fine in the earlier tests, to keep the two as compatible as possible it is recommended to align the Hive versions in production use.

1. First, build a Spark image that contains the Hive 3.1.3 jars

(1) Create a directory for the image build and copy the Hive jars into a hive-lib directory

mkdir /root/my_dockerfile/spark-hive -p
cd /root/my_dockerfile/spark-hive
mkdir hive-lib
cp /opt/module/hive/lib/*.jar hive-lib/

(2) Write the Dockerfile

[root@k8s-demo001 spark-hive]# cat Dockerfile 
FROM apache/spark:v3.2.3
COPY hive-lib /opt/hive-lib
WORKDIR /opt/spark/work-dir
ENTRYPOINT ["/opt/entrypoint.sh"]
USER 185

(3) Build the image and push it to the image registry

docker build -f Dockerfile -t spark-hive3:v3.2.3 .
docker tag spark-hive3:v3.2.3 172.16.252.110:8011/sparkonk8s/spark-hive3:v3.2.3
docker push 172.16.252.110:8011/sparkonk8s/spark-hive3:v3.2.3

2. On the spark-submit (driver) node, place the Hive client jars at the same path as inside the Spark image

mkdir /opt/hive-lib
cp /opt/module/hive-3.1.3/lib/*.jar /opt/hive-lib/

3. Write the executor pod template

[root@k8s-demo001 ~]# cat /data/nginx_down/executor-sql-hadoop-hive3.yaml 
apiVersion: v1
kind: Pod
metadata:
  labels:
    app.kubernetes.io/name: apache-spark
    app.kubernetes.io/instance: apache-spark
    app.kubernetes.io/version: v3.2.3
  namespace: apache-spark
  name: executor
spec:
  serviceAccountName: spark-service-account
  hostAliases:
    - ip: "172.16.252.105"
      hostnames:
        - "k8s-demo001"
    - ip: "172.16.252.134"
      hostnames:
       - "k8s-demo002"
    - ip: "172.16.252.135"
      hostnames:
        - "k8s-demo003"
    - ip: "172.16.252.136"
      hostnames:
        - "k8s-demo004"
  containers:
    - image: 172.16.252.110:8011/sparkonk8s/spark-hive3:v3.2.3
      name: executor
      imagePullPolicy: IfNotPresent
      env:
        - name: TZ
          value: Asia/Shanghai
        - name: HADOOP_USER_NAME
          value: hadoop 
        - name: SPARK_USER
          value: hadoop
        - name: SPARK_LOCAL_DIRS # path where shuffle data is written
          value: "/opt/spark/shuffle-data"
      resources:
        requests:
          cpu: 1
          memory: 1Gi
        limits:
          cpu: 1
          memory: 1Gi
      volumeMounts:
        - name: spark-logs   # log mount
          mountPath: /opt/spark/logs
        - name: spark-shuffle   # shuffle data mount
          mountPath: /opt/spark/shuffle-data
  volumes:
    - name: spark-logs
      persistentVolumeClaim:
        claimName: spark-logs-pvc
    - name: spark-shuffle
      persistentVolumeClaim:
        claimName: spark-shuffle-pvc
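
This template mounts two PVCs, spark-logs-pvc and spark-shuffle-pvc, which are assumed to already exist (presumably created in an earlier post of this series). If they do not exist in your cluster, a minimal sketch of the shuffle claim, with an assumed storage class and size, is shown below; spark-logs-pvc can be created the same way.

# Hypothetical PVC for the shuffle mount; storageClassName and size are assumptions
kubectl apply -f - <<'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: spark-shuffle-pvc
  namespace: apache-spark
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nfs-storage
  resources:
    requests:
      storage: 10Gi
EOF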

4. Submit a Spark SQL job to test

./spark-3.2.3-hadoop/bin/spark-submit \
 --name spark-sql-hive3-1 \
 --verbose \
 --master k8s://https://172.16.252.105:6443 \
 --deploy-mode client \
 --conf spark.network.timeout=300 \
 --conf spark.executor.instances=3 \
 --conf spark.driver.cores=1 \
 --conf spark.executor.cores=1 \
 --conf spark.driver.memory=1024m \
 --conf spark.executor.memory=1024m \
 --conf spark.driver.host=k8s-demo001 \
 --conf spark.sql.hive.metastore.version=3.1.3 \
 --conf spark.sql.hive.metastore.jars=/opt/hive-lib/* \
 --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/driver-sql-hadoop-hive3-1.log" \
 --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Dlog.file=/opt/spark/logs/executor-sql-hadoop-hive3-1.log" \
 --conf spark.eventLog.enabled=true \
 --conf spark.eventLog.dir=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.history.fs.logDirectory=hdfs://hadoop-test01:8020/spark/eventLog \
 --conf spark.kubernetes.namespace=apache-spark \
 --conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
 --conf spark.kubernetes.container.image=172.16.252.110:8011/sparkonk8s/spark-hive3:v3.2.3 \
 --conf spark.kubernetes.executor.podTemplateFile=http://172.16.252.105:8080/executor-sql-hadoop-hive3.yaml \
 --class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver \
 -e \
 "
 DROP TABLE IF EXISTS spark_dept;
 CREATE TABLE spark_dept(deptno int, dname string, loc string);
 INSERT INTO spark_dept VALUES (10, 'ACCOUNTING', 'NEW YORK');
 select * from spark_dept;
 select count(*) from spark_dept where deptno=10
 "
