Hudi集成Spark

浩客1年前技术文章606

环境准备


安装Spark

1)Hudi支持的Spark版本

Hudi

Supported Spark 3 version

0.12.x

3.3.x3.2.x3.1.x

0.11.x

3.2.xdefault build, Spark bundle only),3.1.x

0.10.x

3.1.x(default build), 3.0.x

0.7.0-0.9.0

3.0.x

0.6.0 and prior

Not supported

2)下载Spark并安装配置好
# 拷贝编译好的包到spark的jars目录

cp /opt/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/spark-3.2.2/jars

# 不自己编译,去maven里面下载对应版本的jar包放到spark的jars目录下也可以
https://search.maven.org/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.1/jar


Spark SQL方式


创建表

1)启动spark-sql

spark-sql \ 
  --master yarn --deploy-mode client \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

WPS图片(1).png

2)创建分区表

# 创建一个cow分区外部表,指定primaryKey和preCombineField

create table spark_hudi (
    id int, name string,price double, ts bigint
) using hudi 
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts');

WPS图片2.png



3)向分区表插入数据

# 默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

insert into spark_hudi select 1, 'a1', 20, 1000;

WPS图片3.png

4)时间旅行查询

# 修改id为1的数据

insert into spark_hudi select 1, 'a1_1', 20,1000;

# 再基于第一次提交时间进行时间旅行查询

select * from spark_hudi timestamp as of '20231126202835692' where id = 1;

# 再次查询会发现查询结果为第一次插入的数据

WPS图片4.png

5update

# 更新操作需要指定preCombineField

update spark_hudi set price = price * 2, ts = 1111 where id = 1;

WPS图片7.png

6执行mergeinto

# 准备source表并插入数据

create table merge_source (
   id int, name string, price double, ts bigint
) using hudi 
tblproperties (primaryKey = 'id', preCombineField = 'ts'); 

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);  

merge into spark_hudi 
  as target using merge_source as source 
  on target.id = source.id 
  when matched then update 
  set * when not matched then insert *;

WPS图片8.png

7)执行delete

delete from spark_hudi where id = 1;

WPS图片9.png

8)执行bulk_insert

set hoodie.sql.bulk.insert.enable=true; 
set hoodie.sql.insert.mode=non-strict; 
insert into spark_hudi select 2, 'a1_2', 20, 1002;

WPS图片11.png

相关文章

CPU--上下文切换

CPU--上下文切换

一、概述1、Linux 是一个多任务操作系统,它支持远大于 CPU 数量的任务同时运行。当然,这些任务实际上并不是真的在同时运行,而是因为系统在很短的时间内,将 ...

HBase Shell操作

基本操作1.进入HBase客户端命令行[root@cdh02 current]# hbase shell2.查看帮助命令hbase(main):001:0> help3.查看当前数据库中有哪些表...

helm简介

helm简介

一、Helm 是什么在没使用 helm 之前,向 kubernetes 部署应用,我们要依次部署 deployment、svc 等,步骤较繁琐。况且随着很多项目微服务化,复杂的应用在容器中部署以及管理...

CDP实操--配置Hive/Impala的Ranger策略验证(二)

CDP实操--配置Hive/Impala的Ranger策略验证(二)

1.1Hive/impala的Ranger策略验证确保hive-on-tez的配置页面里已经勾选了“Ranger Service”在terminal中,kerberos登录到hive,用如下命令登录b...

开源大数据集群部署(十二)Ranger 集成 hive

开源大数据集群部署(十二)Ranger 集成 hive

1、解压安装在hd1.dtstack.com主机上执行(一般选择hiveserver2节点)Ø 解压ranger-2.3.0-hive-plugin.tar.gz[root@hd1.dtstack.c...

Windows自带性能监控工具Perfmon使用介绍

Windows自带性能监控工具Perfmon使用介绍

一、Perfmon简介Perfmon(Performance Monitor)是一款Windows自带的性能监控工具,提供了图表化的系统性能实时监视器、性能日志和警报管理。通过添加性能计数器(Perf...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。