由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flink 状态管理

大数据 西门飞冰 1327℃
[隐藏]

1.状态的概念

在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

什么场景会用到状态,下面列举了三种场景:

  • 去重:比如上游的系统数据可能会重复,落到下游系统时希望把重复的数据都去掉。去重需要先了解哪些数据来过,哪些数据还没有来,也就是把所有的主键都记录下来,当一条数据到来后,能够看到在主键当中是否存在。
  • 窗口计算:比如统计每分钟Nginx日志API被访问了多少次。窗口是一分钟计算一次,在窗口触发前,如08:00~08:01这个窗口,前59秒的数据来了需要先放入内存,即需要把这个窗口之内的数据先保留下来,等到8:01时一分钟后,再将整个窗口内触发的数据输出。未触发的窗口也是一种状态。
  • 访问历史数据:比如与昨天的数据进行对比,需要访问一些历史数据。如果每次从外部去读,对资源的消耗可能比较大,所以也希望把这些历史数据也放入状态中做对比。

2.有状态的算子

在Flink中,算子任务可以分为无状态和有状态两种情况。

无状态算子:无论这条输入进来多少次,输出的结果都是一样的,因为单条输入中已经包含了所需的所有信息。所以相同输入可以得到相同输出,这就是一个无状态算子。如map、filter、flatMap,计算时不依赖其他数据,就都属于无状态的算子。

有状态的算子任务:则除当前数据之外,还需要一些其他数据来得到计算结果。这里的“其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算出的某个结果。比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态;窗口算子中会保存已经到达的所有数据,这些也都是它的状态。下图所示是一个有状态的算子处理流程:

image-20221220215844542

有状态的流处理过程,是一个标准的事务处理过程,只不过是把传统的数据库换成了内存。

3.状态管理机制

在传统的事务型处理架构中,这种额外的状态数据是保存在数据库中的。而对于实时流处理来说,这样做需要频繁读写外部数据库,如果数据规模非常大肯定就达不到性能要求了。所以Flink的解决方案是,将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。

但对于流式作业有以下要求:

  • 7*24小时运行,高可靠
  • 数据不丢不重,恰好计算一次
  • 数据实时产出,不延迟

基于以上要求,内存的管理就会出现一些问题:

  • 由于内存的容量是有限的,如果要做24小时的窗口计算,将24小时的数据都放到内存,可能会出现内存不足
  • 作业是7*24小时运行,需要保障高可用,机器若出现故障或者宕机,需要考虑如何备份及从备份中恢复,保证运行的作业不受影响
  • 还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。
  • 状态的访问权限。我们知道Flink上的聚合和窗口操作,一般都是基于KeyedStream的,数据会按照key的哈希值进行分区,聚合处理的结果也应该是只对当前key有效。然而同一个分区(也就是slot)上执行的任务实例,可能会包含多个key的数据,它们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。

可见状态的管理并不是一件轻松的事。好在Flink作为有状态的大数据流式处理框架,已经帮我们搞定了这一切。Flink有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。这样,我们只需要调用相应的API就可以很方便地使用状态,或对应用的容错机制进行配置

4.状态的分类

4.1.托管状态和原始状态

Flink的状态有两种:托管状态(Managed State)和原始状态(Raw State)

托管状态:就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以。对于具体的状态内容,Flink也提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。

原始状态:是自定义的,相当于就是开辟了一块内存,需要我们自己管理,Flink不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护,实现状态的序列化和故障恢复。

只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。绝大多数应用场景,我们都可以用Flink提供的算子或者自定义托管状态来实现需求。

接下来我们的重点就是托管状态。

4.2.算子状态和按键分区状态

我们知道在Flink中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。在这种情况下,状态的访问方式又会有所不同。

基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。

(1)算子状态

状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的,如下图所示

image-20221220225655132

算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。

(2)按键分区状态

状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就keyBy之后才可以使用,如下图所示:

image-20221220225826907

按键分区状态应用非常广泛。如聚合算子必须在keyBy之后才能使用,就是因为聚合的结果是以Keyed State的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。

所以即使是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State,或者实现CheckpointedFunction接口来定义Operator State;从这个角度讲,Flink中所有的算子都可以是有状态的,不愧是“有状态的流处理”。

无论是Keyed State还是Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

5.Keyed State的概念

在实际应用中,我们一般都需要将数据按照某个key进行分区,然后再进行计算处理;所以最为常见的状态类型就是Keyed State。keyBy之后的聚合、窗口计算,算子所持有的状态,都是Keyed State。

我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照key维护和处理对应的状态。

因为一个并行子任务可能会处理多个key的数据,所以Flink需要对Keyed State进行一些特殊优化。在底层,Keyed State类似于一个分布式的映射(map)数据结构,所有的状态会根据key保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的key,从map存储中读取出对应的状态值。所以具有相同key的所有数据都会到访问相同的状态,而不同key的状态之间是彼此隔离的。

这种将状态绑定到key上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的key的数据来访问当前状态;而当前状态对应key的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。

另外,在应用的并行度改变时,状态也需要随之进行重组。不同key对应的Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是Flink重新分配Keyed State的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。

6.Keyed State的结构类型

实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型,比如列表(List)和映射(Map)。对于这些常见的用法,Flink的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下Keyed State 所支持的结构类型

  • 值状态(ValueState):顾名思义,状态中只保存一个“值”(value)。
  • 列表状态(ListState):将需要保存的数据,以列表(List)的形式组织起来。
  • 映射状态(MapState):把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。
  • 规约状态(ReducingState):类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。
  • 聚合状态(AggregatingState):与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

image-20221221224157363

7.状态编程介绍

在 Flink 中,状态始终是与特定算子相关联的;算子在使用状态前首先需要“注册”,其实就是告诉Flink当前上下文中定义状态的信息,这样运行时的 Flink 才能知道算子有哪些状态。

状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。我们知道Flink中的状态,可以认为是加了一些复杂操作的内存中的变量;而当我们在代码中声明一个局部变量时,都需要指定变量类型和名称,名称就代表了变量在内存中的地址,类型则指定了占据内存空间的大小。同样地,我们一旦指定了名称和类型,Flink就可以在运行时准确地在内存中找到对应的状态,进而返回状态对象供我们使用了。所以在一个算子中,我们也可以定义多个状态,只要它们的名称不同就可以了。

另外,状态描述器中还可能需要传入一个用户自定义函数(user-defined-function,UDF),用来说明处理逻辑,比如前面提到的ReduceFunction和AggregateFunction。

以ValueState为例,我们可以定义值状态描述器如下:

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
				"my state", // 状态名称
				Types.LONG // 状态类型
);

这里我们定义了一个叫作“my state”的长整型ValueState的描述器。

代码中完整的操作是,首先定义出状态描述器;然后调用.getRuntimeContext()方法获取运行时上下文;继而调用RuntimeContext的获取状态的方法,将状态描述器传入,就可以得到对应的状态了。

8.值状态—代码使用

案例,统计用户访问频次,使用值状态来保存count值

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.keyBy(value -> value.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {

                    // 定义一个值状态
                    ValueState<Long> myValueState;
                    
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 定义一个值状态描述器,用来在运行时环境中声明状态
                        ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>("my-value", Long.class);
                        myValueState = getRuntimeContext().getState(valueStateDescriptor);                        
                    }

                    @Override
                    public void flatMap(Event value, Collector<String> out) throws Exception {
                        // 对值状态进行处理操作
                        if (myValueState.value() == null) {
                            myValueState.update(1L);
                        } else {
                            myValueState.update(myValueState.value() + 1);
                        }

                        out.collect("my value state - " + "用户" + value.user + "的访问频次为:" + myValueState.value());
                    }
                }).print();

        env.execute();
    }
}

程序输出结果如下:

image-20221221125429213

9.列表状态—代码使用

简单模拟,使用列表状态记录用户的访问信息

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );


        stream.keyBy(value -> value.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {
                    // 定义一个列表状态变量
                    ListState<Event> myListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 定义一个列表状态描述器
                        myListState = getRuntimeContext().getListState(new ListStateDescriptor<Event>("my-list", Event.class));
                    }

                    @Override
                    public void flatMap(Event value, Collector<String> out) throws Exception {
                        // 使用列表状态
                        myListState.add(value);
                        out.collect("my list state: " + myListState.get());
                    }
                }).print();

        env.execute();
    }
}

程序输出结果如下:

image-20221221141300045

10.映射状态—代码使用

映射状态的用法和Java中的HashMap很相似。在这里我们通过MapState模拟一个URL的访问统计测试。

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.keyBy(value -> value.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {
                    // 定义一个映射状态变量,保存(url, count)
                    MapState<String, Long> myMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 定义一个映射状态描述器,用来在运行时环境中声明状态
                        myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Long>("my-map", String.class, Long.class));
                    }

                    @Override
                    public void flatMap(Event value, Collector<String> out) throws Exception {
                        // 使用映射状态
                        if (myMapState.contains(value.url)) {
                            myMapState.put(value.url, myMapState.get(value.url) + 1);
                        } else {
                            myMapState.put(value.url, 1L);
                        }
                        out.collect("my map state: (" + value.url + ", " + myMapState.get(value.url) + ")");
                    }
                }).print();

        env.execute();
    }
}

程序输出结果如下:

image-20221221141934620

11.规约状态—代码使用

模拟使用规约状态记录用户访问的所有URL信息

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.keyBy(value -> value.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {
                    // 定义一个规约状态变量,保存(url, count)
                    ReducingState<Event> myReducingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 定义一个规约状态描述器,用来在运行时环境中声明状态
                        ReducingStateDescriptor<Event> reducingStateDescriptor = new ReducingStateDescriptor<>(
                                "my-reducing",
                                ((value1, value2) -> new Event(value1.user, value1.url + ", " + value2.url, value1.timestamp > value2.timestamp ? value1.timestamp : value2.timestamp)),
                                Event.class
                        );
                        myReducingState = getRuntimeContext().getReducingState(reducingStateDescriptor);
                    }

                    @Override
                    public void flatMap(Event value, Collector<String> out) throws Exception {
                        // 使用归约状态
                        myReducingState.add(value);
                        out.collect("my reducing state: " + myReducingState.get());
                    }
                }).print();

        env.execute();
    }
}

程序输出结果如下:

image-20221221144451103

12.聚合状态—代码使用

聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。我们这里测试使用聚合状态来记录用户访问网站的频次

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.keyBy(value -> value.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {
                    // 声明聚合状态变量
                    AggregatingState<Event, String> myAggState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                      	// 定义一个状态描述器
                        AggregatingStateDescriptor<Event, Integer, String> aggregatingStateDescriptor = new AggregatingStateDescriptor<>(
                                "my-agg",
                                new AggregateFunction<Event, Integer, String>() {
                                    @Override
                                    public Integer createAccumulator() {
                                        return 0;
                                    }

                                    @Override
                                    public Integer add(Event value, Integer accumulator) {
                                        return accumulator + 1;
                                    }

                                    @Override
                                    public String getResult(Integer accumulator) {
                                        return "当前用户的访问频次为:" + accumulator;
                                    }

                                    @Override
                                    public Integer merge(Integer a, Integer b) {
                                        return a + b;
                                    }
                                },
                                Integer.class
                        );
                        myAggState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
                    }

                    @Override
                    public void flatMap(Event value, Collector<String> out) throws Exception {
                        // 使用聚合状态
                        myAggState.add(value);
                        out.collect("my aggregating state: " + myAggState.get());
                    }
                }).print();

        env.execute();
    }
}

程序输出结果如下:

image-20221221150820603

13.状态的生存时间TTL

在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

具体实现上,状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时,就可以判断状态是否失效、从而进行清除了。

代码配置示例:

public class StateTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.keyBy(value -> value.user)
                .flatMap(new RichFlatMapFunction<Event, String>() {

                    // 定义一个值状态
                    ValueState<Long> myValueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 设置一个状态生存时间,时间是1小时
                        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
                                // 设置更新的类型
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                                // 设置状态的可见性
                                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                                .build();

                        // 定义一个值状态描述器
                        ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>("my-value", Long.class);
                        // 状态描述器启动TTL功能
                        valueStateDescriptor.enableTimeToLive(ttlConfig);
                        myValueState = getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void flatMap(Event value, Collector<String> out) throws Exception {
                        // 对值状态进行处理操作
                        if (myValueState.value() == null) {
                            myValueState.update(1L);
                        } else {
                            myValueState.update(myValueState.value() + 1);
                        }

                        out.collect("my value state - " + "用户" + value.user + "的访问频次为:" + myValueState.value());
                    }
                }).print();

        env.execute();
    }
}

这里用到了几个配置项:

.newBuilder():状态TTL配置的构造器方法,方法需要传入一个Time作为参数,这就是设定的状态生存时间。

.setUpdateType():设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。

.setStateVisibility():设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

这里需要注意,目前的TTL设置只支持处理时间。

转载请注明:西门飞冰的博客 » Flink 状态管理

喜欢 (0)or分享 (0)