1.介绍
通过API实现双流join有个弊端,就是不管是基于窗口join还是状态join都只支持内连接
2.内连接
内连接:合并具有同一列的两个以上的表的行, 结果集中不包含一个表与另一个表不匹配的行
代码示例:
public class FlinkSqlJoin { public static void main(String[] args) throws Exception { // 1.基本环境准备 //1.1 指定流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //1.2 设置并行度 env.setParallelism(1); //1.3 指定表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //1.4 设置状态的TTL tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10)); //2.从指定端口读取员工数据 并指定Watermark SingleOutputStreamOperator<Emp> empDS = env .socketTextStream("127.0.0.1", 8888) .map(new MapFunction<String, Emp>() { @Override public Emp map(String lineStr) throws Exception { String[] fieldArr = lineStr.split(","); return new Emp( Integer.valueOf(fieldArr[0]), fieldArr[1], Integer.valueOf(fieldArr[2]), Long.valueOf(fieldArr[3]) ); } }); tableEnv.createTemporaryView("emp",empDS); //3.从指定端口读取部门数据 并指定Watermark SingleOutputStreamOperator<Dept> deptDS = env .socketTextStream("127.0.0.1", 8889) .map( new MapFunction<String, Dept>() { @Override public Dept map(String jsonStr) throws Exception { String[] fieldArr = jsonStr.split(","); return new Dept(Integer.valueOf(fieldArr[0]), fieldArr[1], Long.valueOf(fieldArr[2])); } } ); tableEnv.createTemporaryView("dept",deptDS); //4.内连接 tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno = d.deptno").print(); } }
如果使用普通的内外连接,FlinkSQL底层会自动维护两个状态,分别用来存放参与连接的两张表的数据默认情况下,状态永远不会失效,在生产环境中,不可能让其永不失效,可以通过如下方式设置状态的TTL tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10));
3.左外连接
左外连接:将左表数据全部展现出来,右表如果没有数据则模拟一个空值进行匹配
tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e left join dept d on e.deptno = d.deptno").print();
测试:右表数据先来,左表数据后到
测试:左表数据先来,右表数据后到
左外连接,底层连接过程:
如果左表数据先来,右表数据后到
当左表数据来的时候,生成一条数据[左表,null],标记为+I
当右表数据来的时候,会将上面的数据撤回,生成一条数据[左表,null],标记为-D
再生成一条数据,[左表,右表],标记为+I
这样的动态表转换的流称之为回撤流
4.右外连接
右外连接:将右表数据全部展现出来,左表如果没有数据则模拟一个空值进行匹配
tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e right join dept d on e.deptno = d.deptno").print();
5.全外连接
全外连接:左右表匹配的数据 + 左表没有匹配到的数据 + 右表没有匹配到的数据。
tableEnv.executeSql("select e.empno,e.ename,d.deptno,d.dname from emp e full join dept d on e.deptno = d.deptno").print();
6.
左表 | 右表 | |
---|---|---|
内连接 | OnCreateAndWrite | OnCreateAndWrite |
左外连接 | OnReadAndWrite | OnCreateAndWrite |
右外连接 | OnCreateAndWrite | OnReadAndWrite |
全外连接 | OnReadAndWrite | OnReadAndWrite |
OnReadAndWrite:只要每读取一次数据,状态的TTL就要重新开始记时
OnCreateAndWrite:仅在创建和写入时更新
7.lookupJoin
lookup join:从一个外部系统表,来丰富当前的数据。
lookupjoin底层原理:
底层没有维护两个状态,以左表进行驱动的,当左表数据到来的时候,再去右表中查询对应数据。
lookup join是以左表进行驱动的,当左表数据到来之后,发送请求和右表进行连接
lookup join和普通的内外连接区别:
和普通的内外连接的实现方式完全不一样,底层不会维护两个状态,用于存放参与连接的两张表数据
且使用内外连接做关联的话,状态失效时间不太好设置,要是不设置状态的失效时间,那么维度数据就会一直保存在内存中,对内存压力比较大,要是设置失效时间的话,数据过了一段时间就失效了,在进行join就会导致关联不上相关数据
示例代码:
public class PreCartAdd { public static void main(String[] args) { // 指定流处理环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env.setParallelism(1); // 指定表执行环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 从kafka主题中读取员工数据 tableEnv.executeSql("CREATE TABLE emp (\n" + " empno integer,\n" + " ename string,\n" + " deptno integer,\n" + " proc_time AS PROCTIME()\n" + // 添加一个处理时间的列,主要是为了loopup join计算准备 ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'first',\n" + " 'properties.bootstrap.servers' = 'hadoop01:9092',\n" + " 'properties.group.id' = 'testGroup',\n" + " 'scan.startup.mode' = 'latest-offset',\n" + " 'format' = 'json'\n" + ")"); // 从mySQL表中读取部门数据 tableEnv.executeSql("CREATE TABLE dept (\n" + " deptno integer,\n" + " dname string\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://hadoop01:3306/gmall_realtime_config',\n" + " 'table-name' = 't_dept',\n" + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" + " 'username' = 'maxwell',\n" + " 'password' = '@Fblinux123'\n" + ")"); // 使用lookup join将员工和部门进行关联 tableEnv.executeSql("SELECT e.empno,e.ename,d.deptno,d.dname FROM emp AS e\n" + " JOIN dept FOR SYSTEM_TIME AS OF e.proc_time AS d ON e.deptno = d.deptno").print(); } }
Kafka 生产者输出:
[root@hadoop01 ~]# /opt/module/kafka/bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic first >{"empno":100,"ename":"feibing","deptno":10}
转载请注明:西门飞冰的博客 » Flink Sql的join方式