由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flink 中的时间语义和水位线

大数据 西门飞冰 854℃
[隐藏]

1.Flink中的时间语义

 

image-20221216234930606

如上图所示,在事件发生之后,生成的数据被收集起来,首先进入消息队列,然后被Flink系统中的Source算子读取消费,进而向下游的转换算子传递,最终由窗口进行计算处理。

很明显这里面有两个时间点

  • Event Time:是事件创建的时间,叫做“事件时间”。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事件时间戳。
  • Processing Time:数据真正被处理的时刻,叫作“处理时间”,由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后

在Flink中, 窗口是用来处理无界流的核心。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的”时间语义”

2.那种时间语义更重要

两种时间语义都有各自的用途,适用于不同的场景。

在实际业务中,使用”处理时间”明显是不合适的,我们更关心的显然是数据本身产生的时间。所以在实际应用中,事件时间语义会更为常见。

在一些非业务场景,比如使用Flink 实现监控报警,反爬等,就可以使用处理时间,而且处理时间使用起来比事件时间要简单很多。

在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从1.12版本开始,Flink已经将事件时间作为了默认的时间语义。

3.水位线的概念

现在有了每一个事件对应的时间戳,只知道数据应该分配到那个窗口,但是现在有个问题,就是当前的窗口应该如何关闭,比如8点到9点的一个窗口,如果是处理时间语义,这个很简单,就是看当前系统时间,到9点窗口就直接关闭,然后就做计算,输出结果。

但如果是事件时间,就要看当前数据里面带着的时间戳,到对应的时间才能关闭这个窗口,这种情况下,在系统里面要有另外的一个时钟去表达当前时间的进展的,这在flink里面是单独的提出了另外一套体系。

在Flink中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

具体的实现上,水位线可以看出一个特殊的数据记录,它其实就是插入在数据流里面的一个标记点,具体怎么插入和当前某一个数据有关,即使数据被拦住了不往后面发送了,水位线也可以继续往后进行发送,这样就起到指示事件时间进展的功能。

4.水位线插入策略

在实际运行过程中,不会每来一条数据就插入一条水位线,因为Flink这样的大数据流处理框架,每秒处理的数据是可以到百万级、千万级的,这么多的数据就算用毫秒来记录他的时间戳,大量数据的时间戳可能都一样,要是每一个数据后面都插入一个水位线的话,就相当于做了大量的无用功

水位线的插入有两种策略:

  • 一种是每来一条数据就插入一个水位线,但是这种在数据量非常大的情况下,不太划算,因为不光大量数据时间戳一样,而且还影响性能
  • 一种是周期性的插入水位线,针对同一时刻来的大量数据,时间戳可能意义,就周期性的插入水位线,减少无用功(插入的水位线,按照已经到来的所有数据里面最大的时间戳保持下来,到了插入水位线的周期点,就根据当前最大的时间戳生成水位线)

这两种插入方式各有优缺点:

  • 针对稀疏数据场景:要是使用周期性的方式插入水位线,在周期内可能没有数据来,就会一直插入同样的水位线,相当于做了无用功。这种就适合来一条数据插入一个水位线。
  • 针对密集数据场景:数据的时间戳可能都一样,每来一个数据就插入一个水位线,就相当于做了无用功,这种情况对我们计算性能是有影响的,这种场景就适合周期性插入水位线。

Flink底层使用的是周期性的插入水位线,时间间隔默认200毫秒

5.乱序流中的水位线

我们知道,流处理从事件产生,到被FlinkSource算子读取,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于分布式(如kafka多分区)、网络延迟、程序阻塞等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。

image-20221216215214288

如上图所示,2秒3秒生成的数据,生成时间自然比5秒的数据早;但是经过分布式传输之后,flink处理任务先收到了5秒的数据,之后2秒3秒的数据才姗姗来迟,这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?

解决思路也很简单:我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

但是这样做会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳。这样的话,5秒的数据到来之后,时间时钟不会直接推进到5秒,而是进展到了3秒,必须等到7秒的数据到来之后,时间时钟才会推进到5秒,这时迟到数据也都已收集齐,1~5秒的窗口就可以正确计算结果了。

image-20221216224110547

6.水位线的特性总结

现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。

总结一下水位线的特性:

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t, 这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据

水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

7.生成水位线的总体原则

完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。

如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。

如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。

所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。

8.代码中自定义水位线生成

在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间,.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器” TimestampAssigner和一个“水位线生成器” WatermarkGenerator。

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkTest {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 自动生成水位线的时间间隔,单位毫秒
        env.getConfig().setAutoWatermarkInterval(100);

        // 添加数据源
        env.addSource(new ClickSource())
                // 自定义水位线生成策略
                .assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {
                    // 水位线生成器:主要负责按照既定的方式,基于时间戳生成水位线。
                    @Override
                    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Event>() {
                            // 乱序数据延迟时间
                            private final Long delay = 2000L;
                            // 保存流中的最大时间戳(+delay的目的,主要是初始产生水位线的时候,不会精度溢出)
                            private Long maxTs = Long.MIN_VALUE + delay;

                            // onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作
                            @Override
                            public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
                                // 每来一条数据就发出一个水位线
                                watermarkOutput.emitWatermark(new Watermark(l));

                                // 更新当前的最大时间戳
                                maxTs = Math.max(maxTs, l);
                            }

                            // onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
                            @Override
                            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                                // 周期性发出水位线
                                watermarkOutput.emitWatermark(new Watermark(maxTs - delay));
                            }
                        };
                    }

                    // 时间戳分配器:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
                    @Override
                    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        };
                    }
                });
    }
}

这里只是生成了水位线,并没有做数据的转换,所以直接打印是看不到水位线的信息的

水位线结合窗口,有了和时间的操作,才能看到水位线对系统的影响

9.Flink内置的水位线生成

Flink内置的水位线生成策略,可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

public class FlinkWatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 处理有序流:就是直接拿当前最大的时间戳作为水位线就可以了。
        stream.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forMonotonousTimestamps()
                 // .withTimestampAssigner()方法,将数据中的timestamp字段提取出来,作为时间戳分配给数据元素                             
                .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
        );

        // 处理乱序流:由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间。这里设置的延迟时间是2秒
        // 这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。
        stream.assignTimestampsAndWatermarks( WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
        );

        env.execute();
    }
}

10.自定义数据源发出水位线

我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。

自定义数据源用的比较少。

// 自定义随机生成点击访问事件的数据源
public class ClickSource implements SourceFunction<Event> {
    // 定义一个标志位,用来控制数据的生成
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        // 定义随机数据选取的范围
        String[] users = {"Mary", "Bob"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long ts = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new Event(user,url,ts));
            // 通过数据源传递水位线
            sourceContext.emitWatermark(new Watermark(ts));
            // 隔1s生成一次数据
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}

11.水位线传递规则

水位线的传递分为两种情况:

(1)上游任务只有一个并行子任务的时候,通过广播的形式传递给下游所有子任务

(2)上游任务有多个并行子任务的时候,多个子任务时间进展可能会不一致,要是上游两个不同子任务的不同水位线通过广播的方式传递给下游子任务,下游子任务接收到了来自上游的两个不一样的水位线,那么应该以谁为准呢?

这种情况也就两种策略,要么取时间最大的水位线,要么取时间最小的水位线,这种情况应该以最小那个作为当前任务的事件时钟,因为最小的和最大的水位线中间这个间隔时间可能还有数据会传递过来,要是取最大时间的水位线,就可能导致会丢数据。

image-20221216231731696

转载请注明:西门飞冰的博客 » Flink 中的时间语义和水位线

喜欢 (0)or分享 (0)