由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flink 中的窗口

大数据 西门飞冰 105℃
[隐藏]

1.窗口的概念

我们知道Flink是一个可以处理无限流的流式处理引擎,无限流有个特点就是数据无休无止,源源不断,这种情况下要是想做统计聚合的话,就没有数据的尽头,因为数据在一直不停的更新。

在这种情况下,业务就想看之前一个小时或者一天的数据,就需要人为的给无界流增加一个界限,一个范围。这就相当于把无界流切割成了有界的数据块,就成了有界流。用在具体的API操作上,有界的数据块其实就是一个窗口,这个窗口里面可以包含很多数据

2.窗口的分类

在Flink中,窗口的应用非常灵活,我们可以使用各种不同类型的窗口来实现需求。接下来我们就从不同的角度,对Flink中内置的窗口做一个分类说明。

1、按照驱动类型分类

窗口本身是截取有界数据的一种方式,所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说,就是以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

(1)时间窗口

时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。

用结束时间减去开始时间,得到这段时间的长度,就是窗口的大小(window size)。这里的时间可以是不同的语义,所以我们可以定义处理时间窗口和事件时间窗口。

(2)计数窗口

计数窗口相比时间窗口就更加简单,我们只需指定窗口大小,就可以把数据分配到对应的窗口中了。

2、按照窗口分配数据的规则分类

时间窗口和计数窗口,只是对窗口的一个大致划分;在具体应用时,还需要定义更加精细的规则,来控制数据应该划分到哪个窗口中去。不同的分配数据的方式,就可以有不同的功能应用。

(1)滚动窗口

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。这是最简单的窗口形式,每个数据都会被分配到一个窗口,而且只会属于一个窗口。

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为1小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为10的滚动计数窗口,就会每10个数进行一次统计。

image-20221220105651381

滚动窗口应用非常广泛,它可以对每个时间段做聚合统计,很多BI分析指标都可以用它来实现。

(2)滑动窗口

与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置,一个数据可以属于多个不同的窗口。

和滚动窗口相比,收集的数据量是差不多的,但是输出的频率更高

定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。滑动步长越短,输出频率越高

image-20221220110036662

在一些场景中,可能需要统计最近一段时间内的指标,而结果的输出频率要求又很高,甚至要求实时更新,比如股票价格的24小时涨跌幅统计,或者基于一段时间内行为检测的异常报警。这时滑动窗口无疑就是很好的实现方式。

(3)会话窗口

会话窗口顾名思义,是基于“会话”(session)来来对数据进行分组的。与滑动窗口和滚动窗口不同,会话窗口只能基于时间来定义

对于会话窗口而言,最重要的参数就是会话的超时时间,也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔(Gap)小于指定的大小(size),那说明还在保持会话,它们就属于同一个窗口;如果gap大于size,那么新来的数据就应该属于新的会话窗口,而前一个窗口就应该关闭了。在具体实现上,我们可以设置静态固定的大小(size),也可以通过一个自定义的提取器(gap extractor)动态提取最小间隔gap的值。

image-20221220110821702

我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而且会留有至少为size的间隔(session gap)。

在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计。

(4)全局窗口

这种窗口全局有效,会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。

image-20221220111100852

全局窗口没有结束的时间点,所以一般在希望做更加灵活的窗口处理时自定义使用。Flink中的计数窗口(Count Window),底层就是用全局窗口实现的。

总结窗口分类结合起来看,一共就五种类型

  • 滚动时间窗口
  • 滑动时间窗口
  • 滚动记数窗口
  • 滑动计数窗口
  • 会话窗口(会话窗口只能基于时间)

3.窗口API概述

窗口API其实是基于datastream调用各种方法,这些方法就叫做窗口API

在定义窗口操作之前,首先需要确定,到底是基于按键分区的数据流KeyedStream来开窗,还是直接在没有按键分区的DataStream上开窗。也就是说,在调用窗口算子之前,是否有keyBy操作。

没有按键分区不做KeyBy,就是把当前数据流里面的所有数据都放到了一个窗口里面进行处理计算,就相当于并行度变成了1。除非特殊情况,否则一般不推荐使用。所以一般在Flink中调用的时候要先做一个KeyBy分组,这也是Flink比较推荐大家的一个做法

做过KeyBy之后在开窗,那么接下来所有的操作都只针对当前的Key有效

在代码中实现一个窗口操作,主要有两部分:窗口分配器窗口函数

stream.keyBy(<key selector>)
  		 // 窗口分配器:窗口的类型
       .window(<window assigner>)		
    	 // 窗口函数:窗口内数据执行的统计计算
       .aggregate(<window function>)	

4.测试数据说明

说明:本文代码案例使用此自定义数据源进行测试

// 自定义随机生成点击访问事件的数据源
public class ClickSource implements SourceFunction<Event> {
    // 定义一个标志位,用来控制数据的生成
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        // 定义随机数据选取的范围
        String[] users = {"Mary", "Bob"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long ts = Calendar.getInstance().getTimeInMillis();
            sourceContext.collect(new Event(user,url,ts));
            // 通过数据源传递水位线
            sourceContext.emitWatermark(new Watermark(ts));
            // 发出数据的时候给数据打上时间戳
            sourceContext.collectWithTimestamp(new Event(user,url,ts),ts);
            // 隔1s生成一次数据
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}

自定义的Event POJO类

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

5.窗口分配器

定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被”分配”到哪个窗口。所以可以说,窗口分配器其实就是在指定窗口的类型。代码配置方式如下:

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        // 定义窗口
        SingleOutputStreamOperator<Event> result = stream
          			// 非按键分区窗口操作:,windowAll本身就是一个非并行的操作,手动调大窗口算子的并行度也是无效的。
                // .windowAll()
                .keyBy(value -> value.user)
          			// 滚动事件时间窗口:传入的参数表示滚动窗口的大小
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))    
          				// 滚动事件时间窗口:第二个参数则表示窗口起始点的偏移量
                  //.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))    
          				// 滚动处理时间窗口
                  //.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))  
          				// 滑动事件时间窗口:第一个参数表示滑动窗口的大小,第二个表示滑动窗口的滑动步长
                  //.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))  
          				// 事件时间会话窗口
                  //.window(EventTimeSessionWindows.withGap(Time.seconds(10))) 
          				// 滚动计数窗口:当窗口中元素数量达到10的时候,就会触发计算执行并关闭窗口
                   //.countWindow(10)    
          				 // 滑动计数窗口: 窗口长度为10,滑动步长为3,每个窗口统计10个数据,每隔3个数据就统计输出一次结果
                   //.countWindow(10, 2)   
                .sum("timestamp");

        result.print();

        env.execute();
    }
}

6.窗口函数整体介绍

定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪。所以在窗口分配器之后,必须再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”

窗口里面保存的可能是数据,也可能是中间聚合的状态

窗口函数应用案例:统计每10秒内用户访问的次数

public class WindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 添加数据源
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 生成水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        // 提取数据中的事件时间
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        // 计算10s内每个用户的访问次数
        SingleOutputStreamOperator<Tuple2<String, Long>> userCountPerWindowStream = stream.map(value -> Tuple2.of(value.user, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // 基于用户名进行keyBy分组
                .keyBy(value -> value.f0)
                 // 滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                 // 通过sum函数统计在窗口内用户访问的次数
                .sum(1);

        userCountPerWindowStream.print();

        env.execute();
    }
}

7.增量聚合函数

窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。

典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

7.1.ReduceFunction

上面的窗口函数应用案例,只能通过sum对数据进行增量聚合,我们需求现在换成统计用户频次和访问过的URL用sum就不行了。下面是使用ReduceFunction进行增量聚合的代码示例。

public class ReduceFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 添加数据源
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 生成水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        // 提取数据中的事件时间
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.print("input");

        // 统计每10s内每个用户的访问频次,和访问过的url
        SingleOutputStreamOperator<Tuple3<String, String, Long>> userCountPerWindowStream = stream.map(value -> Tuple3.of(value.user, value.url, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
                 // 基于用户名进行keyBy分组
                .keyBy(value -> value.f0)
                 // 滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // value1 是之前聚合好的数据,value2是新来的数据,他们两个进行增量聚合
                .reduce((value1, value2) -> Tuple3.of(value1.f0, value1.f1 + "," + value2.f1, value1.f2 + 1));

        userCountPerWindowStream.print();

        env.execute();
    }
}

7.2.AggregateFunction

ReduceFunction可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样,aggregate就突破了这个限制,可以定义更加灵活的窗口聚合操作。

AggregateFunction 更加灵活和强大,可以完全覆盖Reduce Function的功能,实际应用中一般非常简单才会用ReduceFunction,复杂一点的都是直接用AggregateFunction

AggregateFunction可以看作是ReduceFunction的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型IN就是输入流中元素的数据类型;累加器类型ACC则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。
  • getResult():从累加器中提取聚合的输出结果。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。

AggregateFunction的工作原理是:首先调用createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用getResult()方法得到计算结果。很明显,与ReduceFunction相同,AggregateFunction也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

示例代码如下:

public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.print("input");

        // 统计10秒内每个用户的访问频次
        SingleOutputStreamOperator<Tuple2<String, Long>> userCountPerWindowStream = stream.keyBy(value -> value.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new AggregateFunction<Event, Tuple2<String, Long>, Tuple2<String, Long>>() {
                    // 创建一个累加器(实例任务创建时调用)
                    @Override
                    public Tuple2<String, Long> createAccumulator() {
                        return Tuple2.of("", 0L);
                    }

                    // 更改我们当前的累加器(每来一个元素之后调用)
                    @Override
                    public Tuple2<String, Long> add(Event value, Tuple2<String, Long> accumulator) {
                        return Tuple2.of(value.user, accumulator.f1 + 1);
                    }

                    // 窗口输出结果(窗口触发计算的时候调用)
                    @Override
                    public Tuple2<String, Long> getResult(Tuple2<String, Long> accumulator) {
                        return accumulator;
                    }

                    // 窗口合并
                    @Override
                    public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                });

        userCountPerWindowStream.print();

        env.execute();
    }
}

Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。

应用案例:实现一个自定义的AggregateFunction统计PV和UV

public class PvUvExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 添加数据源
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                // 生成水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 提取时间戳
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        // 方便调试,每来一条数据就打印一次
        stream.print("input");

        // 开窗口:统计10s内的PV、UV,并输出比值
        stream.keyBy(value -> true)		// keyBy true表示,按照来的每一个值分区
                // 设置事件时间的滚动窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 窗口函数传入自定义的增量聚合函数
                .aggregate(new PvUv())
                .print();
        // 提交执行
        env.execute();
    }

    // 自定义增量聚合函数,实现PVUV的统计
    public static class PvUv implements AggregateFunction<Event, Tuple2<Long, HashSet<String>>, String> {
        // 创建累加器
        @Override
        public Tuple2<Long, HashSet<String>> createAccumulator() {
            // 用Long值保存pv,用HashSet保存所有用户名并去重
            return Tuple2.of(0L, new HashSet<>());
        }
        // 将输入元素添加到累加器中
        @Override
        public Tuple2<Long, HashSet<String>> add(Event value, Tuple2<Long, HashSet<String>> accumulator) {
            // 增量聚合,每来一个数据,pv加1,并把user保存到HashSet
            accumulator.f1.add(value.user);
            return Tuple2.of(accumulator.f0 + 1, accumulator.f1);
        }
        // 从累加器中提取聚合的输出结果
        @Override
        public String getResult(Tuple2<Long, HashSet<String>> accumulator) {
            Long pv = accumulator.f0;
            int uv = accumulator.f1.size();
            return "PV: " + pv + " UV: " + uv + " 比值:" + ((double) pv / uv);
        }
        // 合并累加器,这里用不到
        @Override
        public Tuple2<Long, HashSet<String>> merge(Tuple2<Long, HashSet<String>> a, Tuple2<Long, HashSet<String>> b) {
            return null;
        }
    }
}

8.全窗口函数

有些场景下,我们要做的计算必须基于全部的数据才有效,这时做增量聚合就没什么意义了;另外,输出的结果有可能要包含上下文中的一些信息(比如窗口的起始时间),这是增量聚合函数做不到的。

所以,我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

在Flink中,全窗口函数也有两种:WindowFunction和ProcessWindowFunction。

不过WindowFunction能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,之后可能会逐渐弃用。一般在实际应用,直接使用ProcessWindowFunction就可以了。

8.1.WindowFunction

WindowFunction字面上就是“窗口函数”。我们可以基于WindowedStream调用.apply()方法,传入一个WindowFunction的实现类。这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

public class WindowFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.print("input");

        // 统计每10s内每个用户的访问频次,使用全窗口函数
        SingleOutputStreamOperator<Tuple4<Long, Long, String, Integer>> userCountPerWindowStream = stream.keyBy(value -> value.user)
                // 设置滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 处理计算
                .apply(new WindowFunction<Event, Tuple4<Long, Long, String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String user, TimeWindow window, Iterable<Event> input, Collector<Tuple4<Long, Long, String, Integer>> out) throws Exception {
                        // 获取窗口的起始点和结束点
                        long start = window.getStart();
                        long end = window.getEnd();

                        // 统计用户访问频次
                        int count = 0;
                        for (Event clickEvent : input) {
                            count++;
                        }
                        // 输出计算结果
                        out.collect(Tuple4.of(start, end, user, count));
                    }
                });

        // 按照窗口统计当前最活跃用户
        userCountPerWindowStream.print("user-count");
        userCountPerWindowStream.keyBy(value -> value.f0)
                .maxBy(3)
                .print();

        env.execute();
    }
}

8.2.ProcessWindowFunction

ProcessWindowFunction是Window API中最底层的通用窗口函数接口之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得ProcessWindowFunction更加灵活、功能更加丰富,其实就是一个增强版的WindowFunction。

具体使用跟WindowFunction非常类似,我们可以基于WindowedStream调用.process()方法,传入一个ProcessWindowFunction的实现类。下面是一个统计每10s内每个用户的访问频次案例:

public class ProcessWindowFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.print("input");

        // 定义一个输出标签,用于表示侧输出流
        OutputTag<String> timeTag = new OutputTag<String>("time"){};

        // 统计每10s内每个用户的访问频次,使用全窗口函数
        SingleOutputStreamOperator<Tuple4<Long, Long, String, Integer>> userCountPerWindowStream = stream.keyBy(value -> value.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Event, Tuple4<Long, Long, String, Integer>, String, TimeWindow>() {
                    @Override
                    public void process(String user, ProcessWindowFunction<Event, Tuple4<Long, Long, String, Integer>, String, TimeWindow>.Context context, Iterable<Event> elements, Collector<Tuple4<Long, Long, String, Integer>> out) throws Exception {
                        // 获取窗口的起始点和结束点
                        long start = context.window().getStart();
                        long end = context.window().getEnd();

                        // 统计用户访问频次
                        int count = 0;
                        for (Event event : elements) {
                            count++;
                        }

                        // 输出计算结果
                        out.collect(Tuple4.of(start, end, user, count));

                        // 获取当前的处理时间和水位线
                        long processingTime = context.currentProcessingTime();
                        long currentWatermark = context.currentWatermark();
                        // 用侧输出流输出
                        context.output(timeTag, "当前处理时间:" + processingTime);
                        context.output(timeTag, "当前水位线:" + currentWatermark);
                    }
                });

        userCountPerWindowStream.print("user-count");
        userCountPerWindowStream.getSideOutput(timeTag).print("time");

        // 按照窗口统计当前最活跃用户
        userCountPerWindowStream.keyBy(value -> value.f0)
                .maxBy(1)
                .print();

        env.execute();
    }
}

9.增量聚合函数和全窗口函数的结合

先来总结一下增量聚合函数和全窗口函数的优缺点:

增量聚合函数:

  • 优点:来一条数据处理一条数据,效率高,返回结果速度快
  • 缺点:有些信息是获取不到的,比如窗口的信息,上下文里面的信息等底层数据

全窗口函数:

  • 优点:可以获取窗口的信息,上下文里面的信息等底层数据
  • 缺点:到了窗口结束时间点才开始遍历计算数据,要是窗口数据量非常大,那么计算就很耗费时间,就会影响结果的实时性

在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。

我们之前在调用WindowedStream的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是WindowFunction或者ProcessWindowFunction。

这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。

应用案例:统计10s内的页面访问频次

POJO 类定义:

public class UrlViewCount {
  	// 统计的URL
    public String url;
    // 访问的频次
    public Long count;
  	// 窗口的开始时间
    public Long start;
    // 窗口的结束时间
    public Long end;

    public UrlViewCount() {
    }

    public UrlViewCount(String url, Long count, Long start, Long end) {
        this.url = url;
        this.count = count;
        this.start = start;
        this.end = end;
    }

    @Override
    public String toString() {
        return "UrlViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", start=" + start +
                ", end=" + end +
                '}';
    }
}

flink代码:

public class UrlViewCountExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 定义数据源
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.print("input");

        // 定义一个输出标签,用于表示侧输出流
        OutputTag<Event> outputTag = new OutputTag<Event>("late") {};

        // 统计10秒内,每个url的访问频次
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream.keyBy(value -> value.url)
                // 滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new UrlCountAgg(), new UrlCountWindowResult());

        urlCountStream.print("url-count");
        urlCountStream.getSideOutput(outputTag).print("late");

        env.execute();
    }

    // 实现自定义的增量聚合函数,做一个计数统计,每来一个数据就加1
    public static class UrlCountAgg implements AggregateFunction<Event, Long, Long>{
        // 创建一个初始值为0的累加器
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        // 实现计数,每来一个数据就加一
        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        // 返回计数结果
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        // 用不到
        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // 实现自定义的全窗口函数,包装窗口信息和url信息
    public static class UrlCountWindowResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>{
        @Override
        public void process(String url, ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 从增量聚合函数的结果中获取count值(增量聚合函数的结果是全窗口函数的输入)
            Long count = elements.iterator().next();
            // 窗口的开始时间
            Long start = context.window().getStart();
            // 窗口的结束时间
            Long end = context.window().getEnd();
            // 输出结果
            out.collect(new UrlViewCount(url, count, start, end));
        }
    }
}

10.窗口处理迟到的数据

我们知道,在事件时间语义下,窗口中可能会出现数据迟到的情况。为了解决窗口迟到数据的问题,flink提供了两种解决方案,分别是允许延迟和将迟到的数据放入侧输出流。

10.1.允许延迟

Flink提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了 窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口。

基于WindowedStream调用.allowedLateness()方法,传入一个Time类型的延迟时间,就可以表示允许这段时间内的延迟数据。

stream.keyBy(...)
     .window(TumblingEventTimeWindows.of(Time.hours(1)))
     .allowedLateness(Time.minutes(1))

比如上面的代码中,我们定义了1小时的滚动窗口,并设置了允许1分钟的延迟数据。

10.2.将迟到的数据放入侧输出流

Flink还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”。

基于WindowedStream调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以OutputTag的类型与流中数据类型相同。

说明:迟到数据仅对事件时间语义有效。

水位线时间延迟:和窗口没有关系,全局有效,相当于逻辑时钟的表调慢了。可以理解成汽车站到了发车时间,司机多等了两分钟才发车。

窗口允许延迟:属于窗口内保障数据迟到的机制,表示窗口到了触发计算时间计算完之后不关闭这个窗口,而是继续保留指定长度的时间,在这段时间内,迟到的数据进来,依旧可以在窗口计算的结果上叠加计算。

将迟到的数据放入侧输出流:属于窗口内保障数据迟到的机制,相当于一个兜底方案,在允许延迟的情况下,一般也不会允许延迟太久,要是出现迟到数据超过了窗口允许延迟时间,则数据会丢弃。这种情况下吧数据放到侧输出流可以保证不丢失数据,但是这种情况已经没有办法直接和之前窗口的计算结果聚合到一起了,需要进行手动的合并数据。

12.窗口处理迟到数据测试

代码示例:在上面增量聚合函数和全窗口函数的结合案例中增加允许延迟和将迟到的数据放入侧输出流进行测试。为了方便验证,调整输入数据为手动录入,方便模拟乱序数据。

public class UrlViewCountExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 定义数据源
        SingleOutputStreamOperator<Event> stream = env.socketTextStream("hadoop01", 7777)
                .map(value -> {
                    String[] fields = value.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                })
          			// 水位线延迟设置2秒
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        stream.print("input");

        // 定义一个输出标签,用于表示侧输出流
        OutputTag<Event> outputTag = new OutputTag<Event>("late") {};

        // 统计10秒内,每个url的访问频次
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = stream.keyBy(value -> value.url)
                // 滚动事件时间窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 允许数据延迟5秒
                .allowedLateness(Time.seconds(5))
                // 将数据放到侧输出流
                .sideOutputLateData(outputTag)
                // 增量聚合函数和全窗口函数的结合
                .aggregate(new UrlCountAgg(), new UrlCountWindowResult());

        urlCountStream.print("url-count");
        urlCountStream.getSideOutput(outputTag).print("late");

        env.execute();
    }

    // 实现自定义的增量聚合函数,做一个计数统计,每来一个数据就加1
    public static class UrlCountAgg implements AggregateFunction<Event, Long, Long>{
        // 创建一个初始值为0的累加器
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        // 实现计数,每来一个数据就加一
        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        // 返回计数结果
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        // 用不到
        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // 实现自定义的全窗口函数,包装窗口信息和url信息
    public static class UrlCountWindowResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>{
        @Override
        public void process(String url, ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 从增量聚合函数的结果中获取count值(增量聚合函数的结果是全窗口函数的输入)
            Long count = elements.iterator().next();
            // 窗口的开始时间
            Long start = context.window().getStart();
            // 窗口的结束时间
            Long end = context.window().getEnd();
            // 输出结果
            out.collect(new UrlViewCount(url, count, start, end));
        }
    }
}

测试结果:

第一波:测试水位线延迟2秒效果

输入测试数据如下:

image-20221218125623992

程序输出结果:可以看到一个应该10秒触发计算并关闭的窗口,等到12秒数据到来才发生计算,且10~12秒的数据没有在1~10秒窗口内进行计算。说明水位线延迟2秒在这里生效。

image-20221218125613916第二波:测试窗口允许延迟5秒的效果

输入测试数据如下:

image-20221218130415821

程序输出结果:可以看到在22秒数据到来的时候触发了计算,但是又来了一条18秒的数据,flink在18秒迟到数据来的时候,就又计算了一次,然后又来了一条15秒的迟到数据,flink又计算了一次15秒的数据。说明窗口计算之后会继续等延迟数据5秒,5秒之内延迟数据到来,就在计算一次。

image-20221218131101995

第三波:测试迟到数据放入侧输出流

输入的测试数据如下:将时间戳推进到27秒,这个时候水位线时间是25秒,然后在放入一条15秒到数据

image-20221218131324463

程序输出结果:延时数据被放入了侧输出流,要是为了让这个数据不丢失,就需要程序员手动处理了

image-20221218131414582

转载请注明:西门飞冰的博客 » Flink 中的窗口

喜欢 (4)or分享 (0)