Data Lake Technology with Iceberg (Part 12): Flink and Iceberg Integration - SQL API Operations


1. SQL API: Create an Iceberg Table and Write Data

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Use this Catalog
tblEnv.useCatalog("hadoop_iceberg");

//3. Create the database
tblEnv.executeSql("create database iceberg_db");

//4. Use the database
tblEnv.useDatabase("iceberg_db");

//5. Create the Iceberg table flink_iceberg_tbl2
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl2(id int,name string,age int,loc string) partitioned by (loc)");

//6. Write data into the table flink_iceberg_tbl2
tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 values (1,'zs',18,'beijing'),(2,'ls',19,'shanghai'),(3,'ww',20,'guangzhou')");

Run the following statement in Hive to create the corresponding Iceberg table:

# Create the Iceberg table in Hive
CREATE TABLE flink_iceberg_tbl2  (
  id int,
  name string,
  age int,
  loc string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://mycluster/flink_iceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

# Query the Iceberg table data in Hive
hive> select * from flink_iceberg_tbl2;
OK
3   ww  20  guangzhou
1   zs  18  beijing
2   ls  19  shanghai

2. SQL API: Batch Query of Iceberg Table Data

With the Flink SQL API, a batch query against an Iceberg table simply executes the query and displays the result. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
    "'type'='iceberg'," +
    "'catalog-type'='hadoop'," +
    "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read the table data in batch mode
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2");

tableResult.print();

The results are as follows:

[Figure 1: console output of the batch query]
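As a side note, the rows can also be consumed programmatically instead of printed. Below is a minimal sketch, assuming Flink 1.11+ where TableResult#collect returns a CloseableIterator<Row> (requires org.apache.flink.types.Row and org.apache.flink.util.CloseableIterator):

// Iterate over the batch query result instead of printing it.
// Closing the iterator releases the underlying job resources.
try (CloseableIterator<Row> it = tableResult.collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        System.out.println("id=" + row.getField(0) + ", name=" + row.getField(1)
                + ", age=" + row.getField(2) + ", loc=" + row.getField(3));
    }
}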

3. SQL API: Real-Time Query of Iceberg Table Data

When querying Iceberg table data in real time with the Flink SQL API, the parameter "table.dynamic-table-options.enabled" must be set to true so that the "OPTIONS" hint is supported in SQL. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL syntax
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Read all data from the current snapshot of the Iceberg table, then keep reading incrementally
// 'streaming'='true' enables real-time reads; 'monitor-interval' is the polling interval for new data, default 1s
TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

tableResult.print();

After starting the code above, the data currently in the Iceberg table is read out and displayed. If you then insert data into the corresponding Iceberg table from Hive, the new rows appear in the console in real time.

# Before inserting data into the Iceberg table from Hive, add the following two jars:
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

# Insert two rows into the Iceberg table from Hive
hive> insert into flink_iceberg_tbl2 values (4,'ml',30,'shenzhen'),(5,'tq',31,'beijing');

The newly inserted rows can be seen arriving in the console in real time.

[Figure 2: new rows appearing in the console]

4. SQL API: Real-Time Incremental Reads from a Specified Snapshot

The Flink SQL API also supports continuing to read data in real time starting from a given snapshot-id. The code is as follows:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

Configuration configuration = tblEnv.getConfig().getConfiguration();
// Enable the OPTIONS hint in SQL syntax
configuration.setBoolean("table.dynamic-table-options.enabled", true);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
      "'type'='iceberg'," +
      "'catalog-type'='hadoop'," +
      "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Continue reading data in real time from the specified Iceberg snapshot; the snapshot ID is taken from the table metadata
// start-snapshot-id: the snapshot ID to start from
TableResult tableResult2 = tblEnv.executeSql("SELECT * FROM hadoop_iceberg.iceberg_db.flink_iceberg_tbl2 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/");
tableResult2.print();
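The snapshot ID used for 'start-snapshot-id' comes from the table metadata, for example from the metadata/*.json files under the table directory on HDFS. It can also be listed with the Iceberg Java API; the following is a minimal sketch, assuming the Iceberg core library is on the classpath (the warehouse path and table name match the catalog created above):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ListSnapshots {
    public static void main(String[] args) {
        // Load the table from the same Hadoop warehouse used by the Flink catalog
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://mycluster/flink_iceberg");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "flink_iceberg_tbl2"));
        // Print every snapshot ID with its commit timestamp; pick one as 'start-snapshot-id'
        for (Snapshot snapshot : table.snapshots()) {
            System.out.println(snapshot.snapshotId() + " -> " + snapshot.timestampMillis());
        }
    }
}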

5. SQL API: Read Kafka Data and Write It to an Iceberg Table in Real Time

To read data from Kafka in real time and write it into an Iceberg table, the steps are as follows:

Step 1: First, create the corresponding Iceberg table

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
env.enableCheckpointing(1000);

//1. Create the Catalog
tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
        "'type'='iceberg'," +
        "'catalog-type'='hadoop'," +
        "'warehouse'='hdfs://mycluster/flink_iceberg')");

//2. Create the Iceberg table flink_iceberg_tbl3
tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

Step 2: Write the code that reads Kafka data and writes it into Iceberg in real time

public class ReadKafkaToIceberg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);
        env.enableCheckpointing(1000);

        /**
         * 1. The Catalog and the Iceberg table need to be created in advance
         */
        //1. Create the Catalog
        tblEnv.executeSql("CREATE CATALOG hadoop_iceberg WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://mycluster/flink_iceberg')");

        //2. Create the Iceberg table flink_iceberg_tbl3 (already created in Step 1, so it is commented out here)
//        tblEnv.executeSql("create table hadoop_iceberg.iceberg_db.flink_iceberg_tbl3(id int,name string,age int,loc string) partitioned by (loc)");

        //3. Create the Kafka Connector table to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int," +
                " loc varchar" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'flink-iceberg-topic'," +
                " 'properties.bootstrap.servers'='node1:9092,node2:9092,node3:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'my-group-id'," +
                " 'format' = 'csv'" +
                ")");

        //4. Set table.dynamic-table-options.enabled
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        // Enable the OPTIONS hint in SQL syntax
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        //5. Write data into the table flink_iceberg_tbl3
        tblEnv.executeSql("insert into hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 select id,name,age,loc from kafka_input_table");

        //6. Query the table data
        TableResult tableResult = tblEnv.executeSql("select * from hadoop_iceberg.iceberg_db.flink_iceberg_tbl3 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");
        tableResult.print();
    }
}

Start the code above and produce the following records into the Kafka topic (a producer sketch follows the sample data below):

1,zs,18,beijing
2,ls,19,shanghai
3,ww,20,beijing
4,ml,21,shanghai
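Any Kafka client (for example the console producer) can be used for this. For illustration only, here is a minimal sketch using the Kafka Java producer; the topic and broker addresses come from the kafka_input_table definition above, and everything else is an assumption:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProduceTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Send the CSV rows above to the topic consumed by kafka_input_table
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String[] rows = {"1,zs,18,beijing", "2,ls,19,shanghai", "3,ww,20,beijing", "4,ml,21,shanghai"};
            for (String row : rows) {
                producer.send(new ProducerRecord<>("flink-iceberg-topic", row));
            }
            producer.flush();
        }
    }
}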

The corresponding data appears on the console in real time, and checking the table's Iceberg directory on HDFS confirms that the data has been written successfully.

