由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flink 多流转换分流合流

大数据 西门飞冰 777℃
[隐藏]

1.介绍

在Flink中无论是基本的简单转换和聚合,还是基于窗口的计算,我们都是针对一条流上的数据进行处理的。而在实际应用中,可能需要将不同来源的数据连接合并在一起处理,也有可能需要将一条流拆分开,所以经常会有对多条流进行处理的场景。

简单划分的话,多流转换可以分为“分流”和“合流”两大类。

2.分流

所谓”分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream。一般来说,我们会定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

2.1.使用Filter简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。

public class SimpleSplitStreamTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 按照用户拣选分流,用filter实现
        SingleOutputStreamOperator<Event> aliceStream = stream.filter(value -> value.user.equals("Alice"));
        SingleOutputStreamOperator<Event> bobStream = stream.filter(value -> value.user.equals("Bob"));

        SingleOutputStreamOperator<Event> elseStream = stream.filter(value -> !value.user.equals("Alice") && !value.user.equals("Bob"));

        aliceStream.print("alice");
        bobStream.print("bob");
        elseStream.print("else");

        env.execute();
    }
}

方案缺点:每一个stream都有一个filter,相当于重复的数据被过滤处理了3次,然后对每一份数据分别做筛选,而且每多一条流就要被多复制一次数据,效率比较低效

2.2.使用侧输出流实现

使用侧数据流的好处相对于filter而言,基本思路其实就是在一条流中,按照给定的筛选条件,给数据分类“打标签”;然后基于这个标签将数据分到对应流中。

使用侧输出流也是Flink官方推荐的用法。

代码示例如下:

public class SideOutputSplitStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 定义输出标签,用来标识侧输出流
        OutputTag<Event> aliceTag = new OutputTag<Event>("alice") {};
        OutputTag<Tuple2<String, String>> bobTag = new OutputTag<Tuple2<String, String>>("bob") {};

        // 使用处理函数的侧输出流功能实现分流
        SingleOutputStreamOperator<Event> elseStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                // 根据user做拣选分配
                if (value.user.equals("Alice")) {
                    ctx.output(aliceTag, value);
                } else if (value.user.equals("Bob")) {
                    ctx.output(bobTag, Tuple2.of(value.user, value.url + "," + value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        elseStream.print("else");
        elseStream.getSideOutput(aliceTag).print("alice");
        elseStream.getSideOutput(bobTag).print("bob");

        env.execute();
    }
}

3.合流_Union

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

stream1.union(stream2, stream3, ...)

代码示例:

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 通过集合模拟两条流
        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4);
        DataStreamSource<String> stream2 = env.fromElements("a", "b", "c", "d", "e");

        // 通过union完成两条流的合流操作
        DataStream<String> unionStream = stream1.map(value -> "" + value).union(stream2);

        unionStream.print();

        env.execute();
    }
}

说明:事件时间语义下,根据并行任务水位线传递规则,多条流的合并,水位线以最小的那个为准。

4.测试水位线传递规则

测试代码:

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 测试水位线传递规则
        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop01", 7777)
                .map(value -> {
                    String[] fields = value.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop01", 7778)
                .map(value -> {
                    String[] fields = value.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp)
                );

        // 通过union完成两条流的合流操作            
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(value.user + ", " + value.url + ", " + value.timestamp);
                        out.collect("当前水位线为:" + ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}

第一波:

水位线传递测试:在一个数据流中传递两条数据

image-20221220161700250

程序输出结果:我们看到第一条水位线的时间是负数,这个很正常,因为第一条数据来的时候还没有生成水位线,所以获取不到之前水位线的时间;第二条数据的水位线时间还是负数,因为我们另外一条流没有输出传递进来,根据水位线传递规则,多条流的合并,水位线以最小的那个为准。要是另外一条流一直没有数据进入推进水位线的话,那么合并后的流,水位线会一直不变。

image-20221220161642788

当两条流都有了数据,在看水位线,还是以最小的为准。

image-20221220162417202

image-20221220162521723

5.合流_Connect

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。

Connect相当于把两条流放在了一起,但是没有合并成一个流,因为数据类型不一样,没法直接合并成一个流。而是两个流进行连接,得到一个ConnectedStreams。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。

最后这两个流里的数据只要能汇合到一个数据流中就可以了。

image-20221220164557378

示例代码:connect基本用法

public class ConnectTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4);
        DataStreamSource<String> stream2 = env.fromElements("a", "b", "c", "d", "e");

        // 用connect做一般化的连接
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleResult = stream1.connect(stream2)
                .map(new CoMapFunction<Integer, String, Tuple2<String, Integer>>() {
                    // 每来一个第一条流里面的数据调用一下map1
                    @Override
                    public Tuple2<String, Integer> map1(Integer value) throws Exception {
                        return Tuple2.of("number", value);
                    }
                    // 每来一个第二条流里面的数据调用一下map2
                    @Override
                    public Tuple2<String, Integer> map2(String value) throws Exception {
                        return Tuple2.of(value, 0);
                    }
                });

        tupleResult.print();

        env.execute();
    }
}

5.1.Connect做inner join

Connect 是一个更加通用的连接两条流的方式,上面的案例相当于一般化连接起来,转换成相同的二元组输出就完事了。

我们在这里使用Connect实现一个类似于两张表做join的功能,我们知道在数据库中两张表做join需要一个连接条件,那么在flink 两个流中使用连接条件进行关联就需要用到keyBy来进行实现。

代码示例:

public class InnerJoinSimulateTest_WithConnect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 需要定义两条流,有相同的key
        DataStreamSource<Event> stream1 = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./cart", 3000L),
                new Event("Alice", "./fav", 4000L),
                new Event("Bob", "./home", 5000L),
                new Event("Cary", "./prod?id=1", 6000L)
        );

        DataStreamSource<Tuple2<String, Integer>> stream2 = env.fromElements(
                Tuple2.of("Alice", 35),
                Tuple2.of("Bob", 20),
                Tuple2.of("Alice", 17),
                Tuple2.of("Mary", 50)
        );

        // 用connect连接两条流,以user作为key连接
        stream1.keyBy(value -> value.user)
                .connect(stream2.keyBy(value -> value.f0))
                .flatMap(new RichCoFlatMapFunction<Event, Tuple2<String, Integer>, String>() {
                    // 使用列表状态,保存已经到来的数据
                    ListState<Event> EventListState;
                    ListState<Tuple2<String, Integer>> tuple2ListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        EventListState = getRuntimeContext().getListState(new ListStateDescriptor<Event>("Event-list", Event.class));
                        tuple2ListState = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Integer>>("tuple-list", Types.TUPLE(Types.STRING, Types.INT)));
                    }

                    @Override
                    public void flatMap1(Event value, Collector<String> out) throws Exception {
                        // 第一条流数据到来,遍历第二条流数据保存的列表状态,匹配输出
                        for (Tuple2<String, Integer> tuple2 : tuple2ListState.get()) {
                            out.collect(value + " --> " + tuple2);
                        }

                        // 保存数据到对应的列表状态
                        EventListState.add(value);
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Integer> value, Collector<String> out) throws Exception {
                        for (Event Event : EventListState.get()) {
                            out.collect(Event + " --> " + value);
                        }

                        // 保存数据到对应的列表状态
                        tuple2ListState.add(value);
                    }
                })
                .print();

        env.execute();
    }
}

6.合流总结

因为connect更加通用,且对类型没有要求,后面还能指定连接的key实现类似于inner join的功能。所以,一般用到合流操作的时候,都用connect,union用的很少。

union 对于connect的优势在于,可以连接多个流,connect仅能连接两个流。如果要用connect连接多条数据类型不一样的流,那就只能两条流先连接,连接处理完之后在和第三条流连接。

7.应用案例_实时对账

7.1.需求说明

实时对账的需求,就是app的支付操作事件和第三方的支付操作事件的一个双流Join。App的支付事件和第三方的支付事件将会互相等待5秒钟,如果等不来对应的支付事件,那么就输出报警信息。

7.2.代码实现

订单事件流POJO:

public class OrderEvent {
    public String user;
    public String orderId;
    public String eventType;
    public Long ts;

    public OrderEvent() {
    }

    public OrderEvent(String user, String orderId, String eventType, Long ts) {
        this.user = user;
        this.orderId = orderId;
        this.eventType = eventType;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return "OrderEvent{" +
                "user='" + user + '\'' +
                ", orderId='" + orderId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", ts=" + ts +
                '}';
    }
}

第三方支付事件流POJO:

public class ThirdPartyPayEvent {
    public String orderId;
  	// 支付的平台
    public String platform;
    public Long ts;

    public ThirdPartyPayEvent() {
    }

    public ThirdPartyPayEvent(String orderId, String platform, Long ts) {
        this.orderId = orderId;
        this.platform = platform;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return "ThirdPartyPayEvent{" +
                "orderId='" + orderId + '\'' +
                ", platform='" + platform + '\'' +
                ", ts=" + ts +
                '}';
    }
}

具体实现代码:

public class BillCheckExample {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 读取app订单事件流
        SingleOutputStreamOperator<OrderEvent> orderStream = env.fromElements(
                        new OrderEvent("Alice", "order-1", "create", 1000L),
                        new OrderEvent("Bob", "order-2", "create", 2000L),
                        new OrderEvent("Alice", "order-1", "modify", 5000L),
                        new OrderEvent("Alice", "order-1", "pay", 6000L),
                        new OrderEvent("Bob", "order-2", "pay", 8000L),
                        new OrderEvent("Alice", "order-3", "create", 9000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((element, recordTimestamp) -> element.ts)
                );

        // 读取第三方平台支付事件流
        SingleOutputStreamOperator<ThirdPartyPayEvent> thirdPartyPayStream = env.fromElements(
                new ThirdPartyPayEvent("order-1", "alipay", 9000L),
                new ThirdPartyPayEvent("order-3", "wechat", 10000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<ThirdPartyPayEvent>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((element, recordTimestamp) -> element.ts)
        );

        // 定义输出标签,表示未匹配报警输出流
        OutputTag<String> outputTag = new OutputTag<String>("warning") {
        };

        // 连接两条流,选取pay事件,按照订单id分组进行对账匹配
        SingleOutputStreamOperator<String> payedStream = orderStream.filter(value -> value.eventType.equals("pay"))
                .connect(thirdPartyPayStream)
                .keyBy(value -> value.orderId, value -> value.orderId)
                .process(new BillCheck());

        payedStream.print("payed");
        payedStream.getSideOutput(outputTag).print("warning");

        env.execute();
    }

    public static class BillCheck extends KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>{
        ValueState<OrderEvent> orderEventValueState;
        ValueState<ThirdPartyPayEvent> thirdPartyPayEventValueState;

        @Override
        public void open(Configuration parameters) throws Exception {
            orderEventValueState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("order-event", OrderEvent.class));
            thirdPartyPayEventValueState = getRuntimeContext().getState(new ValueStateDescriptor<ThirdPartyPayEvent>("third-party-pay", ThirdPartyPayEvent.class));
        }

        @Override
        public void processElement1(OrderEvent orderEvent, KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>.Context ctx, Collector<String> out) throws Exception {
            // 来了一个订单事件,就检测另一条流事件是否到来
            ThirdPartyPayEvent thirdPartyPayEvent = thirdPartyPayEventValueState.value();

            if (thirdPartyPayEvent != null){
                // 输出匹配成功信息
                out.collect( "订单" + orderEvent.orderId + "成功支付!" + orderEvent + "-->" + thirdPartyPayEvent );
                thirdPartyPayEventValueState.clear();    // 清理状态
            } else {
                // 注册10秒后的定时器,开始等待
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10000);
                orderEventValueState.update(orderEvent);    // 保存状态
            }
        }

        @Override
        public void processElement2(ThirdPartyPayEvent thirdPartyPayEvent, KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>.Context ctx, Collector<String> out) throws Exception {
            OrderEvent orderEvent = orderEventValueState.value();

            if (orderEvent != null){
                // 输出匹配成功信息
                out.collect( "订单" + orderEvent.orderId + "成功支付!" + orderEvent + "-->" + thirdPartyPayEvent );
                orderEventValueState.clear();    // 清理状态
            } else {
                // 注册10秒后的定时器,开始等待
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10000);
                thirdPartyPayEventValueState.update(thirdPartyPayEvent);    // 保存状态
            }
        }

        @Override
        public void onTimer(long timestamp, KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            OutputTag<String> outputTag = new OutputTag<String>("warning") {};

            // 判断状态是否有值,如果有值说明另一条流事件没来
            if (orderEventValueState.value() != null){
                ctx.output(outputTag, "订单" + orderEventValueState.value().orderId + "未匹配成功!第三方支付事件未到");
            }
            if (thirdPartyPayEventValueState.value() != null){
                ctx.output(outputTag, "订单" + thirdPartyPayEventValueState.value().orderId + "未匹配成功!APP订单支付事件未到");
            }
            // 清空状态
            orderEventValueState.clear();
            thirdPartyPayEventValueState.clear();
        }
    }
}

 

 

转载请注明:西门飞冰的博客 » Flink 多流转换分流合流

喜欢 (1)or分享 (0)