Data Lake Technology: Iceberg (12) Flink and Iceberg Integration - SQL API Operations


1. Creating an Iceberg Table and Writing Data with the SQL API

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

// Iceberg commits data when checkpoints complete, so checkpointing must be enabled
env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Use the Catalog
tblEnv.useCatalog("hadoop_iceberg");

//3. Create the database
tblEnv.executeSql("create database iceberg_db");

//4. Use the database
tblEnv.useDatabase("iceberg_db");

//5. Create the Iceberg table flink_iceberg_tbl2
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

//6. Write data into flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

To create a corresponding Iceberg table in Hive, run the following:

#Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2 (
  id int,
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
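If Hive cannot resolve the storage handler class, register the Iceberg Hive runtime jars first (the same jars this article adds later before inserting from Hive):

add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;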

#Query the Iceberg table from Hive
hive> select * from flink_iceberg_tbl2;
OK
3   ww  20  guangzhou
1   zs  18  beijing
2   ls  19  shanghai

2. Batch-Querying Iceberg Table Data with the SQL API

To batch-query Iceberg table data with the Flink SQL API, simply execute the query and print the result. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read the table data as a batch
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2");

tableResult.print();
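If the rows are needed programmatically rather than printed, TableResult also exposes an iterator. A minimal sketch, assuming Flink 1.11+ (imports: org.apache.flink.types.Row, org.apache.flink.util.CloseableIterator):

// collect() returns a CloseableIterator over the result rows; close it when done
try (CloseableIterator<Row> it = tableResult.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        System.out.println("id=" + row.getField(0) + ", name=" + row.getField(1));
    }
}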

The query result is printed to the console (screenshot 1.png in the original post).

3. Real-Time Querying of Iceberg Table Data with the SQL API

When querying an Iceberg table in real time with the Flink SQL API, the parameter "table.dynamic-table-options.enabled" must be set to true so that the "OPTIONS" hint syntax is allowed in SQL. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint syntax in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read all data from the table's current snapshot, then keep reading incrementally
// 'streaming'='true' enables continuous reads; 'monitor-interval' controls how often the source checks for newly committed data (1s here)
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

After starting the code above, the data already in the Iceberg table is read out first; if you then insert rows into the corresponding Iceberg table from Hive, they appear on the console in real time.

#Before inserting into the Iceberg table from Hive, add the following two jars:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

#Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

The newly inserted rows show up on the console in real time (screenshot 2.png in the original post).

4. Incremental Real-Time Reads from a Specified Snapshot with the SQL API

The Flink SQL API also supports resuming real-time reads from a given snapshot-id. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint syntax in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
      "'type'='iceberg'," +
      "'catalog-type'='hadoop'," +
      "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Keep reading in real time starting from the specified snapshot; the snapshot ID comes from the table's metadata
//start-snapshot-id: the snapshot ID to start from
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
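The snapshot ID can be found in the table's metadata. One way to list the available snapshots is to read the table directly with the Iceberg API; a minimal sketch, assuming the Iceberg 0.12.x core classes are on the classpath (the ListSnapshots class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ListSnapshots {
    public static void main(String[] args) {
        // Open the same Hadoop catalog warehouse used in the examples above
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://mycluster/flink_iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));

        // Print every snapshot ID together with its commit timestamp
        for (Snapshot s : table.snapshots()) {
            System.out.println(s.snapshotId() + " committed at " + s.timestampMillis());
        }
    }
}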

5. Reading Kafka Data into an Iceberg Table in Real Time with the SQL API

To read data from Kafka in real time and write it into an Iceberg table, follow these steps:

Step 1: Create the target Iceberg table

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

Step 2: Write the code that reads Kafka data and writes it to Iceberg in real time

public class ReadKafkaToIceberg {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        /**
         * 1. The Catalog and the Iceberg table must be created beforehand
         */
        //1. Create the Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2. Create the Iceberg table flink_iceberg_tbl3 (already done in step 1, kept here for reference)
//        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        //3. Create the Kafka connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        //4. Enable table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint syntax in SQL
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //5. Write data into flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        //6. Query the table data in real time
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}
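If the topic does not exist yet, it can be created first; a sketch (flags vary slightly across Kafka versions, and the partition/replication counts are illustrative):

kafka-topics.sh --create --bootstrap-server node1:9092 --topic flink-iceberg-topic --partitions 3 --replication-factor 3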

Start the code above, then produce the following records to the Kafka topic:

1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
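One way to produce these records is Kafka's console producer (broker addresses taken from the connector definition above; newer Kafka versions use --bootstrap-server instead of --broker-list):

kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic flink-iceberg-topic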

The corresponding records then appear on the console in real time, and checking the table's Iceberg directory on HDFS confirms that the data was written successfully.
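To verify on HDFS, list the table's directory (the path follows the warehouse/database/table layout used throughout this article):

hdfs dfs -ls -R hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl3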

