由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flink 状态持久化和检查点

大数据 西门飞冰 1150℃
[隐藏]

1.状态持久化

在Flink的状态管理机制中,Flink 容错性的保障就是要对状态数据做一个持久化的保存,这样就可以在发生故障后通过持久化数据进行重启恢复。在Flink 中对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。用的最多的外部系统就是HDFS。

2.检查点

检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉,这样重启之后就可以继续处理新数据、而不需要重新计算了。

遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。

检查点是Flink容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把checkpoint叫做“一致性检查点”。

默认情况下,检查点是被禁用的,需要在配置/代码中手动开启。

3.状态后端

检查点的保存离不开JobManager和TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由JobManager向所有TaskManager发出触发检查点的命令;TaskManger收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向JobManager返回确认信息。这个过程是分布式的,当JobManger收到所有TaskManager的返回信息后,就会确认当前检查点成功保存。而这一切工作的协调,就需要一个“专职人员”来完成。

image-20221222112544834

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

3.1.状态后端的分类

状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。

(1)哈希表状态后端

这种方式就是把状态放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

(2)内嵌RocksDB状态后端

RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。

说明:对于检查点,两个状态后端都会写入到远程的持久化文件系统中。

3.2.如何选择状态后端

HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是RocksDB本地硬盘。

  • HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
  • 而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。

目前用的基本都是把本地状态放在内存,因为对于大多数公司来说,使用内存提升计算效率所带来的收益是远远超出存储成本的。

4.检查点恢复具体流程

在运行流处理程序时,Flink会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。

举例,我们一个任务处理完三个数据后保存了一个检查点。之后又正常处理了一个数据,在处理第五个数据时发生了故障。

image-20221222150356111

这里Source任务已经处理完毕,所以偏移量为5;Map任务也处理完成了。而Sum任务在处理中发生了故障,此时状态并未保存。

接下来就需要从检查点来恢复状态了。具体的步骤为:

(1)重启应用

遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空,如下图所示:

image-20221222150522205

(2)读取检查点,重置状态

找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,如下图所示

image-20221222150625406

(3)重放数据

从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第4、5个数据就相当于丢掉了;这会造成计算结果的错误。

为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过Source任务向外部数据源重新提交偏移量(offset)来实现

image-20221222150910262

这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。

(4)继续处理数据

接下来,我们就可以正常处理数据了。首先是重放第4、5个数据,然后继续读取后面的数据。

当处理到第5个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据、也没有重复计算数据,这就保证了计算结果的正确性。

5.检查点算法

在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。

5.1.检查点分界线

Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。

这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”。

5.2.分布式快照算法

watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。

具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送barrier时,需要广播出去;而当多个上游任务向同一个下游任务传递barrier时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。

场景说明:我们有两个并行的Source任务,会分别读取两个数据流。此时第一条流的Source任务读取了3个数据,偏移量为3;而第二条流的Source任务只读取了一个“hello”数据,偏移量为1。

image-20221222172034901

接下来就是检查点保存的算法。具体过程如下:

(1)JobManager发送指令,触发检查点的保存;Source任务保存状态,插入分界线

JobManager 会周期性地向每个 TaskManager发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger会在所有Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中,如下图所示:

image-20221222172222093

并行的Source任务保存的状态为3和1,表示当前的1号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理,Sum任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。

(2)状态快照保存完成,分界线向下游传递

状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后跟数据一样把barrier向下游任务传递,如下图所示:

image-20221222172401498

由于Source和Map之间是一对一(forward)的传输关系,所以barrier可以直接传递给对应的Map任务。之后Source任务就可以继续读取新的数据了。与此同时,Sum 1已经将第二条流传来的(hello,1)处理完毕,更新了状态。

(3)向下游多个并行子任务广播分界线,执行分界线对齐

Map任务没有状态,所以直接将barrier继续向下游传递。这时由于进行了keyBy分区,所以需要将barrier广播到下游并行的两个Sum任务。同时,Sum任务可能收到来自上游两个并行Map任务的barrier,所以需要执行“分界线对齐”操作。

wps2

此时的Sum 2收到了来自上游两个Map任务的barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而Sum 1只收到了来自Map 2的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum任务应该正常继续处理数据,状态更新为3;而如果分界线已经到达的分区任务Map 2又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。

(4)分界线对齐后,保存状态到持久化存储

各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将barrier向下游继续传递,并通知JobManager保存完毕

image-20221222172801389

这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当Sum将当前状态保存完毕时,Source 1任务已经读取到第一条流的第五个数据了。

(5)先处理缓存数据,然后正常继续处理

完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。

当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。

6.检查点的配置

检查点很多具体的配置项,可以跟每个作业相关,可以为每个单独的作业配置他的特色。

检查点部分可配置内容如下:

// 进行检查点配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 自动存盘机制的间隔
checkpointConfig.setCheckpointInterval(200);
// 检查点保存超时时间1分钟,超时后丢掉检查点
checkpointConfig.setCheckpointTimeout(10000);
// 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 允许检查点失败的次数,默认是0,相当于检查点保存失败,会导致整个任务失败,这个默认配置比较严格
checkpointConfig.setTolerableCheckpointFailureNumber(3);
// 最大同时出现的检查点数量
checkpointConfig.setMaxConcurrentCheckpoints(2);
// 两个检查点之间的最小暂停时间,至少要保证两个检查点保存间隔的时间
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
// 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();
// 设置一个对齐的超时时间
checkpointConfig.setAlignmentTimeout(Duration.ofMillis(100));
// 设置一个外部检查点清理机制
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 设置检查点存储,可以直接传入一个String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")

转载请注明:西门飞冰的博客 » Flink 状态持久化和检查点

喜欢 (0)or分享 (0)