Data Lake Technology: Iceberg (12) Flink and Iceberg Integration - SQL API Operations


1. Creating an Iceberg Table and Writing Data with the SQL API

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Use the Catalog
tblEnv.useCatalog("hadoop_iceberg");

//3. Create the database
tblEnv.executeSql("create database iceberg_db");

//4. Use the database
tblEnv.useDatabase("iceberg_db");

//5. Create the Iceberg table flink_iceberg_tbl2
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

//6. Write data into the table flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

Run the following in Hive to create the corresponding Iceberg table:

# Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2  (
  id int,
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

# Query the data in the Iceberg table from Hive
hive> select * from flink_iceberg_tbl2;
OK
3   ww  20  guangzhou
1   zs  18  beijing
2   ls  19  shanghai

2. Batch Querying Iceberg Table Data with the SQL API

Batch-querying Iceberg table data with the Flink SQL API just requires running the query and printing the result. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read the table data in batch mode
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");

tableResult.print();

Running this prints the rows currently in the table to the console.

3. Real-Time (Streaming) Queries of Iceberg Table Data with the SQL API

When querying Iceberg table data in real time with the Flink SQL API, the parameter "table.dynamic-table-options.enabled" must be set to true so that the OPTIONS hint is supported in SQL. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read all data from the table's current snapshot, then keep reading incrementally
// streaming=true enables real-time reads; monitor-interval is the polling interval for new data (default 1s)
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

After starting the code above, the data currently in the Iceberg table is read out first; if you then insert data into the corresponding Iceberg table from Hive, the new rows appear in the console in real time.

# Before inserting data into the Iceberg table from Hive, add the following two jars:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

# Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

The newly inserted rows appear in the console in real time.

4. Real-Time Incremental Reads Starting from a Specified Snapshot with the SQL API

The Flink SQL API also supports continuing to consume data in real time starting from a given snapshot-id. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
      "'type'='iceberg'," +
      "'catalog-type'='hadoop'," +
      "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Continue real-time reads from the specified Iceberg snapshot; the snapshot ID is taken from the table's metadata
//start-snapshot-id: the snapshot ID
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
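The start-snapshot-id used above comes from the table's metadata. As one way to look it up, here is a minimal sketch using Iceberg's core Java API (the class name ShowSnapshots is made up for illustration; the warehouse path and table names follow this article's setup, and the API shown is available in Iceberg 0.12.x):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ShowSnapshots {
    public static void main(String[] args) {
        // Hadoop catalog pointing at the same warehouse the Flink catalog uses
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://mycluster/flink_iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));

        // List every snapshot ID and its commit time; any of them can be used as start-snapshot-id
        for (Snapshot s : table.snapshots()) {
            System.out.println(s.snapshotId() + " @ " + s.timestampMillis());
        }
        // The latest snapshot (null if the table has no data yet)
        if (table.currentSnapshot() != null) {
            System.out.println("current: " + table.currentSnapshot().snapshotId());
        }
    }
}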

5. Reading Kafka Data and Writing It to an Iceberg Table in Real Time with the SQL API

The steps for reading data from Kafka in real time and writing it into an Iceberg table are as follows:

Step 1: Create the corresponding Iceberg table.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);
//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");
//2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

Step 2: Write the code that reads Kafka data and writes it into Iceberg in real time.

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        /**
         * 1. The Catalog and the Iceberg table need to be created beforehand
         */
        //1. Create the Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2. Create the Iceberg table flink_iceberg_tbl3 (already created in Step 1, so commented out here)
//        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        //3. Create the Kafka connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        //4. Set table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint in SQL
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //5. Write data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        //6. Query the table data
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}

Start the code above, then produce the following records to the Kafka topic (one way to do this with the console producer is shown after the records):

1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
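Assuming a standard Kafka installation and that the topic flink-iceberg-topic already exists, the records can be typed one per line into the console producer (depending on the Kafka version the broker option may be --bootstrap-server instead of --broker-list):

kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic flink-iceberg-topic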

The corresponding rows appear in the console in real time, and checking the table's directory on HDFS under the Iceberg warehouse confirms the data was written successfully.
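As a sanity check, the written files can also be listed directly on HDFS; with a Hadoop catalog the table's data files normally sit under <warehouse>/<db>/<table>/data, so for this article's setup the command would be roughly:

hdfs dfs -ls hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl3/data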

