数据湖技术之iceberg(十)Structured Streaming实时写入Iceberg

南墨2年前技术文章721

目前Spark中Structured Streaming只支持实时向Iceberg中写入数据,不支持实时从Iceberg中读取数据,下面案例我们将使用Structured Streaming从Kafka中实时读取数据,然后将结果实时写入到Iceberg中。

一、创建Kafka topic

启动Kafka集群,创建“kafka-iceberg-topic”

[root@node1 bin]# ./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic kafka-iceberg-topic  --partitions 3 --replication-factor 3

二、编写向Kafka生产数据代码

/**

  * 向Kafka中写入数据

  */

object WriteDataToKafka {

  def main(args: Array[String]): Unit = {

    val props = new Properties()

    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

 

    val producer = new KafkaProducer[String,String](props)

    var counter = 0

    var keyFlag = 0

    while(true){

      counter +=1

      keyFlag +=1

      val content: String = userlogs()

      producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))

      //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))

      if(0 == counter%100){

        counter = 0

        Thread.sleep(5000)

      }

    }

    producer.close()

  }

 

  def userlogs()={

    val userLogBuffer = new StringBuffer("")

    val timestamp = new Date().getTime();

    var userID = 0L

    var pageID = 0L

 

    //随机生成的用户ID

    userID = Random.nextInt(2000)

 

    //随机生成的页面ID

    pageID =  Random.nextInt(2000);

 

    //随机生成Channel

    val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")

    val channel = channelNames(Random.nextInt(10))

 

    val actionNames = Array[String]("View", "Register")

    //随机生成action行为

    val action = actionNames(Random.nextInt(2))

 

    val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())

    userLogBuffer.append(dateToday)

      .append("\t")

      .append(timestamp)

      .append("\t")

      .append(userID)

      .append("\t")

      .append(pageID)

      .append("\t")

      .append(channel)

      .append("\t")

      .append(action)

    System.out.println(userLogBuffer.toString())

    userLogBuffer.toString()

  }

}

三、编写Structured Streaming读取Kafka数据实时写入Iceberg

object StructuredStreamingSinkIceberg {

  def main(args: Array[String]): Unit = {

    //1.准备对象

    val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")

      //指定hadoop catalog,catalog名称为hadoop_prod

      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")

      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")

      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")

      .getOrCreate()

//    spark.sparkContext.setLogLevel("Error")

 

    //2.创建Iceberg 表

    spark.sql(

      """

        |create table if not exists hadoop_prod.iceberg_db.iceberg_table (

        | current_day string,

        | user_id string,

        | page_id string,

        | channel string,

        | action string

        |) using iceberg

      """.stripMargin)

 

    val checkpointPath = "hdfs://mycluster/iceberg_table_checkpoint"

    val bootstrapServers = "node1:9092,node2:9092,node3:9092"

    //多个topic 逗号分开

    val topic = "kafka-iceberg-topic"

 

    //3.读取Kafka读取数据

    val df = spark.readStream

      .format("kafka")

      .option("kafka.bootstrap.servers", bootstrapServers)

      .option("auto.offset.reset", "latest")

      .option("group.id", "iceberg-kafka")

      .option("subscribe", topic)

      .load()

 

    import spark.implicits._

    import org.apache.spark.sql.functions._

 

    val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

      .as[(String, String)].toDF("id", "data")

 

    val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0))

      .withColumn("ts", split(col("data"), "\t")(1))

      .withColumn("user_id", split(col("data"), "\t")(2))

      .withColumn("page_id", split(col("data"), "\t")(3))

      .withColumn("channel", split(col("data"), "\t")(4))

      .withColumn("action", split(col("data"), "\t")(5))

      .select("current_day", "user_id", "page_id", "channel", "action")

 

    //结果打印到控制台,Default trigger (runs micro-batch as soon as it can)

//    val query: StreamingQuery = transDF.writeStream

//      .outputMode("append")

//      .format("console")

//      .start()

 

    //4.流式写入Iceberg表

    val query = transDF.writeStream

      .format("iceberg")

      .outputMode("append")

      //每分钟触发一次Trigger.ProcessingTime(1, TimeUnit.MINUTES)

      //每10s 触发一次 Trigger.ProcessingTime(1, TimeUnit.MINUTES)

      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))

      .option("path", "hadoop_prod.iceberg_db.iceberg_table")

      .option("fanout-enabled", "true")

      .option("checkpointLocation", checkpointPath)

      .start()

 

    query.awaitTermination()

 

  }

}

注意:以上代码执行时由于使用的Spark版本为3.1.2,其依赖的Hadoop版本为Hadoop3.2版本,所以需要在本地Window中配置Hadoop3.1.2的环境变量以及将对应的hadoop.dll放入window "C:\Windows\System32"路径下。

Structuerd Streaming向Iceberg实时写入数据有以下几个注意点:

写Iceberg表写出数据支持两种模式:append和complete,append是将每个微批数据行追加到表中。complete是替换每个微批数据内容。

向Iceberg中写出数据时指定的path可以是HDFS路径,可以是Iceberg表名,如果是表名,要预先创建好Iceberg表。

写出参数fanout-enabled指的是如果Iceberg写出的表是分区表,在向表中写数据之前要求Spark每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为true,可以针对每个Spark分区打开一个文件,直到当前task批次数据写完,这个文件再关闭。

实时向Iceberg表中写数据时,建议trigger设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”(参照1.9.6.9)和删除旧的快照(1.9.6.10)。

四、查看Iceberg中数据结果

启动向Kafka生产数据代码,启动向Iceberg中写入数据的Structured Streaming程序,执行以下代码来查看对应的Iceberg结果:

//1.准备对象

val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")

  //指定hadoop catalog,catalog名称为hadoop_prod

  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")

  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")

  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/structuredstreaming")

  .getOrCreate()

 

//2.读取Iceberg 表中的数据结果

spark.sql(

  """

    |select * from hadoop_prod.iceberg_db.iceberg_table

  """.stripMargin).show()


相关文章

Redis Sentinel与Cluster安装部署(二)

3.2cluster部署1、在对应的机器,下载、解压redis #详见sentinel部署内相关命令 2、创建对应的文件目录   mkdir -p /usr/lcoal/redis5/clust...

Kafka 手动调整分区副本存储

Kafka 手动调整分区副本存储

              在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储。测试:创...

Linux解锁线程基本概念和线程控制,步入多线程学习的大门(1)

Linux解锁线程基本概念和线程控制,步入多线程学习的大门(1)

1、线程初识1.1线程的概念线程是进程内部的一个执行分支,线程是CPU调度的基本单位那什么是进程呢?我们之前学习了解到的进程是加载到内存中的程序进程 = 内核数据结构 + 进程代码和数据。今天我们要推...

CPU--上下文切换

CPU--上下文切换

一、概述1、Linux 是一个多任务操作系统,它支持远大于 CPU 数量的任务同时运行。当然,这些任务实际上并不是真的在同时运行,而是因为系统在很短的时间内,将 ...

企业级大数据安全架构(十一)

企业级大数据安全架构(十一)

一、Kerberos接入dophinscheduler建议将dophinscheduler集成到Ambari安装部署,在Ambari上面开启kerberos1.安装准备编译从GitHub获取dolph...

zabbix监控华为存储设备

zabbix监控华为存储设备

确认监控方式开始监控之前首先思考确认好要监控的方式。提出疑问:zabbix 监控华为存储设备推荐使用snmptrap还是snmptt呢?回答:在 Zabbix 监控华为存储设备时,您可以选择使用 SN...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。