
(7)Flink Native K8S


1. Introduction

Flink native K8s means using Flink's own command-line tools to submit jobs to a Kubernetes cluster, so a Flink distribution needs to be downloaded in advance.

For production use of Flink on K8s, the Flink Kubernetes Operator is the recommended approach; Flink native K8s is still not very mature and has quite a few pitfalls.

2. Environment Initialization

In practice, to standardize how Flink jobs are deployed and run on K8s, jobs are usually scheduled into a dedicated namespace and run under a specific ServiceAccount.

Note: with the Flink Kubernetes Operator, resources such as Role, ClusterRole, and RoleBinding are created automatically, but in native mode we have to create them by hand.

1. Create the namespace

kubectl create namespace apache-flink

2. Create the ServiceAccount

kubectl create serviceaccount flink-service-account -n apache-flink

3. Create the Role and RoleBinding

[root@k8s-demo001 ~]# cat role.yaml 
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  labels:
    app.kubernetes.io/name: apache-flink
    app.kubernetes.io/instance: apache-flink
  namespace: apache-flink
  name: flink-role
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "watch", "list", "create", "delete"]
  - apiGroups: ["extensions", "apps"]
    resources: ["deployments"]
    verbs: ["get", "watch", "list", "create", "delete"]
  - apiGroups: [""]
    resources: ["configmaps"]
    verbs: ["get", "create", "update", "delete"]
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["get"]
  - apiGroups: [""]
    resources: ["services"]
    verbs: ["get", "list", "create", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  labels:
    app.kubernetes.io/name: apache-flink
    app.kubernetes.io/instance: apache-flink
  name: flink-role-binding
  namespace: apache-flink
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink-role
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: apache-flink
[root@k8s-demo001 ~]# kubectl apply -f role.yaml

4. Create the ClusterRole and ClusterRoleBinding

[root@k8s-demo001 ~]# cat cluster-role.yaml 
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  labels:
    app.kubernetes.io/name: apache-flink
    app.kubernetes.io/instance: apache-flink
  name: apache-flink-clusterrole
rules:
  - apiGroups:
      - ''
    resources:
      - configmaps
      - endpoints
      - nodes
      - pods
      - secrets
      - namespaces
    verbs:
      - list
      - watch
      - get
  - apiGroups:
      - ''
    resources:
      - services
    verbs:
      - get
      - list
      - watch
  - apiGroups:
      - ''
    resources:
      - events
    verbs:
      - create
      - patch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  labels:
    app.kubernetes.io/name: apache-flink
    app.kubernetes.io/instance: apache-flink
  name: apache-flink-clusterrole-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: apache-flink-clusterrole
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: apache-flink
[root@k8s-demo001 ~]# kubectl apply -f cluster-role.yaml
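
Before launching anything, an optional sanity check that the RBAC wiring took effect (a sketch using kubectl's built-in impersonation; not from the original post):

# Both should print "yes"
kubectl auth can-i create pods -n apache-flink \
  --as=system:serviceaccount:apache-flink:flink-service-account
kubectl auth can-i list nodes \
  --as=system:serviceaccount:apache-flink:flink-service-account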

3. Test Code

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

import java.util.Arrays;
public class StreamWordCount {
    private static Logger logger = Logger.getLogger(StreamWordCount.class);
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the text stream; first run "nc -lk 7777" on the source host
        DataStreamSource<String> lineDSS = env.socketTextStream("172.16.252.105", 7777);
        // 3. Transform the data format
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. Key by word
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne
                .keyBy(t -> t.f0);
        // 5. Sum the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS
                .sum(1);
        // 6. Print
        result.print();
        logger.info(result.toString());
        // 7. Execute
        env.execute();
    }
}
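
The class above gets packaged into the jar used throughout this post. A minimal build sketch, assuming a standard Maven project with the Shade or Assembly plugin configured (the artifact name below matches the jar in the submit commands):

mvn clean package -DskipTests
ls target/flink-on-k8s-demo-1.0-SNAPSHOT.jar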

4. Session Mode Test

1. Install Flink

wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz
tar xf flink-1.13.6-bin-scala_2.12.tgz 

2. Start the Flink session cluster

# Note: on Flink 1.13.x-1.16.x, specify the image with kubernetes.container.image

# On Flink 1.17+, use kubernetes.container.image.ref instead

./flink-1.13.6/bin/kubernetes-session.sh \
 -Dkubernetes.cluster-id=session-deployment-only \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 -Dkubernetes.container.image="flink:1.13.6" \
 -Dkubernetes.container.image.pull-policy="IfNotPresent" \
 -Dkubernetes.rest-service.exposed.type="NodePort"

Verify that the cluster started:

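A quick look with kubectl (a sketch; the Deployment is named after kubernetes.cluster-id):

kubectl get pods,deploy,svc -n apache-flink
# expect a session-deployment-only Deployment with a running JobManager pod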

Access the web UI to test:

http://172.16.252.105:31696
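
The NodePort (31696 in this run) is assigned randomly at start-up; it can be read from the rest service the session created (a sketch, relying on Flink's <cluster-id>-rest service naming):

kubectl get svc session-deployment-only-rest -n apache-flink
# the PORT(S) column shows something like 8081:31696/TCP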

3. On the 172.16.252.105 server, run nc listening on port 7777; otherwise the job submitted in a moment will fail

nc -lk 7777

4. Submit the job to the cluster. Note that the job jar is read from the client's local filesystem here, and the submit command is flink, not kubernetes-session

./flink-1.13.6/bin/flink run \
 --target kubernetes-session \
 -Dkubernetes.cluster-id=session-deployment-only \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 -c org.fblinux.StreamWordCount \
 /root/flink-on-k8s-demo-1.0-SNAPSHOT.jar

5. Send some text to nc and watch the TaskManager log output

kubectl logs pod/session-deployment-only-taskmanager-1-1 -n apache-flink


6. List the jobs

./flink-1.13.6/bin/flink list \
 --target kubernetes-session \
 -Dkubernetes.cluster-id=session-deployment-only \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account

7. Shut down the cluster

flink cancel only removes the job; it does not delete the JobManager:

./flink-1.13.6/bin/flink cancel \
 --target kubernetes-session \
 -Dkubernetes.cluster-id=session-deployment-only \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 126e0ec61ee53c2187e1dd89ae12b34c

To tear down the cluster completely:

kubectl delete deployment session-deployment-only -n apache-flink

5. Application Mode Test

1. Submit the job

Note: with native K8s in Application mode, the job jar can only be fetched via the local:// scheme, i.e. it must already be inside the image, as sketched next.
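
A minimal sketch of how such an image could be built; the registry address and jar path match the submit command below, but the Dockerfile itself is an assumption, not from the original post:

cat > Dockerfile <<'EOF'
FROM flink:1.13.6
# bake the job jar into the image so the local:// scheme can find it
COPY flink-on-k8s-demo-1.0-SNAPSHOT.jar /opt/flink/
EOF
docker build -t 172.16.252.110:8011/flinkonk8s/flink-wc:1.13.6 .
docker push 172.16.252.110:8011/flinkonk8s/flink-wc:1.13.6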

./flink-1.13.6/bin/flink run-application \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=application-deployment \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 -Dkubernetes.container.image=172.16.252.110:8011/flinkonk8s/flink-wc:1.13.6 \
 -Dkubernetes.container.image.pull-policy="IfNotPresent" \
 -Dkubernetes.rest-service.exposed.type="NodePort" \
 -c org.fblinux.StreamWordCount \
 local:///opt/flink/flink-on-k8s-demo-1.0-SNAPSHOT.jar

2. Send some text to nc and watch the TaskManager log output

3. List the jobs

./flink-1.13.6/bin/flink list \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=application-deployment \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account

4. Shut down the cluster

Terminate with the flink command:

./flink-1.13.6/bin/flink cancel \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=application-deployment \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 8cb17cc033b440b98c4049e2113bc3df

Terminate with kubectl:

kubectl delete deployment application-deployment  -n apache-flink

6. Submitting Jobs with a Pod Template

In the previous examples, all parameters were passed on the command line, mixing Flink runtime settings with K8s settings. The command line, however, cannot express every native K8s configuration, volume mounts for example. For such cases Flink officially supports pod templates, which define the pod spec for the JobManager and TaskManager. Inside a pod template you can use any K8s-native syntax, which both greatly reduces the number of command-line parameters and lets you add settings the Flink command line does not support.

If the same setting is defined both on the command line and in the pod template, the command line takes precedence.

Here we use the Flink checkpoint program from an earlier post to test submitting a job with a pod template.

1. Create the PVCs

Create the PVC for the Flink jar:

[root@k8s-demo001 ~]# cat flink-jar-pvc.yaml 
# PVC for persisting the Flink jar
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-jar-pvc  # jar PVC name
  namespace: apache-flink   # target namespace
spec:
  storageClassName: nfs-storage   # StorageClass name; change to your actual SC
  accessModes:
    - ReadWriteMany   # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # capacity; adjust as needed

Create the PVC for checkpoints:

[root@k8s-demo001 ~]# cat flink-checkpoint-pvc.yaml 
# PVC for persisting Flink checkpoints
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-checkpoint-pvc  # checkpoint PVC name
  namespace: apache-flink   # target namespace
spec:
  storageClassName: nfs-storage   # StorageClass name; change to your actual SC
  accessModes:
    - ReadWriteMany   # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # capacity; adjust as needed

Create the PVC for logs:

[root@k8s-demo001 ~]# cat flink-log-pvc.yaml 
# PVC for persisting Flink logs
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-log-pvc  # log PVC name
  namespace: apache-flink   # target namespace
spec:
  storageClassName: nfs-storage   # StorageClass name; change to your actual SC
  accessModes:
    - ReadWriteMany   # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # capacity; adjust as needed

Create the PVC for HA:

[root@k8s-demo001 ~]# cat flink-ha-pvc.yaml 
# PVC for persisting Flink HA state
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-ha-pvc  # HA PVC name
  namespace: apache-flink   # target namespace
spec:
  storageClassName: nfs-storage   # StorageClass name; change to your actual SC
  accessModes:
    - ReadWriteMany   # ReadWriteMany access mode
  resources:
    requests:
      storage: 1Gi    # capacity; adjust as needed
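
Apply all four manifests and confirm the claims bind before moving on (a sketch; binding requires the nfs-storage StorageClass to actually exist):

kubectl apply -f flink-jar-pvc.yaml -f flink-checkpoint-pvc.yaml \
  -f flink-log-pvc.yaml -f flink-ha-pvc.yaml
kubectl get pvc -n apache-flink   # all four should report STATUS Bound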

2. Write the pod template

[root@ansible ~]# cat flink-pod-template.yaml 
apiVersion: v1
kind: Pod
metadata:
  name: flink-pod-template
  namespace: apache-flink
spec:
  hostAliases:
    - ip: "172.16.252.129"
      hostnames:
        - "es-01"
    - ip: "172.16.252.130"
      hostnames:
        - "es-02"
    - ip: "172.16.252.131"
      hostnames:
        - "es-03"
  containers:
    # Do not change the main container name
    - name: flink-main-container
      image: flink:1.13.6
      imagePullPolicy: IfNotPresent
      env:
      - name: TZ
        value: Asia/Shanghai
      resources:
        requests:
          cpu: 1
          memory: 1024Mi
        limits:
          cpu: 1
          memory: 1024Mi
      volumeMounts:
        - name: flink-jar  # mount the jar directory from NFS
          mountPath: /opt/flink/jar
        - name: flink-checkpoints  # mount the checkpoint PVC
          mountPath: /opt/flink/checkpoints
        - name: flink-log  # mount the log PVC
          mountPath: /opt/flink/log
        - name: flink-ha    # mount the HA PVC
          mountPath: /opt/flink/flink_recovery
  volumes:
    - name: flink-jar
      persistentVolumeClaim:
        claimName: flink-jar-pvc
    - name: flink-checkpoints
      persistentVolumeClaim:
        claimName: flink-checkpoint-pvc
    - name: flink-log
      persistentVolumeClaim:
        claimName: flink-log-pvc
    - name: flink-ha
      persistentVolumeClaim:
        claimName: flink-ha-pvc
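
Before submitting, the fat jar must already be present on the flink-jar-pvc volume, since the job below references it as local:///opt/flink/jar/. One way to stage it, as a sketch only, is to copy it on the NFS server itself (the export root and the provisioner-generated directory name are assumptions specific to your environment):

# on the NFS server backing the nfs-storage StorageClass
ls /nfs/data | grep flink-jar-pvc        # find the directory provisioned for the claim
cp flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar /nfs/data/<pvc-dir>/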

3. Submit the Flink job

./flink-1.13.6/bin/flink run-application \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=application-pod-template-1 \
 -Dtaskmanager.numberOfTaskSlots=2 \
 -Dstate.checkpoints.dir=file:///opt/flink/checkpoints \
 -Dhigh-availability=ZOOKEEPER \
 -Dhigh-availability.zookeeper.quorum=172.16.252.129:2181,172.16.252.130:2181,172.16.252.131:2181 \
 -Dhigh-availability.zookeeper.path.root=/flink \
 -Dhigh-availability.storageDir=file:///opt/flink/flink_recovery \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 -Dkubernetes.rest-service.exposed.type="NodePort" \
 -Dkubernetes.pod-template-file.jobmanager=/root/flink-native-yaml/flink-pod-template.yaml  \
 -Dkubernetes.pod-template-file.taskmanager=/root/flink-native-yaml/flink-pod-template.yaml  \
 -Dkubernetes.jobmanager.replicas=2 \
 --class org.fblinux.StreamWordCountWithCP \
 local:///opt/flink/jar/flink-on-k8s-demo-1.0-SNAPSHOT-jar-with-dependencies.jar \
 "172.16.252.129:9092,172.16.252.130:9092,172.16.252.131:9092" "flink_test" "172.16.252.113" "3306" "flink_test" "wc" "file:///opt/flink/checkpoints" "10000" "1"

4. Watch the JobManager and TaskManager pods

watch -n 1 kubectl get all -owide -n apache-flink

List the jobs:

./flink-1.13.6/bin/flink list \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=application-pod-template-1 \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account

5. Send string data to Kafka and verify that the writes arrive in the MySQL table

6. Shut down the cluster

(1) In an HA setup, terminating the job with flink cancel clears the job's HA metadata in ZooKeeper, including its checkpoint pointers, so the next launch will not restore from a checkpoint:

./flink-1.13.6/bin/flink cancel \
 --target kubernetes-application \
 -Dkubernetes.cluster-id=application-pod-template-1 \
 -Dkubernetes.namespace=apache-flink \
 -Dkubernetes.service-account=flink-service-account \
 00000000000000000000000000000000

(2) In an HA setup, terminating the job with kubectl delete leaves the HA metadata in ZooKeeper, so the next launch will restore from the latest checkpoint:

kubectl delete deployment application-pod-template-1 -n apache-flink
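
The difference between the two shutdown paths can be observed directly in ZooKeeper (a sketch, assuming zkCli.sh is available on one of the ZK hosts; /flink matches high-availability.zookeeper.path.root above):

# after flink cancel the cluster-id subtree disappears; after kubectl delete it remains
zkCli.sh -server 172.16.252.129:2181 ls /flink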

 
