数据湖技术之iceberg(十一)Flink与Iceberg整合-DataStream API

南墨2年前技术文章1043

1.实时写入Iceberg

DataStream Api方式操作Iceberg方式目前仅支持Java Api。使用DataStream API 实时写入Iceberg表具体操作如下:

2、编写代码使用DataStream API将Kafka数据写入到Iceberg表

import com.google.common.collect.ImmutableMap;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.connector.kafka.source.KafkaSource;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.data.GenericRowData;

import org.apache.flink.table.data.RowData;

import org.apache.hadoop.conf.Configuration;

import org.apache.iceberg.*;

import org.apache.iceberg.catalog.Catalog;

import org.apache.iceberg.catalog.TableIdentifier;

import org.apache.iceberg.flink.TableLoader;

import org.apache.flink.table.data.StringData;

import org.apache.iceberg.flink.sink.FlinkSink;

import org.apache.iceberg.hadoop.HadoopCatalog;

import org.apache.iceberg.types.Types;

import java.util.Map;

 

/**

 * 使用DataStream Api 向Iceberg 表写入数据

 */

public class StreamAPIWriteIceberg {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //1.必须设置checkpoint ,Flink向Iceberg中写入数据时当checkpoint发生后,才会commit数据。

        env.enableCheckpointing(5000);

 

        //2.读取Kafka 中的topic 数据

        KafkaSource<String> source = KafkaSource.<String>builder()

                .setBootstrapServers("node1:9092,node2:9092,node3:9092")

                .setTopics("flink-iceberg-topic")

                .setGroupId("my-group-id")

                .setStartingOffsets(OffsetsInitializer.latest())

                .setValueOnlyDeserializer(new SimpleStringSchema())

                .build();

        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

 

        //3.对数据进行处理,包装成RowData 对象,方便保存到Iceberg表中。

        SingleOutputStreamOperator<RowData> dataStream = kafkaSource.map(new MapFunction<String, RowData>() {

            @Override

            public RowData map(String s) throws Exception {

                System.out.println("s = "+s);

                String[] split = s.split(",");

                GenericRowData row = new GenericRowData(4);

                row.setField(0, Integer.valueOf(split[0]));

                row.setField(1, StringData.fromString(split[1]));

                row.setField(2, Integer.valueOf(split[2]));

                row.setField(3, StringData.fromString(split[3]));

                return row;

            }

        });

 

        //4.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表

        Configuration hadoopConf = new Configuration();

        Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");

 

        //配置iceberg 库名和表名

        TableIdentifier name =

                TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

 

        //创建Icebeng表Schema

        Schema schema = new Schema(

                Types.NestedField.required(1, "id", Types.IntegerType.get()),

                Types.NestedField.required(2, "nane", Types.StringType.get()),

                Types.NestedField.required(3, "age", Types.IntegerType.get()),

                Types.NestedField.required(4, "loc", Types.StringType.get()));

 

        //如果有分区指定对应分区,这里“loc”列为分区列,可以指定unpartitioned 方法不设置表分区

//        PartitionSpec spec = PartitionSpec.unpartitioned();

        PartitionSpec spec = PartitionSpec.builderFor(schema).identity("loc").build();

 

        //指定Iceberg表数据格式化为Parquet存储

        Map<String, String> props =

                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name());

        Table table = null;

 

        // 通过catalog判断表是否存在,不存在就创建,存在就加载

        if (!catalog.tableExists(name)) {

            table = catalog.createTable(name, schema, spec, props);

        }else {

            table = catalog.loadTable(name);

        }

 

        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

 

        //5.通过DataStream Api 向Iceberg中写入数据

        FlinkSink.forRowData(dataStream)

                //这个 .table 也可以不写,指定tableLoader 对应的路径就可以。

                .table(table)

                .tableLoader(tableLoader)

                //默认为false,追加数据。如果设置为true 就是覆盖数据

                .overwrite(false)

                .build();

 

        env.execute("DataStream Api Write Data To Iceberg");

    }

}

以上代码有如下几个注意点:

l   需要设置Checkpoint,Flink向Iceberg中写入Commit数据时,只有Checkpoint成功之后才会Commit数据,否则后期在Hive中查询不到数据。

l   读取Kafka数据后需要包装成RowData或者Row对象,才能向Iceberg表中写出数据。写出数据时默认是追加数据,如果指定overwrite就是全部覆盖数据。

l   在向Iceberg表中写数据之前需要创建对应的Catalog、表Schema,否则写出时只指定对应的路径会报错找不到对应的Iceberg表。

l   不建议使用DataStream API 向Iceberg中写数据,建议使用SQL API。

3、在Kafka 中创建代码中指定的“flink-iceberg-topic”并启动代码生产数据

# 在Kafka 中创建 flink-iceberg-topic topic

[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic flink-iceberg-topic  --partitions 3 --replication-factor 3

创建好以上topic之后,启动代码,然后向topic中生产以下数据:

[root@node1 bin]#./kafka-console-producer.sh  --topic flink-iceberg-topic --broker-list node1:9092,node2:9092,node3:9092

1,zs,18,beijing

2,ls,19,shanghai

3,ww,20,beijing

4,ml,21,shanghai

可以看到在HDFS 对应的路径中保存了对应的数据:

1.png

4、通过Hive查看保存到Iceberg中的数据

启动Hive、Hive Metastore 在Hive中创建映射Iceberg的外表:

CREATE TABLE flink_iceberg_tbl  (

  id int,

  name string,

  age int,

  loc string

)

STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'

LOCATION 'hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl'

TBLPROPERTIES ('iceberg.catalog'='location_based_table');

注意:虽然loc是分区列,创建时忽略分区列就可以,此外映射表的路径要保持与保存Iceberg数据路径一致。

通过Hive查询对应的Iceberg表中的数据,结果如下:

hive> select * from flink_iceberg_tbl;

OK

2   ls  19  shanghai

3   ww  20  beijing

1   zs  18  beijing

4   ml  21  shanghai

2.批量/实时读取Iceberg

DataStream API 读取Iceberg表又分为批量读取和实时读取。通过方法“streaming(true/false)”来控制。

1、批量/全量读取

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.data.RowData;

import org.apache.hadoop.conf.Configuration;

import org.apache.iceberg.flink.TableLoader;

import org.apache.iceberg.flink.source.FlinkSource;

 

/**

 * 使用DataStream Api 批量/实时 读取Iceberg 数据

 */

public class StreamAPIReadIceberg {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

        //1.配置TableLoader

        Configuration hadoopConf = new Configuration();

        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://mycluster/flink_iceberg/icebergdb/flink_iceberg_tbl", hadoopConf);

 

        //2.从Iceberg中读取全量/增量读取数据

        DataStream<RowData> batchData = FlinkSource.forRowData().env(env)

                .tableLoader(tableLoader)

                //默认为false,整批次读取,设置为true 为流式读取

                .streaming(false)

                .build();

 

        batchData.map(new MapFunction<RowData, String>() {

            @Override

            public String map(RowData rowData) throws Exception {

                int id = rowData.getInt(0);

                String name = rowData.getString(1).toString();

                int age = rowData.getInt(2);

                String loc = rowData.getString(3).toString();

                return id+","+name+","+age+","+loc;

            }

        }).print();

 

        env.execute("DataStream Api Read Data From Iceberg");

 

    }

}

结果如下:

2.png

2、实时读取

//当配置 streaming参数为true时就是实时读取

DataStream<RowData> batchData = FlinkSource.forRowData().env(env)

        .tableLoader(tableLoader)

        //默认为false,整批次读取,设置为true 为流式读取

        .streaming(true)

        .build();

修改以上代码并启动,向Hive 对应的Iceberg表“flink_iceberg_tbl”中插入2条数据:

在向Hive的Iceberg表中插入数据之前需要加入以下两个包:

add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;

add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

向Hive 中Iceberg 表插入两条数据

hive> insert into flink_iceberg_tbl values (5,'s1',30,'guangzhou'),(6,'s2',31,'tianjin');

插入完成之后,可以看到Flink 控制台实时读取到对应数据

3.png

3.指定基于快照实时增量读取数据

以上案例我们发现Flink将表中所有数据都读取出来,我们也可以指定对应的snapshot-id 决定基于哪些数据增量读取数据。

DataStream<RowData> batchData = FlinkSource.forRowData().env(env)

        .tableLoader(tableLoader)

        //基于某个快照实时增量读取数据,快照需要从元数据中获取

        .startSnapshotId(4226332606322964975L)

        //默认为false,整批次读取,设置为true 为流式读取

        .streaming(true)

        .build();

结果只读取到指定快照往后的数据,如下:

4.png

4. 合并data files

Iceberg提供Api将小文件合并成大文件,可以通过Flink 批任务来执行。Flink中合并小文件与Spark中小文件合并完全一样。

代码如下:

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.hadoop.conf.Configuration;

import org.apache.iceberg.Table;

import org.apache.iceberg.actions.RewriteDataFilesActionResult;

import org.apache.iceberg.catalog.Catalog;

import org.apache.iceberg.catalog.TableIdentifier;

import org.apache.iceberg.flink.TableLoader;

import org.apache.iceberg.flink.actions.Actions;

import org.apache.iceberg.hadoop.HadoopCatalog;

 

/**

 *  可以通过提交Flink批量任务来合并Data Files 文件。

 */

public class RewrietDataFiles {

    public static void main(String[] args) {

 

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

 

        //1.配置TableLoader

        Configuration hadoopConf = new Configuration();

 

        //2.创建Hadoop配置、Catalog配置和表的Schema,方便后续向路径写数据时可以找到对应的表

        Catalog catalog = new HadoopCatalog(hadoopConf,"hdfs://mycluster/flink_iceberg/");

 

        //3.配置iceberg 库名和表名并加载表

        TableIdentifier name =

                TableIdentifier.of("icebergdb", "flink_iceberg_tbl");

        Table table = catalog.loadTable(name);

 

        //4..合并 data files 小文件

        RewriteDataFilesActionResult result = Actions.forTable(table)

                .rewriteDataFiles()

                //默认 512M ,可以手动通过以下指定合并文件大小,与Spark中一样。

                .targetSizeInBytes(536870912L)

                .execute();

    }

}


相关文章

docker日志管理

docker日志管理

docker的日志分类 一、Docker 引擎日志(也就是 dockerd 运行时的日志)Ubuntu14.04: /var/log/upstart/docker.logCentos 6/7或ubun...

RDS通过DMS管理登录处理

RDS通过DMS管理登录处理

问题描述无法通过DMS管理登录进入数据库,报错如下:问题处理方式一在RDS控制台新建账号 账号管理--创建账号将此数据库添加进DMS在DMS控制台--数据库实例--新增实例将新建的数据库账号信息进行录...

python-序列化和反序列化

1、为什么要序列化内存中的字典、列表、集合以及各种对象,如何保存到一个文件中?如果是自己定义的类的实例,如何保存到一个文件中?如何从文件中读取数据,并让它们在内存中再次恢复成自己对应的类的实例?要设计...

Spark接入Kerberos交互式命令窗口提交任务

Spark接入Kerberos交互式命令窗口提交任务

1. Spark-shell首先需要使用有操作hdfs文件权限的keytab用户认证,认证上之后可以通过spark-shell交互命令行窗口执行任务如果集成了Ranger组件,如果创建的普通用户没有在...

Redis 内存使用情况查看

内存情况查看1、MEMORY STATS--连接 redisredis-cli--执行 MEMORY STATS命令查询内存使用详情 MEMORY STATSRedis实例的内存开销主要由两部分组成:...

Hive 重新编译-解决Tez JobName的问题

Hive 重新编译-解决Tez JobName的问题

本文采用linux编译首先下载源码https://dlcdn.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-src.tar.gz源码位置ql/src/jav...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。