数据湖技术之iceberg(七)Spark管理iceberg表

南墨2年前技术文章1353

1.SparkSQL设置catalog配置

以下操作主要是SparkSQL操作Iceberg,同样Spark中支持两种Catalog的设置:hive和hadoop,Hive Catalog就是iceberg表存储使用Hive默认的数据路径,Hadoop Catalog需要指定Iceberg格式表存储路径。

在SparkSQL代码中通过以下方式来指定使用的Catalog:

val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")

  //指定hive catalog, catalog名称为hive_prod

  .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")

  .config("spark.sql.catalog.hive_prod.type", "hive")

  .config("spark.sql.catalog.hive_prod.uri", "thrift://node1:9083")

  .config("iceberg.engine.hive.enabled", "true")

 

  //指定hadoop catalog,catalog名称为hadoop_prod

  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")

  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")

  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")

  .getOrCreate()

2.使用Hive Catalog管理Iceberg表

使用Hive Catalog管理Iceberg表默认数据存储在Hive对应的Warehouse目录下,在Hive中会自动创建对应的Iceberg表,SparkSQL 相当于是Hive客户端,需要额外设置“iceberg.engine.hive.enabled”属性为true,否则在Hive对应的Iceberg格式表中查询不到数据。

1、创建表

//创建表 ,hive_pord:指定catalog名称。default:指定Hive中存在的库。test:创建的iceberg表名。

spark.sql(

      """

        | create table if not exists hive_prod.default.test(id int,name string,age int) using iceberg

      """.stripMargin)

注意:

1)创建表时,表名称为:${catalog名称}.${Hive中库名}.${创建的Iceberg格式表名}

2)表创建之后,可以在Hive中查询到对应的test表,创建的是Hive外表,在对应的Hive warehouse 目录下可以看到对应的数据目录。

1.png

2、插入数据

//插入数据

spark.sql(

  """

    |insert into hive_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)

  """.stripMargin)

3、查询数据

//查询数据

spark.sql(

  """

    |select * from hive_prod.default.test

  """.stripMargin).show()

结果如下:

2.png

在Hive对应的test表中也能查询到数据:

3.png

4、删除表

//删除表,删除表对应的数据不会被删除

spark.sql(

  """

    |drop table hive_prod.default.test

  """.stripMargin)

注意:删除表后,数据会被删除,但是表目录还是存在,如果彻底删除数据,需要把对应的表目录删除。

3.用Hadoop Catalog管理Iceberg表

使用Hadoop Catalog管理表,需要指定对应Iceberg存储数据的目录。

1、创建表

//创建表 ,hadoop_prod:指定Hadoop catalog名称。default:指定库名称。test:创建的iceberg表名。

spark.sql(

  """

    | create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg

  """.stripMargin)

注意:

1)创建表名称为:${Hadoop Catalog名称}.${随意定义的库名}.${Iceberg格式表名}

2)创建表后,会在hadoop_prod名称对应的目录下创建该表

4.png

2、插入数据

//插入数据

spark.sql(

  """

    |insert into hadoop_prod.default.test values (1,"zs",18),(2,"ls",19),(3,"ww",20)

  """.stripMargin)

3、查询数据

spark.sql(

  """

    |select * from hadoop_prod.default.test

  """.stripMargin).show()

5.png

4、创建对应的Hive表映射数据

在Hive表中执行如下建表语句:

CREATE TABLE hdfs_iceberg  (

  id int,

  name string,

  age int

)

STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'

LOCATION 'hdfs://mycluster/sparkoperateiceberg/default/test'

TBLPROPERTIES ('iceberg.catalog'='location_based_table');

在Hive中查询“hdfs_iceberg”表数据如下:

6.png

5、删除表

spark.sql(

  """

    |drop table hadoop_prod.default.test

  """.stripMargin)

注意:删除iceberg表后,数据被删除,对应的库目录存在。


相关文章

Python 并发编程 Futures

Python 并发编程 Futures

说明编程中如果能合理利用编程语言的并发编程技巧,都可以极大提升程序的性能。在 Python 3.2 版本为用户提供了一个标准库 concurrent.futures 可以实现进程池 和 线程池,本篇文...

变更 Rancher Server IP 或域名

变更 Rancher Server IP 或域名

一.背景由于各种原因导致的需要对rancher的Server IP或者域名进行变更(更改访问地址,更改公网IP地址等)二.流程图三.操作前了解相关配置和要求相关官方文档rancher:日常使用的ran...

xx客户大数据相关问题答疑

xx客户大数据相关问题答疑

1、官方发布的补丁是否可以在CDH5.X上patch?如果是cdh的包,需要在cdh官方给出相关补丁包,然后我们可以进行补丁操作。如果是开源的包,是无法进行补丁操作的,因为cdh会对开源有些细节点的适...

Flume使用案例之实时读取目录到HDFS

目标:使用flume监听整个目录的文件分步实现:1. 创建配置文件flume-dir.conf#1 Agenta3.sources = r3a3.sinks = k3a3.channels = c3 ...

Ranger中Solr审计日志配置修改

Ranger中Solr审计日志配置修改

1、获取solr 中的rangeraudits的配置#查看其中的配置及 solrctl instancedir --list#获取配置 solrctl instancedir --get rang...

MySQL运维实战之备份和恢复(8.1)xtrabackup全量备份

xtrabackup是percona开源的mysql物理备份工具。xtrabackup 8.0支持mysql 8.0版本的备份和恢复。xtrabackup 2.4支持mysql 5.7及以下版本的备份...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。