Hudi集成Spark

浩客11个月前技术文章463

环境准备


安装Spark

1)Hudi支持的Spark版本

Hudi

Supported Spark 3 version

0.12.x

3.3.x3.2.x3.1.x

0.11.x

3.2.xdefault build, Spark bundle only),3.1.x

0.10.x

3.1.x(default build), 3.0.x

0.7.0-0.9.0

3.0.x

0.6.0 and prior

Not supported

2)下载Spark并安装配置好
# 拷贝编译好的包到spark的jars目录

cp /opt/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/spark-3.2.2/jars

# 不自己编译,去maven里面下载对应版本的jar包放到spark的jars目录下也可以
https://search.maven.org/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.1/jar


Spark SQL方式


创建表

1)启动spark-sql

spark-sql \ 
  --master yarn --deploy-mode client \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

WPS图片(1).png

2)创建分区表

# 创建一个cow分区外部表,指定primaryKey和preCombineField

create table spark_hudi (
    id int, name string,price double, ts bigint
) using hudi 
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts');

WPS图片2.png



3)向分区表插入数据

# 默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

insert into spark_hudi select 1, 'a1', 20, 1000;

WPS图片3.png

4)时间旅行查询

# 修改id为1的数据

insert into spark_hudi select 1, 'a1_1', 20,1000;

# 再基于第一次提交时间进行时间旅行查询

select * from spark_hudi timestamp as of '20231126202835692' where id = 1;

# 再次查询会发现查询结果为第一次插入的数据

WPS图片4.png

5update

# 更新操作需要指定preCombineField

update spark_hudi set price = price * 2, ts = 1111 where id = 1;

WPS图片7.png

6执行mergeinto

# 准备source表并插入数据

create table merge_source (
   id int, name string, price double, ts bigint
) using hudi 
tblproperties (primaryKey = 'id', preCombineField = 'ts'); 

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);  

merge into spark_hudi 
  as target using merge_source as source 
  on target.id = source.id 
  when matched then update 
  set * when not matched then insert *;

WPS图片8.png

7)执行delete

delete from spark_hudi where id = 1;

WPS图片9.png

8)执行bulk_insert

set hoodie.sql.bulk.insert.enable=true; 
set hoodie.sql.insert.mode=non-strict; 
insert into spark_hudi select 2, 'a1_2', 20, 1002;

WPS图片11.png

相关文章

Doris FE节点故障恢复

Doris FE节点故障恢复

FE故障恢复现象:FE由于元数据损坏导致无法启动            &nbs...

Pod终止-preStop

由于 Pod 所代表的是在集群中节点上运行的进程,当不再需要这些进程时允许其体面地 终止一般不应武断地使用 KILL 信号终止它们设计的目标是令你能够请求删除进程,并且知道进程何时被终止,同时也能够确...

Python 类型注解和参数类型检查

1、类型注解1.1 函数定义的弊端Python 是动态语言,变量随时可以被赋值,且能赋值为不同的类型。Python 不是静态编译型语言,变量类型是在运行期决定的。动态语言很灵活,但是这种特性也是弊端。...

开源大数据集群部署(二十)Trino部署

开源大数据集群部署(二十)Trino部署

2.9.1 解压trino的包到opt目录cd /root/bigdata tar -xzvf trino-server-389.tar.gz -C /opt/ ln -s /opt/trino-...

pg_probackup

一、pg_probackup概述pg_probackup 是一款免费的postgres数据库集群备份工具,与其他备份工具相比,它主要有如下一些优势:提供增量备份,增量备份一定程度上可以节省磁盘空间的使...

副本集的同步、心跳、选举、回滚

一、副本集同步过程   MongoDB的复制功能是依赖与oplog来实现的。   1.数据写入主节点同时在local.oplog.rs集合写入对应的oplog   2.备份节点第一次同步数据时,会先进...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。