由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Flink Sql的join方式

大数据 西门飞冰 4909℃
[隐藏]

1.介绍

通过API实现双流join有个弊端,就是不管是基于窗口join还是状态join都只支持内连接

Flink这个时候要支持其他连接就需要用到flink sql 进行join了

2.内连接

内连接:合并具有同一列的两个以上的表的行, 结果集中不包含一个表与另一个表不匹配的行

代码示例:

public class FlinkSqlJoin {
    public static void main(String[] args) throws Exception {
        // 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(1);
        //1.3 指定表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //1.4 设置状态的TTL
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));

        //2.从指定端口读取员工数据 并指定Watermark
        SingleOutputStreamOperator<Emp> empDS = env
                .socketTextStream("127.0.0.1", 8888)
                .map(new MapFunction<String, Emp>() {
                    @Override
                    public Emp map(String lineStr) throws Exception {
                        String[] fieldArr = lineStr.split(",");
                        return new Emp(
                                Integer.valueOf(fieldArr[0]),
                                fieldArr[1],
                                Integer.valueOf(fieldArr[2]),
                                Long.valueOf(fieldArr[3])
                        );
                    }
                });
        tableEnv.createTemporaryView("emp",empDS);

        //3.从指定端口读取部门数据 并指定Watermark
        SingleOutputStreamOperator<Dept> deptDS = env
                .socketTextStream("127.0.0.1", 8889)
                .map(
                        new MapFunction<String, Dept>() {
                            @Override
                            public Dept map(String jsonStr) throws Exception {
                                String[] fieldArr = jsonStr.split(",");
                                return new Dept(Integer.valueOf(fieldArr[0]), fieldArr[1], Long.valueOf(fieldArr[2]));
                            }
                        }
                );
        tableEnv.createTemporaryView("dept",deptDS);

        //4.内连接
        tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno = d.deptno").print();
    }
}

输入数据如下:

image-20230101201318882

程序输出如下:

image-20230101201208370

如果使用普通的内外连接,FlinkSQL底层会自动维护两个状态,分别用来存放参与连接的两张表的数据默认情况下,状态永远不会失效,在生产环境中,不可能让其永不失效,可以通过如下方式设置状态的TTL tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));

注意:这个配置是对所有参与连接的表进行统一设置,无法给参与连接的单个表设置单独的TTL

3.左外连接

左外连接:将左表数据全部展现出来,右表如果没有数据则模拟一个空值进行匹配

实现sql:

tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e left join dept d on e.deptno = d.deptno").print();

测试:右表数据先来,左表数据后到

程序输出如下

image-20230101203402853

测试:左表数据先来,右表数据后到

程序输出如下,得到了三条数据

image-20230101203505679

左外连接,底层连接过程:

如果左表数据先来,右表数据后到

当左表数据来的时候,生成一条数据[左表,null],标记为+I

当右表数据来的时候,会将上面的数据撤回,生成一条数据[左表,null],标记为-D

再生成一条数据,[左表,右表],标记为+I

这样的动态表转换的流称之为回撤流

4.右外连接

右外连接:将右表数据全部展现出来,左表如果没有数据则模拟一个空值进行匹配

实现sql:

tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e right join dept d on e.deptno = d.deptno").print();

5.全外连接

全外连接:左右表匹配的数据 + 左表没有匹配到的数据 + 右表没有匹配到的数据。

实现sql:

tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e full join dept d on e.deptno = d.deptno").print();

6.join的状态更新时间

左表 右表
内连接 OnCreateAndWrite OnCreateAndWrite
左外连接 OnReadAndWrite OnCreateAndWrite
右外连接 OnCreateAndWrite OnReadAndWrite
全外连接 OnReadAndWrite OnReadAndWrite

OnReadAndWrite:只要每读取一次数据,状态的TTL就要重新开始记时

OnCreateAndWrite:仅在创建和写入时更新

7.lookupJoin

lookup join:从一个外部系统表,来丰富当前的数据。

lookupjoin底层原理:

底层没有维护两个状态,以左表进行驱动的,当左表数据到来的时候,再去右表中查询对应数据。

lookup join是以左表进行驱动的,当左表数据到来之后,发送请求和右表进行连接

lookup join和普通的内外连接区别:

和普通的内外连接的实现方式完全不一样,底层不会维护两个状态,用于存放参与连接的两张表数据

且使用内外连接做关联的话,状态失效时间不太好设置,要是不设置状态的失效时间,那么维度数据就会一直保存在内存中,对内存压力比较大,要是设置失效时间的话,数据过了一段时间就失效了,在进行join就会导致关联不上相关数据

lookupjoin使用的话,是基于处理时间的,因此需要在左表指定一个处理时间列

image-20230115143513859

示例代码:

public class PreCartAdd {
    public static void main(String[] args) {
        // 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);
        // 指定表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 从kafka主题中读取员工数据
        tableEnv.executeSql("CREATE TABLE emp (\n" +
                "   empno integer,\n" +
                "   ename string,\n" +
                "   deptno integer,\n" +
                "   proc_time AS PROCTIME()\n" +	// 添加一个处理时间的列,主要是为了loopup join计算准备
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'first',\n" +
                "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");
        // 从mySQL表中读取部门数据
        tableEnv.executeSql("CREATE TABLE dept (\n" +
                "  deptno integer,\n" +
                "  dname string\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://hadoop01:3306/gmall_realtime_config',\n" +
                "   'table-name' = 't_dept',\n" +
                "   'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "   'username' = 'maxwell',\n" +
                "   'password' = '@Fblinux123'\n" +
                ")");

        // 使用lookup join将员工和部门进行关联
        tableEnv.executeSql("SELECT e.empno,e.ename,d.deptno,d.dname FROM emp AS e\n" +
                "  JOIN dept FOR SYSTEM_TIME AS OF e.proc_time AS d ON e.deptno = d.deptno").print();
    }
}

Mysql 中数据:

image-20230110214143954

Kafka 生产者输出:

程序输出结果:

[root@hadoop01 ~]# /opt/module/kafka/bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic first
>{"empno":100,"ename":"feibing","deptno":10}

image-20230110214111202

Kafka每来一条数据都要去mysql数据库进行查询明显不是很优雅,我们可以把查询的数据放到缓存中,这样就会减少mysql的查询压力,默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 启用缓存后,Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 配置方法如下所示:

image-20230110214748108

转载请注明:西门飞冰的博客 » Flink Sql的join方式

喜欢 (0)or分享 (0)