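1. Introduction
In Flink, whether we apply basic transformations and aggregations or window-based computations, we always process the data of a single stream. In real applications, however, we may need to combine data from different sources and process it together, or split one stream into several. Scenarios that involve multiple streams are therefore common.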
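2. Splitting Streams
"Splitting" means dividing one data stream into two or more completely independent streams. Starting from a single DataStream, we obtain several sub-DataStreams that are fully on par with one another. Typically we define some filter conditions and route the records that satisfy each condition into the corresponding stream.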
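2.1. Using filter
Selecting data by condition is easy to implement on its own. Call .filter() several times, independently, on the same stream, and you get the split streams.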
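```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Event (a POJO with user, url and timestamp fields) and ClickSource (a custom source)
// are assumed to be defined elsewhere in the project.
public class SimpleSplitStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // Split by user, implemented with filter
        SingleOutputStreamOperator<Event> aliceStream = stream.filter(value -> value.user.equals("Alice"));
        SingleOutputStreamOperator<Event> bobStream = stream.filter(value -> value.user.equals("Bob"));
        SingleOutputStreamOperator<Event> elseStream =
                stream.filter(value -> !value.user.equals("Alice") && !value.user.equals("Bob"));

        aliceStream.print("alice");
        bobStream.print("bob");
        elseStream.print("else");

        env.execute();
    }
}
```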
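The following plain-Java simulation shows what this approach costs. It needs only the JDK, and the class, method and field names are hypothetical stand-ins for the Flink example. Each independent filter scans the whole input. With N records and k filters, that makes N × k predicate checks, even though every record ends up in exactly one output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java simulation (no Flink) of splitting by calling filter() several times.
class FilterSplitSketch {

    // Hypothetical minimal stand-in for the Event POJO (user, url, timestamp).
    static final class Event {
        final String user;
        final String url;
        final long timestamp;

        Event(String user, String url, long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    static final class SplitResult {
        final List<List<Event>> outputs = new ArrayList<>();
        int predicateChecks = 0;
    }

    // Each predicate acts like one independent filter() call: it scans the whole input.
    static SplitResult splitWithFilters(List<Event> input, List<Predicate<Event>> filters) {
        SplitResult result = new SplitResult();
        for (Predicate<Event> filter : filters) {
            List<Event> out = new ArrayList<>();
            for (Event e : input) {
                result.predicateChecks++;
                if (filter.test(e)) {
                    out.add(e);
                }
            }
            result.outputs.add(out);
        }
        return result;
    }

    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Cary", "./prod?id=1", 3000L));
    }

    // Same three conditions as aliceStream, bobStream and elseStream.
    static List<Predicate<Event>> userFilters() {
        return Arrays.asList(
                e -> e.user.equals("Alice"),
                e -> e.user.equals("Bob"),
                e -> !e.user.equals("Alice") && !e.user.equals("Bob"));
    }

    public static void main(String[] args) {
        SplitResult r = splitWithFilters(sampleEvents(), userFilters());
        // 3 records x 3 filters = 9 checks, although each record lands in only one output
        System.out.println("outputs=" + r.outputs.size() + ", checks=" + r.predicateChecks);
    }
}
```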
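2.2. Using Side Outputs
The filter approach works, but every record is sent to every filter and checked once per filter. Side outputs avoid this. Inside a single process function, each record is "tagged" according to the given conditions and then routed to the stream that matches its tag, so it is handled only once. A side output can also carry a different data type from the main stream.
Side outputs are also the splitting approach officially recommended by Flink; the older split()/select() API was deprecated in their favor.
Here is an example: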
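```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // Define output tags that identify the side outputs
        // (anonymous subclasses, so Flink can capture the generic element type)
        OutputTag<Event> aliceTag = new OutputTag<Event>("alice") {};
        OutputTag<Tuple2<String, String>> bobTag = new OutputTag<Tuple2<String, String>>("bob") {};

        // Split the stream using the side-output feature of a process function
        SingleOutputStreamOperator<Event> elseStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                // Route each record by user
                if (value.user.equals("Alice")) {
                    ctx.output(aliceTag, value);
                } else if (value.user.equals("Bob")) {
                    // This side output uses a different type from the main stream
                    ctx.output(bobTag, Tuple2.of(value.user, value.url + "," + value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        elseStream.print("else");
        elseStream.getSideOutput(aliceTag).print("alice");
        elseStream.getSideOutput(bobTag).print("bob");

        env.execute();
    }
}
```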
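Here is a plain-Java simulation of the same pattern for comparison. It needs no Flink, and all names are hypothetical. Java lists stand in for the main stream and the two tagged outputs. A single pass makes one routing decision per record. The "bob" output holds Strings rather than events, which mirrors how a side output may carry a different type from the main stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink) of splitting with side outputs.
class SideOutputSketch {

    // Hypothetical minimal stand-in for the Event POJO.
    static final class Event {
        final String user;
        final String url;
        final long timestamp;

        Event(String user, String url, long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    static final class Outputs {
        final List<Event> main = new ArrayList<>();   // plays the role of out.collect(...)
        final List<Event> alice = new ArrayList<>();  // plays the role of ctx.output(aliceTag, ...)
        final List<String> bob = new ArrayList<>();   // a different element type, like bobTag
        int recordsProcessed = 0;
    }

    // Mirrors processElement: each record is inspected once and sent to exactly one output.
    static Outputs process(List<Event> input) {
        Outputs outs = new Outputs();
        for (Event value : input) {
            outs.recordsProcessed++;
            if (value.user.equals("Alice")) {
                outs.alice.add(value);
            } else if (value.user.equals("Bob")) {
                outs.bob.add(value.url + "," + value.timestamp);
            } else {
                outs.main.add(value);
            }
        }
        return outs;
    }

    static List<Event> sampleEvents() {
        return Arrays.asList(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Cary", "./prod?id=1", 3000L),
                new Event("Bob", "./home", 4000L));
    }

    public static void main(String[] args) {
        Outputs outs = process(sampleEvents());
        System.out.println("main=" + outs.main.size() + ", alice=" + outs.alice.size() + ", bob=" + outs.bob);
    }
}
```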
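3. Merging Streams: Union
The simplest merge operation combines several streams directly; this is called a stream "union". Union requires all streams to have the same data type. The merged stream contains every element of every input stream, and its data type is unchanged.
In code, we simply call .union() on a DataStream and pass the other DataStreams as arguments. The result is still a DataStream: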
```java
stream1.union(stream2, stream3, ...)
```
Example code:
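```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Simulate two streams with in-memory collections
        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4);
        DataStreamSource<String> stream2 = env.fromElements("a", "b", "c", "d", "e");

        // union needs identical types: convert stream1 to String first, then merge the two streams
        DataStream<String> unionStream = stream1.map(value -> "" + value).union(stream2);

        unionStream.print();

        env.execute();
    }
}
```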
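The same-type requirement can be mirrored with Java generics. This plain-Java sketch needs no Flink, and the `union` helper is hypothetical. Its signature only accepts lists that share one element type T. Calling it on a `List<Integer>` and a `List<String>` would therefore not compile, which is why the Flink example maps the integers to strings first. The sketch simply concatenates its inputs. In a running Flink job, elements from different inputs can interleave in any order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink) of union semantics.
class UnionSketch {

    // Hypothetical helper mirroring stream1.union(stream2, stream3, ...):
    // every argument must have the same element type T, and the result keeps it.
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> result = new ArrayList<>(first);
        for (List<T> other : others) {
            result.addAll(other); // simplification: concatenation instead of arrival-order interleaving
        }
        return result;
    }

    // Plays the role of stream1.map(value -> "" + value).
    static List<String> mapToString(List<Integer> numbers) {
        List<String> out = new ArrayList<>();
        for (Integer n : numbers) {
            out.add(String.valueOf(n));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> stream1 = Arrays.asList(1, 2, 3, 4);
        List<String> stream2 = Arrays.asList("a", "b", "c", "d", "e");
        // union(stream1, stream2) would not compile: List<Integer> and List<String> differ.
        List<String> unioned = union(mapToString(stream1), stream2);
        System.out.println(unioned);
    }
}
```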
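4. Testing the Watermark Propagation Rule
The test below reads two socket text streams (host hadoop01, ports 7777 and 7778), assigns event-time timestamps and watermarks to each, and unions them. For every record it prints the record, followed by the watermark the process function currently sees. The rule under test: an operator with several inputs, such as the one after a union, uses the minimum of its inputs' watermarks as its own. The printed watermark therefore only advances once both streams have advanced.
Test code: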
```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Test the watermark propagation rule; each socket line has the form "user, url, timestamp"
        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop01", 7777)
                .map(value -> {
                    String[] fields = value.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp));

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop01", 7778)
                .map(value -> {
                    String[] fields = value.split(",");
                    return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp));

        // Merge the two streams with union
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(value.user + ", " + value.url + ", " + value.timestamp);
                        out.collect("Current watermark: " + ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}
```
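Two rules are at work here. In Flink, an operator with multiple inputs tracks the latest watermark from each input and uses the minimum as its own. The bounded-out-of-orderness generator emits the highest timestamp seen, minus the delay, minus 1. The plain-Java sketch below reproduces both rules; it needs no Flink, and its class names are hypothetical. Two caveats apply when reading the real test's output. First, watermarks are emitted periodically (every 200 ms by default). Second, the watermark derived from a record travels behind that record, so the value printed next to a record generally does not yet reflect it.

```java
import java.util.Arrays;

// Plain-Java simulation (no Flink) of how watermarks pass through a union.
class UnionWatermarkSketch {

    // Per-input generator in the spirit of forBoundedOutOfOrderness(delay):
    // watermark = highest timestamp seen - delay - 1, or Long.MIN_VALUE before any record.
    static final class BoundedOutOfOrdernessGenerator {
        private final long delayMillis;
        private long maxTimestamp;

        BoundedOutOfOrdernessGenerator(long delayMillis) {
            this.delayMillis = delayMillis;
            this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1; // initial watermark = Long.MIN_VALUE
        }

        void onEvent(long timestamp) {
            maxTimestamp = Math.max(maxTimestamp, timestamp);
        }

        long currentWatermark() {
            return maxTimestamp - delayMillis - 1;
        }
    }

    // The operator after union keeps the latest watermark of each input
    // and uses the minimum of them as its own watermark.
    static final class MinWatermarkTracker {
        private final long[] inputWatermarks;
        private long combined = Long.MIN_VALUE;

        MinWatermarkTracker(int inputs) {
            inputWatermarks = new long[inputs];
            Arrays.fill(inputWatermarks, Long.MIN_VALUE);
        }

        long onWatermark(int input, long watermark) {
            inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
            long min = Long.MAX_VALUE;
            for (long wm : inputWatermarks) {
                min = Math.min(min, wm);
            }
            combined = Math.max(combined, min); // the combined watermark never goes backwards
            return combined;
        }
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessGenerator socket7777 = new BoundedOutOfOrdernessGenerator(0);
        BoundedOutOfOrdernessGenerator socket7778 = new BoundedOutOfOrdernessGenerator(0);
        MinWatermarkTracker afterUnion = new MinWatermarkTracker(2);

        socket7777.onEvent(1000L);
        // The second input has no data yet, so the union operator stays at Long.MIN_VALUE
        System.out.println(afterUnion.onWatermark(0, socket7777.currentWatermark()));

        socket7778.onEvent(2000L);
        System.out.println(afterUnion.onWatermark(1, socket7778.currentWatermark())); // min(999, 1999)

        socket7777.onEvent(5000L);
        System.out.println(afterUnion.onWatermark(0, socket7777.currentWatermark())); // min(4999, 1999)
    }
}
```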
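5. Merging Streams: Connect
Union is simple, but because every stream must have the same data type it is much less flexible, and it appears less often in practice. Besides union, Flink provides another convenient merge operation: connect.
Connect puts two streams together without merging them into a single stream. The two streams may have different data types, so they cannot simply be merged. Instead, connecting them yields a ConnectedStreams. A connected stream can be regarded as a formal "unification" of the two streams, placed within one stream. Internally, each keeps its own data type, and the two remain independent of each other. To get a single output type again, you apply a co-transformation such as a CoMapFunction, which has one method per input.
Example code: basic usage of connect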
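```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3, 4);
        DataStreamSource<String> stream2 = env.fromElements("a", "b", "c", "d", "e");

        // Connect the two streams in the general way and map both to the same tuple type
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleResult = stream1.connect(stream2)
                .map(new CoMapFunction<Integer, String, Tuple2<String, Integer>>() {
                    // Called for every element of the first stream
                    @Override
                    public Tuple2<String, Integer> map1(Integer value) throws Exception {
                        return Tuple2.of("number", value);
                    }

                    // Called for every element of the second stream
                    @Override
                    public Tuple2<String, Integer> map2(String value) throws Exception {
                        return Tuple2.of(value, 0);
                    }
                });

        tupleResult.print();

        env.execute();
    }
}
```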
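The shape of a CoMapFunction can be sketched in plain Java. The `CoMapper` interface and the `connectAndMap` helper are hypothetical stand-ins, not Flink API. Each input keeps its own type until `map1` or `map2` converts it to the shared output type. The sketch processes the first input and then the second, whereas Flink interleaves the two inputs as records arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (no Flink) of connect(...).map(CoMapFunction).
class CoMapSketch {

    // Hypothetical interface modelled on CoMapFunction<IN1, IN2, OUT>.
    interface CoMapper<IN1, IN2, OUT> {
        OUT map1(IN1 value); // called for every element of the first input
        OUT map2(IN2 value); // called for every element of the second input
    }

    // Hypothetical stand-in for Tuple2<String, Integer>.
    static final class Pair {
        final String f0;
        final int f1;

        Pair(String f0, int f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")";
        }
    }

    // The two inputs keep their own types; only the mapper output is shared.
    static <IN1, IN2, OUT> List<OUT> connectAndMap(List<IN1> first, List<IN2> second,
                                                   CoMapper<IN1, IN2, OUT> mapper) {
        List<OUT> out = new ArrayList<>();
        for (IN1 v : first) {
            out.add(mapper.map1(v));
        }
        for (IN2 v : second) {
            out.add(mapper.map2(v));
        }
        return out;
    }

    // Same logic as the anonymous CoMapFunction in ConnectTest.
    static final CoMapper<Integer, String, Pair> TO_PAIR = new CoMapper<Integer, String, Pair>() {
        @Override
        public Pair map1(Integer value) {
            return new Pair("number", value);
        }

        @Override
        public Pair map2(String value) {
            return new Pair(value, 0);
        }
    };

    public static void main(String[] args) {
        List<Pair> result = connectAndMap(Arrays.asList(1, 2, 3, 4),
                Arrays.asList("a", "b", "c", "d", "e"), TO_PAIR);
        System.out.println(result);
    }
}
```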
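5.1. Using Connect for an Inner Join
Connect is a more general way to combine two streams. The example above merely connects them generically and converts both into the same 2-tuple type for output. Connect can go further: if both streams are keyed on the same field (here, the user name) and per-key state is kept, connect can simulate an inner join. Each side stores the records it has received in a ListState. Whenever a record arrives on one side, it is matched against all records already stored for the same key on the other side.
Example code: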
```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class InnerJoinSimulateTest_WithConnect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Define two streams that share the same key (the user name)
        DataStreamSource<Event> stream1 = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./cart", 3000L),
                new Event("Alice", "./fav", 4000L),
                new Event("Bob", "./home", 5000L),
                new Event("Cary", "./prod?id=1", 6000L)
        );
        DataStreamSource<Tuple2<String, Integer>> stream2 = env.fromElements(
                Tuple2.of("Alice", 35),
                Tuple2.of("Bob", 20),
                Tuple2.of("Alice", 17),
                Tuple2.of("Mary", 50)
        );

        // Connect the two streams, keyed by user
        stream1.keyBy(value -> value.user)
                .connect(stream2.keyBy(value -> value.f0))
                .flatMap(new RichCoFlatMapFunction<Event, Tuple2<String, Integer>, String>() {
                    // List states holding the records that have already arrived (scoped per key)
                    private ListState<Event> eventListState;
                    private ListState<Tuple2<String, Integer>> tuple2ListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        eventListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Event>("Event-list", Event.class));
                        tuple2ListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Tuple2<String, Integer>>("tuple-list", Types.TUPLE(Types.STRING, Types.INT)));
                    }

                    @Override
                    public void flatMap1(Event value, Collector<String> out) throws Exception {
                        // A record from the first stream arrived: match it against every stored record of the second stream
                        for (Tuple2<String, Integer> tuple2 : tuple2ListState.get()) {
                            out.collect(value + " --> " + tuple2);
                        }
                        // Save it in the corresponding list state
                        eventListState.add(value);
                    }

                    @Override
                    public void flatMap2(Tuple2<String, Integer> value, Collector<String> out) throws Exception {
                        // A record from the second stream arrived: match it against every stored Event
                        for (Event event : eventListState.get()) {
                            out.collect(event + " --> " + value);
                        }
                        // Save it in the corresponding list state
                        tuple2ListState.add(value);
                    }
                })
                .print();

        env.execute();
    }
}
```
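Here is a plain-Java simulation of the join logic above. It needs no Flink, and the names are hypothetical. Two maps keyed by user stand in for the per-key ListStates. Each side is matched against everything the other side has stored so far, so every matching pair is emitted exactly once, when the second of its two records arrives, regardless of arrival order. As in the Flink code, nothing is ever removed from the stored lists, so state grows with the input. A production job would typically bound it, for example with state TTL.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of the connect + keyBy inner join.
class KeyedJoinSketch {

    private final Map<String, List<String>> eventState = new HashMap<>();  // user -> urls seen
    private final Map<String, List<Integer>> tupleState = new HashMap<>(); // user -> values seen
    final List<String> output = new ArrayList<>();

    // Like flatMap1: match against the other side's stored records, then store this one.
    void onEvent(String user, String url) {
        for (Integer v : tupleState.getOrDefault(user, Collections.emptyList())) {
            output.add(user + " " + url + " --> " + v);
        }
        eventState.computeIfAbsent(user, k -> new ArrayList<>()).add(url);
    }

    // Like flatMap2: the mirror image for the second stream.
    void onTuple(String user, int value) {
        for (String url : eventState.getOrDefault(user, Collections.emptyList())) {
            output.add(user + " " + url + " --> " + value);
        }
        tupleState.computeIfAbsent(user, k -> new ArrayList<>()).add(value);
    }

    // One possible interleaving of the sample data; other orders yield the same set of matches.
    static KeyedJoinSketch runSample() {
        KeyedJoinSketch join = new KeyedJoinSketch();
        join.onEvent("Alice", "./home");
        join.onTuple("Alice", 35);
        join.onEvent("Bob", "./cart");
        join.onEvent("Alice", "./cart");
        join.onTuple("Bob", 20);
        join.onTuple("Alice", 17);
        join.onEvent("Alice", "./fav");
        join.onEvent("Bob", "./home");
        join.onTuple("Mary", 50);
        join.onEvent("Cary", "./prod?id=1");
        return join;
    }

    public static void main(String[] args) {
        KeyedJoinSketch join = runSample();
        join.output.forEach(System.out::println);
        // Alice: 3 events x 2 tuples, Bob: 2 x 1; Cary and Mary have no partner
        System.out.println("matches: " + join.output.size());
    }
}
```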
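6. Merging Streams: Summary
Connect is more general and places no restriction on the input types, and it can also be keyed to implement inner-join-like behavior. It is therefore the usual choice when merging streams, and union is used much less often.
The advantage union has over connect is that it can merge any number of streams at once, whereas connect can only connect two. To combine more than two streams of different types with connect, you must connect two of them first, process the result, and then connect that result with the third stream.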
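The two-step procedure for three inputs can be sketched in plain Java without Flink. The `connectAndMap` helper is hypothetical and stands in for `connect(...).map(...)` with a CoMapFunction. The first call turns two inputs of different types into one String stream. The second call connects that result with the third input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java simulation (no Flink): connect combines exactly two inputs,
// so three inputs of different types are combined in two connect steps.
class ThreeWayConnectSketch {

    // map1/map2 turn each side into the common output type OUT, like a CoMapFunction.
    static <A, B, OUT> List<OUT> connectAndMap(List<A> first, List<B> second,
                                               Function<A, OUT> map1, Function<B, OUT> map2) {
        List<OUT> out = new ArrayList<>();
        for (A a : first) {
            out.add(map1.apply(a));
        }
        for (B b : second) {
            out.add(map2.apply(b));
        }
        return out;
    }

    static List<String> combineSample() {
        List<Integer> ints = Arrays.asList(1, 2);
        List<String> strings = Arrays.asList("a", "b", "c");
        List<Double> doubles = Arrays.asList(0.5);

        // Step 1: connect the first two inputs into one String stream.
        List<String> firstTwo = connectAndMap(ints, strings, i -> "int:" + i, s -> "str:" + s);
        // Step 2: connect that result with the third input.
        return connectAndMap(firstTwo, doubles, s -> s, d -> "double:" + d);
    }

    public static void main(String[] args) {
        System.out.println(combineSample());
    }
}
```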
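7. Use Case: Real-Time Reconciliation
7.1. Requirements
Real-time reconciliation is a two-stream join between payment events from the app and payment events from a third-party payment platform. Each side waits 5 seconds (in event time) for the matching event from the other. If the matching event does not arrive in time, a warning is emitted.
7.2. Implementation
POJO for the app order event stream: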
```java
// Public fields plus a public no-argument constructor let Flink treat this class as a POJO type
public class OrderEvent {
    public String user;
    public String orderId;
    public String eventType;
    public Long ts;

    public OrderEvent() {
    }

    public OrderEvent(String user, String orderId, String eventType, Long ts) {
        this.user = user;
        this.orderId = orderId;
        this.eventType = eventType;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return "OrderEvent{" +
                "user='" + user + '\'' +
                ", orderId='" + orderId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", ts=" + ts +
                '}';
    }
}
```
POJO for the third-party payment event stream:
```java
public class ThirdPartyPayEvent {
    public String orderId;
    // Payment platform
    public String platform;
    public Long ts;

    public ThirdPartyPayEvent() {
    }

    public ThirdPartyPayEvent(String orderId, String platform, Long ts) {
        this.orderId = orderId;
        this.platform = platform;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return "ThirdPartyPayEvent{" +
                "orderId='" + orderId + '\'' +
                ", platform='" + platform + '\'' +
                ", ts=" + ts +
                '}';
    }
}
```
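The implementation keeps only the app's pay events, connects the two streams, and keys both by order ID. A KeyedCoProcessFunction holds one ValueState per side. When an event arrives and the other side's state already holds its counterpart, a match is emitted and that state is cleared. Otherwise, the event is stored and an event-time timer is registered for the end of the waiting period. When the timer fires, any state that is still set means the counterpart never arrived, so a warning goes to a side output. The full code: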
```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class BillCheckExample {

    // Side output for unmatched-payment warnings, shared by main() and BillCheck
    private static final OutputTag<String> WARNING_TAG = new OutputTag<String>("warning") {};

    // Each side waits 5 seconds (event time) for its counterpart, as the requirement states
    private static final long WAIT_MILLIS = 5000L;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the app order event stream
        SingleOutputStreamOperator<OrderEvent> orderStream = env.fromElements(
                new OrderEvent("Alice", "order-1", "create", 1000L),
                new OrderEvent("Bob", "order-2", "create", 2000L),
                new OrderEvent("Alice", "order-1", "modify", 5000L),
                new OrderEvent("Alice", "order-1", "pay", 6000L),
                new OrderEvent("Bob", "order-2", "pay", 8000L),
                new OrderEvent("Alice", "order-3", "create", 9000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((element, recordTimestamp) -> element.ts));

        // Read the third-party platform payment event stream
        SingleOutputStreamOperator<ThirdPartyPayEvent> thirdPartyPayStream = env.fromElements(
                new ThirdPartyPayEvent("order-1", "alipay", 9000L),
                new ThirdPartyPayEvent("order-3", "wechat", 10000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<ThirdPartyPayEvent>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((element, recordTimestamp) -> element.ts));

        // Keep only pay events, connect the two streams, and reconcile them keyed by order ID
        SingleOutputStreamOperator<String> payedStream = orderStream
                .filter(value -> value.eventType.equals("pay"))
                .connect(thirdPartyPayStream)
                .keyBy(value -> value.orderId, value -> value.orderId)
                .process(new BillCheck());

        payedStream.print("payed");
        payedStream.getSideOutput(WARNING_TAG).print("warning");

        env.execute();
    }

    public static class BillCheck extends KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String> {
        private ValueState<OrderEvent> orderEventValueState;
        private ValueState<ThirdPartyPayEvent> thirdPartyPayEventValueState;

        @Override
        public void open(Configuration parameters) throws Exception {
            orderEventValueState = getRuntimeContext().getState(
                    new ValueStateDescriptor<OrderEvent>("order-event", OrderEvent.class));
            thirdPartyPayEventValueState = getRuntimeContext().getState(
                    new ValueStateDescriptor<ThirdPartyPayEvent>("third-party-pay", ThirdPartyPayEvent.class));
        }

        @Override
        public void processElement1(OrderEvent orderEvent,
                                    KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>.Context ctx,
                                    Collector<String> out) throws Exception {
            // An app pay event arrived: check whether the third-party event is already here
            ThirdPartyPayEvent thirdPartyPayEvent = thirdPartyPayEventValueState.value();
            if (thirdPartyPayEvent != null) {
                // Emit the successful match
                out.collect("Order " + orderEvent.orderId + " paid successfully! " + orderEvent + "-->" + thirdPartyPayEvent);
                thirdPartyPayEventValueState.clear(); // clean up state
            } else {
                // Register an event-time timer 5 seconds later and start waiting
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + WAIT_MILLIS);
                orderEventValueState.update(orderEvent); // save state
            }
        }

        @Override
        public void processElement2(ThirdPartyPayEvent thirdPartyPayEvent,
                                    KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>.Context ctx,
                                    Collector<String> out) throws Exception {
            // A third-party pay event arrived: check whether the app pay event is already here
            OrderEvent orderEvent = orderEventValueState.value();
            if (orderEvent != null) {
                // Emit the successful match
                out.collect("Order " + orderEvent.orderId + " paid successfully! " + orderEvent + "-->" + thirdPartyPayEvent);
                orderEventValueState.clear(); // clean up state
            } else {
                // Register an event-time timer 5 seconds later and start waiting
                ctx.timerService().registerEventTimeTimer(ctx.timestamp() + WAIT_MILLIS);
                thirdPartyPayEventValueState.update(thirdPartyPayEvent); // save state
            }
        }

        @Override
        public void onTimer(long timestamp,
                            KeyedCoProcessFunction<String, OrderEvent, ThirdPartyPayEvent, String>.OnTimerContext ctx,
                            Collector<String> out) throws Exception {
            // A state that still holds a value means the other stream's event never arrived
            if (orderEventValueState.value() != null) {
                ctx.output(WARNING_TAG, "Order " + orderEventValueState.value().orderId
                        + " not matched! Third-party payment event did not arrive");
            }
            if (thirdPartyPayEventValueState.value() != null) {
                ctx.output(WARNING_TAG, "Order " + thirdPartyPayEventValueState.value().orderId
                        + " not matched! App order payment event did not arrive");
            }
            // Clear state
            orderEventValueState.clear();
            thirdPartyPayEventValueState.clear();
        }
    }
}
```
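The reconciliation logic can be checked with a plain-Java simulation that needs no Flink; all names are hypothetical. The simulation makes three simplifying assumptions. First, the pay events of both streams arrive as one sequence ordered by timestamp. Second, after each record the watermark becomes that timestamp minus 1 (bounded out-of-orderness with zero delay). Third, the end of input raises the watermark to Long.MAX_VALUE, which fires all remaining timers. A timer fires once the watermark reaches its timestamp, and two maps play the role of the two ValueStates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink) of the BillCheck logic with event-time timers.
class BillCheckSketch {

    static final long WAIT_MILLIS = 5000L;

    // Per-order "state": the pending pay event timestamp from each side, if any.
    private final Map<String, Long> pendingAppPay = new HashMap<>();
    private final Map<String, Long> pendingThirdParty = new HashMap<>();
    // Event-time timers: fire time -> order IDs whose timer fires at that time.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    final List<String> matched = new ArrayList<>();
    final List<String> warnings = new ArrayList<>();

    // Like processElement1: match if the other side is waiting, otherwise store and set a timer.
    void onAppPay(String orderId, long ts) {
        if (pendingThirdParty.remove(orderId) != null) {
            matched.add(orderId);
        } else {
            pendingAppPay.put(orderId, ts);
            timers.computeIfAbsent(ts + WAIT_MILLIS, t -> new ArrayList<>()).add(orderId);
        }
        advanceWatermark(ts - 1);
    }

    // Like processElement2.
    void onThirdPartyPay(String orderId, long ts) {
        if (pendingAppPay.remove(orderId) != null) {
            matched.add(orderId);
        } else {
            pendingThirdParty.put(orderId, ts);
            timers.computeIfAbsent(ts + WAIT_MILLIS, t -> new ArrayList<>()).add(orderId);
        }
        advanceWatermark(ts - 1);
    }

    void endOfInput() {
        advanceWatermark(Long.MAX_VALUE);
    }

    // Fires every timer whose time is <= the watermark, like onTimer().
    private void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String orderId : timers.pollFirstEntry().getValue()) {
                if (pendingAppPay.remove(orderId) != null) {
                    warnings.add(orderId + ": third-party payment event did not arrive");
                }
                if (pendingThirdParty.remove(orderId) != null) {
                    warnings.add(orderId + ": app payment event did not arrive");
                }
            }
        }
    }

    // The pay events from the example, merged by timestamp.
    static BillCheckSketch runSample() {
        BillCheckSketch check = new BillCheckSketch();
        check.onAppPay("order-1", 6000L);
        check.onAppPay("order-2", 8000L);
        check.onThirdPartyPay("order-1", 9000L);
        check.onThirdPartyPay("order-3", 10000L);
        check.endOfInput();
        return check;
    }

    public static void main(String[] args) {
        BillCheckSketch check = runSample();
        System.out.println("matched:  " + check.matched);
        System.out.println("warnings: " + check.warnings);
    }
}
```

With the sample pay events, order-1 is matched. Order-2 is reported because no third-party event arrives for it. Order-3 is reported because the app never emitted a pay event for it.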
When reposting, please credit: 西门飞冰的博客 » Flink Multi-Stream Transformations: Splitting and Merging Streams