由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Iceberg和spark集成笔记

大数据 西门飞冰 4950℃
[隐藏]

1.简介

Iceberg 的优势有以下几点:

1、增量数据更新:Iceberg 可以支持增量数据更新,而不是全量数据覆盖,从而减少了数据更新的时间和成本。

2、事务管理:Iceberg 支持事务管理,可以确保数据的一致性和可靠性。

3、版本管理:Iceberg 支持版本管理,可以方便地管理数据的历史版本,从而更好地追踪数据变化。

4、数据格式支持:Iceberg 支持多种数据格式,包括 Parquet、ORC、Avro 等,可以满足不同的数据处理需求。

5、跨存储介质支持:Iceberg 可以支持在多个存储介质上存储数据,包括 HDFS、S3、Azure Blob Storage 等。

Iceberg 最适合与 Spark、Hive、Presto 等大数据处理框架集成。这些框架都可以通过使用 Iceberg API 来访问和操作 Iceberg 存储的数据,从而获得 Iceberg 的优势和能力。同时,Iceberg 也可以与其他数据处理框架集成,如 Flink、Kafka 等,但需要通过一些额外的工作来实现。

iceberg和spark集成 功能是最全的,iceberg的很多功能在spark都能实现,但是其他引擎就不一定了

Spark与Iceberg的版本对应关系如下:

Spark版本 Iceberg版本
2.4 0.7.0-incubating – 1.1.0
3.0 0.9.0 – 1.0.0
3.1 0.12.0 – 1.1.0
3.2 0.13.0 – 1.1.0
3.3 0.14.0 – 1.1.0

2.环境准备

版本说明:

hadoop:3.1.3

hive:3.1.2

spark:3.3.1

iceberg:1.1.0

1、安装spark

tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.1

配置环境变量

[root@hadoop-test01 ~]# vim /etc/profile
#SPARK_HOME
export SPARK_HOME=/opt/module/spark-3.3.1
export PATH=$PATH:$SPARK_HOME/bin
[root@hadoop-test01 ~]# source /etc/profile

2、下载iceberg的jar包到spark的jars目录

wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.1.0/iceberg-spark-runtime-3.3_2.12-1.1.0.jar
cp iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars

到此:iceberg和spark sql就集成完毕了

3.Catalog 配置

Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是Iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。

1、hive Catalog配置

]# vim conf/spark-defaults.conf
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://hadoop-test01:9083

需要同时将hive配置文件hive-site.xml 放到spark的conf目录下

2、Hadoop Catalog配置

]# vim conf/spark-defaults.conf
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://hadoop-test01:8020/warehouse/spark-iceberg

说明:hive和Hadoop的Catalog 可以同时配置,使用的时候使用use进行切换即可

use hive_prod;
use hadoop_prod;

配置完成之后就可以进入spark-sql 执行相关操作了

4.catalog和namespace使用

设置使用的catalog和namespace(就是hive数据库的意思)

spark-sql>  use hive_prod.default;		# 切换到指定的catalog和namespace
spark-sql>  show current namespace;		# 显示当前的namespace
hive_prod	default
spark-sql>  show namespaces;					# 显示所有的namespace
default
spark-sql> create database fblinux;		# 也可以创建新的namespace
spark-sql> show namespaces;
default
fblinux

5.创建表(常规)

1)创建普通表

创建普通表和sql基本一致,就是需要指定一下catalog和namespace(非必需)和使用USING设置为iceberg表

spark-sql> CREATE TABLE hive_prod.default.sample1 (
         >     id bigint COMMENT 'unique id',
         >     data string)
         > USING iceberg;			# 指定设置为iceberg表
spark-sql> desc sample1;
id                  	bigint              	unique id           
data                	string              	                    
                    	                    	                    
# Partitioning      	                    	                    
Not partitioned  

建表的一些参数:

  • PARTITIONED BY (partition-expressions) :配置分区
  • LOCATION ‘(fully-qualified-uri)’ :指定表路径
  • COMMENT ‘table documentation’ :配置表备注
  • TBLPROPERTIES (‘key’=’value’, …) :配置表属性

表属性:https://iceberg.apache.org/docs/latest/configuration/

对Iceberg表的每次更改都会生成一个新的元数据文件(json文件)以提供原子性。默认情况下,旧元数据文件作为历史文件保存不会删除。

如果要自动清除元数据文件,在表属性中设置write.metadata.delete-after-commit.enabled=true。这将保留一些元数据文件(直到write.metadata.previous-versions-max),并在每个新创建的元数据文件之后删除旧的元数据文件。

2)创建分区表

iceberg 和hive有区别的是分区的字段已经在字段列表里了

spark-sql> CREATE TABLE hive_prod.default.sample2 (
         >     id bigint,
         >     data string,
         >     category string)
         > USING iceberg
         > PARTITIONED BY (category);
         
spark-sql> desc sample2;
id                  	bigint              	                    
data                	string              	                    
category            	string              	                    
                    	                    	                    
# Partitioning      	                    	                    
Part 0              	category            	                    
Time taken: 0.036 seconds, Fetched 6 row(s)

spark-sql> desc formatted sample2;
id                  	bigint              	                    
data                	string              	                    
category            	string              	                    
                    	                    	                    
# Partitioning      	                    	                    
Part 0              	category            	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<category:string>	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.default.sample2	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/sample2	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=none,format=iceberg/parquet,format-version=1]

3)创建隐藏分区表

隐藏分区是什么:指定分区字段可以做计算,然后分区的字段不用体现在建表语句当中,如下建表语句 id通过bucket函数分成16个桶,这个列并没有直接存在于字段列表(id分16桶的结果没有作为字段存在于字段列表),这种分区字段就是一种隐藏的分区,如果是hive表就需要将结果定义为字段显示出来。

spark-sql> CREATE TABLE hive_prod.default.sample3 (
         >     id bigint,
         >     data string,
         >     category string,
         >     ts timestamp)
         > USING iceberg
         > PARTITIONED BY (bucket(16, id), days(ts), category);		

spark-sql> desc formatted sample3;
id                  	bigint              	                    
data                	string              	                    
category            	string              	                    
ts                  	timestamp           	                    
                    	                    	                    
# Partitioning      	                    	                    
Part 0              	bucket(16, id)      	                    
Part 1              	days(ts)            	                    
Part 2              	category            	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<id_bucket:int,ts_day:date,category:string>	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.default.sample3	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/sample3	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=none,format=iceberg/parquet,format-version=1]

支持的转换有:

  • years(ts):按年划分
  • months(ts):按月划分
  • days(ts)或date(ts):等效于dateint分区
  • hours(ts)或date_hour(ts):等效于dateint和hour分区
  • bucket(N, col):按哈希值划分mod N个桶
  • truncate(L, col):按截断为L的值划分
  • 字符串被截断为给定的长度

整型和长型截断为bin: truncate(10, i)生成分区0,10,20,30,…

6.CTAS 语法建表

除了常规建表以外还可以使用CTAS建表,就是后面添加一个查询,这种语法iceberg是支持的

spark-sql> CREATE TABLE hive_prod.default.sample4
         > USING iceberg
         > AS SELECT * from hive_prod.default.sample3;
         
spark-sql> desc formatted sample4;
id                  	bigint              	                    
data                	string              	                    
category            	string              	                    
ts                  	timestamp           	                    
                    	                    	                    
# Partitioning      	                    	                    
Not partitioned     	                    	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<>            	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.default.sample4	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/sample4	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=3746870716195160225,format=iceberg/parquet,format-version=1]

从上面的formatted可以看到,使用CTAS 语法建表有个问题,就是无法将表分区携带过来,CTAS在iceberg的特点就是不指定分区就是无分区,需要重新指定分区、表属性,也可以改原来的分区和表属性,如下所示:

spark-sql> CREATE TABLE hive_prod.default.sample5
         > USING iceberg
         > PARTITIONED BY (bucket(8, id), hours(ts), category)
         > TBLPROPERTIES ('key'='value')
         > AS SELECT * from hive_prod.default.sample3;
spark-sql> desc formatted sample5;
id                  	bigint              	                    
data                	string              	                    
category            	string              	                    
ts                  	timestamp           	                    
                    	                    	                    
# Partitioning      	                    	                    
Part 0              	bucket(8, id)       	                    
Part 1              	hours(ts)           	                    
Part 2              	category            	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<id_bucket:int,ts_hour:int,category:string>	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.default.sample5	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/sample5	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=2956392194498691023,format=iceberg/parquet,format-version=1,key=value]

7.Replace table建表

REPLACE 相当于把表重构,必需使用已经存在的表,要是使用不存在的表会报错

如下语句可以看到将分区信息覆盖了

spark-sql> REPLACE TABLE hive_prod.default.sample5
         > USING iceberg
         > AS SELECT * from hive_prod.default.sample3;
Time taken: 0.271 seconds
spark-sql> desc formatted sample5;
id                  	bigint              	                    
data                	string              	                    
category            	string              	                    
ts                  	timestamp           	                    
                    	                    	                    
# Partitioning      	                    	                    
Not partitioned     	                    	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<id_bucket:int,ts_hour:int,category:string>	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.default.sample5	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/sample5	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=8333723822811548252,format=iceberg/parquet,format-version=1,key=value]	

使用REPLACE的时候同样可以指定分区和表属性

spark-sql> REPLACE TABLE hive_prod.default.sample5
         > USING iceberg
         > PARTITIONED BY (part)
         > TBLPROPERTIES ('key'='value')
         > AS SELECT * from hive_prod.default.sample3;

表不存在自动创建存在则自动替换使用下面这种方式:

spark-sql> CREATE OR REPLACE TABLE hive_prod.default.sample6
         > USING iceberg
         > AS SELECT * from hive_prod.default.sample3;

8.删除表

对于HadoopCatalog而言:运行DROP TABLE将从catalog中删除表并删除表内容。

CREATE EXTERNAL TABLE hadoop_prod.default.sample7 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg

INSERT INTO hadoop_prod.default.sample7 values(1,'a')
DROP TABLE hadoop_prod.default.sample7

对于HiveCatalog而言:

  • 在0.14之前,运行DROP TABLE将从catalog中删除表并删除表内容。
  • 从0.14开始,DROP TABLE只会从catalog中删除表,不会删除数据。为了删除表内容,应该使用DROP table PURGE。
CREATE TABLE hive_prod.default.sample7 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg

INSERT INTO hive_prod.default.sample7 values(1,'a')

删除表:

DROP TABLE hive_prod.default.sample7

删除表和数据:

DROP TABLE hive_prod.default.sample7 PURGE

9.修改表

Iceberg在Spark 3中完全支持ALTER TABLE,包括:

  • 重命名表
  • 设置或删除表属性
  • 添加、删除和重命名列
  • 添加、删除和重命名嵌套字段
  • 重新排序顶级列和嵌套结构字段
  • 扩大int、float和decimal字段的类型
  • 将必选列变为可选列

此外,还可以使用SQL扩展来添加对分区演变的支持和设置表的写顺序。

创建测试用表:

spark-sql> CREATE TABLE hive_prod.fblinux.sample1 (
         >     id bigint COMMENT 'unique id',
         >     data string)
         > USING iceberg;

1)修改表名

不支持修改HadoopCatalog的表名

spark-sql> ALTER TABLE hive_prod.fblinux.sample1 RENAME TO hive_prod.fblinux.sample2;
spark-sql> show tables;
sample2

2)修改表属性

spark-sql> ALTER TABLE hive_prod.fblinux.sample1 SET TBLPROPERTIES (
         >     'read.split.target-size'='268435456');
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 SET TBLPROPERTIES (
         >     'comment' = 'A table comment.');
spark-sql> describe formatted sample1;
id                  	bigint              	unique id           
data                	string              	                    
                    	                    	                    
# Partitioning      	                    	                    
Not partitioned     	                    	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<>            	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.fblinux.sample1	                    
Comment             	A table comment.    	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/sample1	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=none,format=iceberg/parquet,format-version=1,read.split.target-size=268435456]        

删除表属性:将SET改成UNSET然后跟上要删除的参数名称就可以了

ALTER TABLE hive_prod.fblinux.sample1 UNSET TBLPROPERTIES ('read.split.target-size');

3)添加列

spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMNS (
         >     category string comment 'new_column'
         >   );
-- 添加struct类型的列         
spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMN point struct<x: double, y: double>;
-- 往struct类型的列中添加字段         
spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMN point.z double;
-- 创建struct的嵌套数组列
spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMN points array<struct<x: double, y: double>>;
-- 在数组中的结构中添加一个字段。使用关键字'element'访问数组的元素列。
spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMN points.element.z double;
-- 创建一个包含Map类型的列,key和value都为struct类型
spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在Map类型的value的struct中添加一个字段
spark-sql> ALTER TABLE hive_prod.fblinux.sample1
         > ADD COLUMN pointsm.value.b int;
-- 查看最终的结果         
spark-sql> describe formatted sample1;
id                  	bigint              	unique id           
data                	string              	                    
category            	string              	new_column          
point               	struct<x:double,y:double,z:double>	                    
points              	array<struct<x:double,y:double,z:double>>	                    
pointsm             	map<struct<x:int>,struct<a:int,b:int>>	                    
                    	                    	                    
# Partitioning      	                    	                    
Not partitioned     	                    	                    
                    	                    	                    
# Metadata Columns  	                    	                    
_spec_id            	int                 	                    
_partition          	struct<>            	                    
_file               	string              	                    
_pos                	bigint              	                    
_deleted            	boolean             	                    
                    	                    	                    
# Detailed Table Information	                    	                    
Name                	hive_prod.fblinux.sample1	                    
Comment             	A table comment.    	                    
Location            	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/sample1	                    
Provider            	iceberg             	                    
Owner               	hadoop              	                    
Table Properties    	[current-snapshot-id=none,format=iceberg/parquet,format-version=1]  

在Spark 2.4.4及以后版本中,可以通过添加FIRST(添加在最前面)或AFTER(指定列的后面)子句在任何位置添加列:

ALTER TABLE hive_prod.fblinux.sample1
ADD COLUMN new_column1 bigint AFTER id;

ALTER TABLE hive_prod.fblinux.sample1
ADD COLUMN new_column2 bigint FIRST;

4)修改列

-- 修改列名
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 RENAME COLUMN data TO data1;

-- Alter Column修改类型(只允许安全的转换,如int转bigint)
ALTER TABLE hive_prod.fblinux.sample1
ADD COLUMNS (
    idd int
  );
ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN idd TYPE bigint;
-- Alter Column 修改列的注释
ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id TYPE double COMMENT 'a';
ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id COMMENT 'b';
-- Alter Column修改列的顺序
ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id FIRST;
ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN new_column2 AFTER new_column1;
-- Alter Column修改列是否允许为null
ALTER TABLE hive_prod.fblinux.sample1 ALTER COLUMN id DROP NOT NULL;

ALTER COLUMN不用于更新struct类型。使用ADD COLUMN和DROP COLUMN添加或删除struct类型的字段。

5)删除列

spark-sql> ALTER TABLE hive_prod.fblinux.sample1 DROP COLUMN idd;
spark-sql> ALTER TABLE hive_prod.fblinux.sample1 DROP COLUMN point.z;

10.修改分区

修改分区使用条件(1)spark3才支持,(2)需要配置扩展

1)添加分区

添加扩展配置(无法临时set 只能启动前配置)

# vim spark-default.conf

spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

重新进入spark-sql shell:

-- 添加一个已有字段作为分区
ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD category;

-- 添加隐藏分区,对id取模
ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD bucket(16, id);
ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD truncate(data, 4);
ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD years(ts);

ALTER TABLE hive_prod.fblinux.sample1 ADD PARTITION FIELD bucket(16, id) AS shard;

2)删除分区

ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD category;
ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD bucket(16, id);
ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD truncate(data, 4);
ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD years(ts);
ALTER TABLE hive_prod.fblinux.sample1 DROP PARTITION FIELD shard;

注意,尽管删除了分区,但列仍然存在于表结构中。

删除分区字段是元数据操作,不会改变任何现有的表数据。新数据将被写入新的分区,但现有数据将保留在旧的分区布局中。

当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果按天划分分区,而改为按小时划分分区,那么覆盖将覆盖每小时划分的分区,而不再覆盖按天划分的分区。

删除分区字段时要小心,可能导致元数据查询失败或产生不同的结果。

3)修改分区:比如之前按天分区可以使用REPLACE转换为按小时分区

-- 将16个bucket修改为8个bucket
ALTER TABLE hive_prod.fblinux.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id);

11.修改表的写入顺序

ALTER TABLE hive_prod.fblinux.sample1 WRITE ORDERED BY category, id;

-- 按照category进行升序写入,按照id进行降序写入
ALTER TABLE hive_prod.fblinux.sample1 WRITE ORDERED BY category ASC, id DESC;
-- 按照category进行升序写入,要是有null值则放在最后面,按照id进行降序,要是有null值放在最前面
ALTER TABLE hive_prod.fblinux.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST;

表写顺序不能保证查询的数据顺序。它只影响数据写入表的方式。

WRITE ORDERED BY设置了一个全局排序,即跨任务的行排序,就像在INSERT命令中使用ORDER BY一样:

INSERT INTO hive_prod.fblinux.sample1
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category;

要在每个任务内排序,而不是跨任务排序,使用local ORDERED BY:

ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id;

12.插入数据

准备两张表:

CREATE TABLE hive_prod.fblinux.a (
    id bigint,
    count bigint)
USING iceberg;

CREATE TABLE hive_prod.fblinux.b (
    id bigint,
count bigint,
flag string)
USING iceberg;

insert into 普通写入:

INSERT INTO hive_prod.fblinux.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hive_prod.fblinux.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');

merge into行级更新:它可以帮我们实现类似于join的效果,但是在很多场景下用MERGE INTO效率会更高一点,特别是在数仓当中有一个join维度的需求,用merge into可能是一个不错的选择。

MERGE INTO hive_prod.fblinux.a t 			-- 这里是结果表
USING (SELECT * FROM hive_prod.fblinux.b) u ON t.id = u.id		-- USING 是关联表 ON 是关联条件
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count		-- WHEN MATCHED 是关联上了怎么处理,用THEN做更新和删除都可以
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count);	-- WHEN NOT 是没有关联上怎么处理

13.查询数据

1)普通查询

SELECT count(1) as count, data
FROM hive_prod.fblinux.a
GROUP BY data;

2)查询元数据

-- 查询表快照
spark-sql> SELECT * FROM hive_prod.fblinux.a.snapshots;
2023-04-16 11:22:34.023	2292277810484862958	NULL	append	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/snap-2292277810484862958-1-b642aa9b-b76a-4290-bb29-8eb6a880c817.avro	{"added-data-files":"3","added-files-size":"2034","added-records":"3","changed-partition-count":"1","spark.app.id":"local-1681612655211","total-data-files":"3","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"2034","total-position-deletes":"0","total-records":"3"}
2023-04-16 11:23:21.716	936323692260036362	2292277810484862958	overwrite	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/snap-936323692260036362-1-69f8c74c-1daa-4e17-92b0-6810265ecc76.avro	{"added-data-files":"1","added-files-size":"750","added-records":"2","changed-partition-count":"1","deleted-data-files":"2","deleted-records":"2","removed-files-size":"1356","spark.app.id":"local-1681612655211","total-data-files":"2","total-delete-files":"0","total-equality-deletes":"0","total-files-size":"1428","total-position-deletes":"0","total-records":"3"}
Time taken: 0.241 seconds, Fetched 2 row(s)
-- 查询数据文件信息
spark-sql> SELECT * FROM hive_prod.fblinux.a.files;
0	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/data/00000-8-a8b23f72-b062-405c-baa8-a9b5dd8d7ff7-00001.parquet	PARQUET	0	2	750	{1:54,2:92}	{1:2,2:2}	{1:0,2:0}	{}	{1:,2:}	{1:,2:}	NULL	[4]	NULL	0
0	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/data/00002-2-e3b90f14-eb87-4a2f-a6d6-4289259c9823-00001.parquet	PARQUET	0	1	678	{1:46,2:46}	{1:1,2:1}	{1:0,2:0}	{}	{1:,2:}	{1:,2:}	NULL	[4]	NULL	0
Time taken: 0.153 seconds, Fetched 2 row(s)
-- 查询表历史
spark-sql> SELECT * FROM hive_prod.fblinux.a.history;
2023-04-16 11:22:34.023	2292277810484862958	NULL	true
2023-04-16 11:23:21.716	936323692260036362	2292277810484862958	true
Time taken: 0.089 seconds, Fetched 2 row(s)
-- 查询 manifest
spark-sql> SELECT * FROM hive_prod.fblinux.a.manifests;
0	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/69f8c74c-1daa-4e17-92b0-6810265ecc76-m1.avro	5803	0	936323692260036362	1	0	0	0	0	0	[]
0	hdfs://hadoop-test01:8020/user/hive/warehouse/fblinux.db/a/metadata/69f8c74c-1daa-4e17-92b0-6810265ecc76-m0.avro	5908	0	936323692260036362	0	1	2	0	0	0	[]
Time taken: 0.091 seconds, Fetched 2 row(s)

14.存储过程(高级管理功能)

spark的用法当中还支持一个存储过程,主要是一些高级管理功能,而不是传统的查询插入一些东西。Procedures可以通过CALL从任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。

1)语法

按照参数名传参

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1);

当按位置传递参数时,如果结束参数是可选的,则只有结束参数可以省略。

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n);

2)快照管理

(1)回滚到指定的快照id

CALL hadoop_prod.system.rollback_to_snapshot('default.a', 7601163594701794741);

(2)回滚到指定时间的快照

CALL hadoop_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000');

(3)设置表的当前快照ID

CALL hadoop_prod.system.set_current_snapshot('db.sample', 1);

(4)从快照变为当前表状态

CALL hadoop_prod.system.cherrypick_snapshot('default.a', 7629160535368763452);

CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id => 7629160535368763452, table => 'default.a' );

3)元数据管理

(1)删除早于指定日期和时间的快照,但保留最近100个快照:

CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)

(2)删除Iceberg表中任何元数据文件中没有引用的文件

-- 列出所有需要删除的候选文件
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)

-- 删除指定目录中db.sample表不知道的任何文件
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')

(3)合并数据文件(合并小文件)

CALL catalog_name.system.rewrite_data_files('db.sample')

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))

CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')

(4)重写表清单来优化执行计划

CALL catalog_name.system.rewrite_manifests('db.sample')

-- 重写表db中的清单。并禁用Spark缓存的使用。这样做可以避免执行程序上的内存问题。
CALL catalog_name.system.rewrite_manifests('db.sample', false)

4)迁移表

(1)快照

CALL catalog_name.system.snapshot('db.sample', 'db.snap')

CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')

(2)迁移

CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'))

CALL catalog_name.system.migrate('db.sample')

(3)添加数据文件

CALL spark_catalog.system.add_files(
table => 'db.tbl',
source_table => 'db.src_tbl',
partition_filter => map('part_col_1', 'A')
)

CALL spark_catalog.system.add_files(
 table => 'db.tbl',
 source_table => '`parquet`.`path/to/table`'
)

5)元数据信息

(1)获取指定快照的父快照id

CALL spark_catalog.system.ancestors_of('db.tbl')

(2)获取指定快照的所有祖先快照

CALL spark_catalog.system.ancestors_of('db.tbl', 1)

CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')

 

转载请注明:西门飞冰的博客 » Iceberg和spark集成笔记

喜欢 (1)or分享 (0)