Data Lake Technology: Iceberg (Part 12) Flink and Iceberg Integration - SQL API Operations


1. Creating an Iceberg Table and Writing Data with the SQL API

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

// Iceberg commits the written data files when a Flink checkpoint completes, so checkpointing is enabled
env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Use this Catalog
tblEnv.useCatalog("hadoop_iceberg");

//3. Create the database
tblEnv.executeSql("create database iceberg_db");

//4. Use the database
tblEnv.useDatabase("iceberg_db");

//5. Create the Iceberg table flink_iceberg_tbl2
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

//6. Write data into the table flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

Run the following statements in Hive to create a corresponding Iceberg table:

# Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2  (
  id int,
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

# Query the Iceberg table data in Hive
hive> select * from flink_iceberg_tbl2;
OK
3   ww  20  guangzhou
1   zs  18  beijing
2   ls  19  shanghai

2. Batch Querying Iceberg Table Data with the SQL API

With the Flink SQL API, batch querying Iceberg table data is a plain SELECT whose result is printed directly. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read the table data in batch mode
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 ");

tableResult.print();

The rows currently in the table are printed to the console.

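If the rows need to be consumed programmatically rather than just printed, the TableResult can also be iterated. A minimal sketch (the loop body is illustrative only):

// Iterate over the batch query result instead of printing it
// requires: import org.apache.flink.types.Row; import org.apache.flink.util.CloseableIterator;
try (CloseableIterator<Row> it = tableResult.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        System.out.println(row);   // handle each row as needed
    }
}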

3. Streaming Queries of Iceberg Table Data with the SQL API

To query Iceberg table data in real time with the Flink SQL API, the parameter "table.dynamic-table-options.enabled" must be set to true so that the OPTIONS hint in the SQL syntax is supported. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL syntax
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read all data from the table's current snapshot, then keep reading new data incrementally
// streaming=true enables real-time reads; monitor-interval is the polling interval for new data, default 1s
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

After starting the code above, the rows already in the Iceberg table are read out first. If you then insert data into the corresponding Iceberg table from Hive, the new rows show up on the console in real time.

# Before inserting into the Iceberg table from Hive, the following two jars must be added:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

# Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

The newly inserted rows appear on the console in real time.

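Note that a streaming query like this runs until it is cancelled; tableResult.print() blocks while the job keeps polling for new snapshots. A minimal sketch of cancelling it programmatically (for example from another thread or a shutdown hook), assuming the query was submitted from the same program:

// Cancel the running streaming query via its JobClient, if one is attached to the result
tableResult.getJobClient().ifPresent(jobClient -> jobClient.cancel());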

4. Incremental Real-Time Reads from a Specified Snapshot with the SQL API

The Flink SQL API also supports continuing to read data in real time starting from a specific snapshot-id. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL syntax
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Keep reading data in real time starting from the specified Iceberg snapshot; the snapshot ID is taken from the table metadata
//start-snapshot-id: the snapshot ID to start from
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
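The snapshot IDs themselves can be read from the table's metadata JSON files on HDFS, or listed with the Iceberg Java API. A minimal sketch using HadoopCatalog (assuming the iceberg-core and Hadoop client dependencies are on the classpath; fully qualified names are used to avoid clashing with Flink's Configuration class):

// List the table's snapshot IDs so one can be passed as start-snapshot-id
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
org.apache.iceberg.hadoop.HadoopCatalog catalog =
        new org.apache.iceberg.hadoop.HadoopCatalog(hadoopConf, "hdfs://mycluster/flink_iceberg");
org.apache.iceberg.Table table =
        catalog.loadTable(org.apache.iceberg.catalog.TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));
for (org.apache.iceberg.Snapshot snapshot : table.snapshots()) {
    System.out.println(snapshot.snapshotId() + " committed at " + snapshot.timestampMillis());
}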

5. Reading Kafka Data with the SQL API and Writing It to an Iceberg Table in Real Time

The steps for reading data from Kafka in real time and writing it into an Iceberg table are as follows:

Step 1: Create the corresponding Iceberg table

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

Step 2: Write the code that reads Kafka data and writes it into Iceberg in real time

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        /**
         * 1. The Catalog and the Iceberg table must be created beforehand
         */
        //1. Create the Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2. Create the Iceberg table flink_iceberg_tbl3 (already created in Step 1 above, so commented out here)
//        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        //3. Create the Kafka connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        //4. Enable table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint in SQL syntax
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //5. Write data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        //6. Query the table data
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}

Start the code above, then produce the following records to the Kafka topic flink-iceberg-topic (a minimal producer sketch follows the sample records):

1,zs,18,beijing

2,ls,19,shanghai

3,ww,20,beijing

4,ml,21,shanghai
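These lines can be typed into kafka-console-producer, or sent with the Kafka Java client. A minimal sketch of the latter; the topic and broker addresses are taken from the connector definition above, everything else is illustrative:

// Produce the sample CSV records to the flink-iceberg-topic topic
// requires: the kafka-clients dependency (KafkaProducer, ProducerRecord) and java.util.Properties
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    String[] lines = {"1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai"};
    for (String line : lines) {
        producer.send(new ProducerRecord<>("flink-iceberg-topic", line));
    }
    producer.flush();
}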

We can see the corresponding data printed on the console in real time, and checking the corresponding Iceberg HDFS directory confirms that the data was written successfully.

