数据湖技术之iceberg(九)Spark与Iceberg整合写操作

南墨2年前技术文章1233

1. INSERT INTO

"insert into"是向Iceberg表中插入数据,有两种语法形式:"INSERT INTO tbl VALUES (1,"zs",18),(2,"ls",19)"、"INSERT INTO tbl SELECT ...",以上两种方式比较简单,这里不再详细记录。

2.    MERGE INTO

Iceberg "merge into"语法可以对表数据进行行级更新或删除,在Spark3.x版本之后支持,其原理是重写包含需要删除和更新行数据所在的data files。"merge into"可以使用一个查询结果数据来更新目标表的数据,其语法通过类似join关联方式,根据指定的匹配条件对匹配的行数据进行相应操作。"merge into"语法如下:

MERGE INTO tbl t

USING (SELECT ...) s

ON t.id = s.id

WHEN MATCHED AND ... THEN DELETE //删除

WHEN MATCHED AND ... THEN UPDATE SET ... //更新

WHEN MATCHED AND ... AND ... THEN UPDATE SET ... //多条件更新

WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...)//匹配不上向目标表插入数据

具体案例如下:

1、首先创建a表和b表,并插入数据

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")

  //指定hadoop catalog,catalog名称为hadoop_prod

  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")

  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")

  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")

  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

  .getOrCreate()

 

//创建一张表 a ,并插入数据

spark.sql(

  """

    |create table  hadoop_prod.default.a (id int,name string,age int) using iceberg

  """.stripMargin)

spark.sql(

  """

    |insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)

  """.stripMargin)

 

//创建另外一张表b ,并插入数据

spark.sql(

  """

    |create table  hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg

  """.stripMargin)

spark.sql(

  """

    |insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")

  """.stripMargin)

2、使用MERGE INTO 语法向目标表更新、删除、新增数据

这里我们计划将b表与a表匹配id,如果b表中tp字段是"delete"那么a表中对应的id数据删除,如果b表中tp字段是"update",那么a表中对应的id数据其他字段进行更新,如果a表与b表id匹配不上,那么将b表中的数据插入到a表中,具体操作如下:

//将表b 中与表a中相同id的数据更新到表a,表a中没有表b中有的id对应数据写入增加到表a

spark.sql(

  """

    |merge into hadoop_prod.default.a  t1

    |using (select id,name ,age,tp from hadoop_prod.default.b) t2

    |on t1.id = t2.id

    |when matched and t2.tp = 'delete' then delete

    |when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age

    |when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age)

  """.stripMargin)

 

spark.sql("""select * from hadoop_prod.default.a """).show()

最终结果如下:

1.png

注意:更新数据时,在查询的数据中只能有一条匹配的数据更新到目标表,否则将报错。

3.   INSERT OVERWRITE

"insert overwrite"可以覆盖Iceberg表中的数据,这种操作会将表中全部数据替换掉,建议如果有部分数据替换操作可以使用"merge into"操作。

对于Iceberg分区表使用"insert overwrite"操作时,有两种情况,第一种是“动态覆盖”,第二种是“静态覆盖”。

动态分区覆盖:

动态覆盖会全量将原有数据覆盖,并将新插入的数据根据Iceberg表分区规则自动分区,类似Hive中的动态分区。

静态分区覆盖:

静态覆盖需要在向Iceberg中插入数据时需要手动指定分区,如果当前Iceberg表存在这个分区,那么只有这个分区的数据会被覆盖,其他分区数据不受影响,如果Iceberg表不存在这个分区,那么相当于给Iceberg表增加了个一个分区。具体操作如下:

3.1、创建三张表

创建test1分区表、test2普通表、test3普通表三张表,并插入数据,每张表字段相同,但是插入数据不同。

//创建 test1 分区表,并插入数据

spark.sql(

  """

    |create table  hadoop_prod.default.test1 (id int,name string,loc string)

    |using iceberg

    |partitioned by (loc)

  """.stripMargin)

 

spark.sql(

  """

    |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")

  """.stripMargin)

//创建 test2 普通表,并插入数据

spark.sql(

  """

    |create table  hadoop_prod.default.test2 (id int,name string,loc string)

    |using iceberg

  """.stripMargin)

 

spark.sql(

  """

    |insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan")

  """.stripMargin)

//创建 test3 普通表,并插入数据

spark.sql(

  """

    |create table  hadoop_prod.default.test3 (id int,name string,loc string)

    |using iceberg

  """.stripMargin)

 

spark.sql(

  """

    |insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou")

  """.stripMargin)

3.2、使用insert overwrite 读取test3表中的数据覆盖到test2表中

//使用insert overwrite 读取test3 表中的数据覆盖到test2 普通表中

spark.sql(

  """

    |insert overwrite hadoop_prod.default.test2

    |select id,name,loc from  hadoop_prod.default.test3

  """.stripMargin)

 

//查询 test2 表中的数据

spark.sql(

  """

    |select * from hadoop_prod.default.test2

  """.stripMargin).show()

Iceberg 表 test2结果如下:

2.png

3.3、使用insert overwrite 读取test3表数据,动态分区方式覆盖到表test1

// 使用insert overwrite 读取test3表数据 动态分区方式覆盖到表 test1

spark.sql(

  """

    |insert overwrite hadoop_prod.default.test1

    |select id,name,loc from  hadoop_prod.default.test3

  """.stripMargin)

 

//查询 test1 表数据

spark.sql(

  """

    |select * from hadoop_prod.default.test1

  """.stripMargin).show()

Iceberg 表 test1结果如下:

3.png

3.4、静态分区方式,将iceberg表test3的数据覆盖到Iceberg表test1中

这里可以将test1表删除,然后重新创建,加载数据,也可以直接读取test3中的数据静态分区方式更新到test1。另外,使用insert overwrite 语法覆盖静态分区方式时,查询的语句中就不要再次写入分区列,否则会重复。

//删除表test1,重新创建表test1 分区表,并插入数据

spark.sql(

  """

    |drop table hadoop_prod.default.test1

  """.stripMargin)

 

spark.sql(

  """

    |create table  hadoop_prod.default.test1 (id int,name string,loc string)

    |using iceberg

    |partitioned by (loc)

  """.stripMargin)

 

spark.sql(

  """

    |insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai")

  """.stripMargin)

 

spark.sql("select * from hadoop_prod.default.test1").show()

Iceberg 表 test1结果如下:

4.png

//注意:指定静态分区"jiangsu",静态分区下,就不要在查询 “loc" 列了,否则重复

spark.sql(

  """

    |insert overwrite hadoop_prod.default.test1

    |partition (loc = "jiangsu")

    |select id,name from  hadoop_prod.default.test3

  """.stripMargin)

 

//查询 test1 表数据

spark.sql(

  """

    |select * from hadoop_prod.default.test1

  """.stripMargin).show()

Iceberg 表 test1结果如下:

5.png

注意:使用insert overwrite 读取test3表数据 静态分区方式覆盖到表 test1,表中其他分区数据不受影响,只会覆盖指定的静态分区数据。

4.    DELETE FROM

Spark3.x版本之后支持"Delete from"可以根据指定的where条件来删除表中数据。如果where条件匹配Iceberg表一个分区的数据,Iceberg仅会修改元数据,如果where条件匹配的表的单个行,则Iceberg会重写受影响行所在的数据文件。具体操作如下:

//创建表 delete_tbl ,并加载数据

spark.sql(

  """

    |create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg

    |""".stripMargin)

 

spark.sql(

  """

    |insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)

  """.stripMargin)

 

//根据条件范围删除表 delete_tbl 中的数据

spark.sql(

  """

    |delete from hadoop_prod.default.delete_tbl where id >3 and id <6

  """.stripMargin)

 

spark.sql("select * from hadoop_prod.default.delete_tbl").show()

Iceberg 表 delete_tbl结果如下:

1.png

//根据条件删除表 delete_tbl 中的一条数据

spark.sql(

  """

    |delete from hadoop_prod.default.delete_tbl where id = 2

  """.stripMargin)

 

spark.sql("select * from hadoop_prod.default.delete_tbl").show()

Iceberg 表 delete_tbl结果如下:

2.png

5.   UPDATE

Spark3.x+版本支持了update更新数据操作,可以根据匹配的条件进行数据更新操作。

操作如下:

//创建表 delete_tbl ,并加载数据

spark.sql(

  """

    |create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg

    |""".stripMargin)

spark.sql(

  """

    |insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23)

  """.stripMargin)

通过“update”更新表中id小于等于3的数据name列改为“zhangsan”,age列改为30,操作如下:

//更新 delete_tbl 表

spark.sql(

  """

    |update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30

    |where id <=3

  """.stripMargin)

spark.sql(

  """

    |select * from hadoop_prod.default.update_tbl

  """.stripMargin).show()

Iceberg 表 update_tbl结果如下:

3.png

6.    DataFrame API 写入Iceberg

Spark向Iceberg中写数据时不仅可以使用SQL方式,也可以使用DataFrame Api方式操作Iceberg,建议使用SQL方式操作。

DataFrame创建Iceberg表分为创建普通表和分区表,创建分区表时需要指定分区列,分区列可以是多个列。创建表的语法如下:

df.write(tbl).create() 相当于 CREATE TABLE AS SELECT ... df.write(tbl).replace() 相当于 REPLACE TABLE AS SELECT ... df.write(tbl).append() 相当于 INSERT INTO ... df.write(tbl).overwritePartitions() 相当于动态 INSERT OVERWRITE ...

具体操作如下:

//1.准备数据,使用DataFrame Api 写入Iceberg表及分区表

val nameJsonList = List[String](

  "{\"id\":1,\"name\":\"zs\",\"age\":18,\"loc\":\"beijing\"}",

  "{\"id\":2,\"name\":\"ls\",\"age\":19,\"loc\":\"shanghai\"}",

  "{\"id\":3,\"name\":\"ww\",\"age\":20,\"loc\":\"beijing\"}",

  "{\"id\":4,\"name\":\"ml\",\"age\":21,\"loc\":\"shanghai\"}")

 

import spark.implicits._

val df: DataFrame = spark.read.json(nameJsonList.toDS)

 

//创建普通表df_tbl1,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列

df.writeTo("hadoop_prod.default.df_tbl1").create()

 

//查询表 hadoop_prod.default.df_tbl1 中的数据,并查看数据存储结构

spark.read.table("hadoop_prod.default.df_tbl1").show()

Iceberg 表 df_tbl1结果如下:

4.png

Iceberg 表 df_tbl1存储如下:

5.png

//创建分区表df_tbl2,并将数据写入到Iceberg表,其中DF中的列就是Iceberg表中的列

df.sortWithinPartitions($"loc")//写入分区表,必须按照分区列进行排序

  .writeTo("hadoop_prod.default.df_tbl2")

  .partitionedBy($"loc")//这里可以指定多个列为联合分区

  .create()

//查询分区表 hadoop_prod.default.df_tbl2 中的数据,并查看数据存储结构

spark.read.table("hadoop_prod.default.df_tbl2").show()

Iceberg 分区表 df_tbl2结果如下:

6.png

Iceberg 分区表 df_tbl2存储如下:

7.png


相关文章

Hbase rowkey设计原则

HBase 中的 rowkey 设计需要遵循以下原则:1 rowkey 唯一原则若在 HBase 中向同一张表插入相同 rowkey 的记录,如没有设置版本数量,则此 rowkey 原先的数据会被覆盖...

MySQL 评估 ALTER TABLE 进度(5.7)

MySQL 评估 ALTER TABLE 进度(5.7)

一、前言问题:大表里执行 ALTER TABLE 的时候,经常会比较忐忑,会面临 “跑又跑不完 Kill 也不敢 Kill” 的窘境。需求:客户在执行 ALTER TABLE 时也会让我们来评估影响的...

MySQL性能优化(三)函数运算导致无法使用索引

MySQL性能优化(三)函数运算导致无法使用索引

有时侯我们会遇到这样的情况:明明字段上已经建立了索引,但是查询还是无法使用索引。其中有一种情况是因为SQL中对索引字段进行了运算。一个例子select * from us...

MySQL运维实战之ProxySQL(9.2)ProxySQL安装和配置

proxysql安装proxysql提供了各个linux发行版的安装包,我们可以使用操作系统的包管理系统来安装proxysql。这里我们以CentOS 7为例:1、从github下载安装包根据OS版本...

高效便捷!解锁阿里云跨账号专线互联的全新实施方案

高效便捷!解锁阿里云跨账号专线互联的全新实施方案

01背    景为持续提升金融云环境的合规标准以及可用区内产品服务的性能和稳定性,阿里云将对杭州地域BCD三个金融云可用区进行基础设施架构升级与改造,对应可用区云产品将于 2024...

ranger审计Solr部署

安装前准备1.1. 创建用户和用户组groupadd solruseradd -g solr solr1.2. 添加环境变量vi /etc/profile export SOLR_HOME=/opt/...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。