Hudi集成Spark

浩客1年前技术文章784

环境准备


安装Spark

1)Hudi支持的Spark版本

Hudi

Supported Spark 3 version

0.12.x

3.3.x3.2.x3.1.x

0.11.x

3.2.xdefault build, Spark bundle only),3.1.x

0.10.x

3.1.x(default build), 3.0.x

0.7.0-0.9.0

3.0.x

0.6.0 and prior

Not supported

2)下载Spark并安装配置好
# 拷贝编译好的包到spark的jars目录

cp /opt/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/spark-3.2.2/jars

# 不自己编译,去maven里面下载对应版本的jar包放到spark的jars目录下也可以
https://search.maven.org/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.1/jar


Spark SQL方式


创建表

1)启动spark-sql

spark-sql \ 
  --master yarn --deploy-mode client \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

WPS图片(1).png

2)创建分区表

# 创建一个cow分区外部表,指定primaryKey和preCombineField

create table spark_hudi (
    id int, name string,price double, ts bigint
) using hudi 
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts');

WPS图片2.png



3)向分区表插入数据

# 默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

insert into spark_hudi select 1, 'a1', 20, 1000;

WPS图片3.png

4)时间旅行查询

# 修改id为1的数据

insert into spark_hudi select 1, 'a1_1', 20,1000;

# 再基于第一次提交时间进行时间旅行查询

select * from spark_hudi timestamp as of '20231126202835692' where id = 1;

# 再次查询会发现查询结果为第一次插入的数据

WPS图片4.png

5update

# 更新操作需要指定preCombineField

update spark_hudi set price = price * 2, ts = 1111 where id = 1;

WPS图片7.png

6执行mergeinto

# 准备source表并插入数据

create table merge_source (
   id int, name string, price double, ts bigint
) using hudi 
tblproperties (primaryKey = 'id', preCombineField = 'ts'); 

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);  

merge into spark_hudi 
  as target using merge_source as source 
  on target.id = source.id 
  when matched then update 
  set * when not matched then insert *;

WPS图片8.png

7)执行delete

delete from spark_hudi where id = 1;

WPS图片9.png

8)执行bulk_insert

set hoodie.sql.bulk.insert.enable=true; 
set hoodie.sql.insert.mode=non-strict; 
insert into spark_hudi select 2, 'a1_2', 20, 1002;

WPS图片11.png

相关文章

MySQL 小数类型介绍

MySQL 小数类型介绍

前言对于保证精度的数字,MySQL 也有对应的小数类型,下图是 MySQL 中小数类型概览。 浮点:小数点非固定的数,可表示数据范围较广,整数,小数都可表示。定点:小数点固定,可表示整数,小数。int...

bind服务-1

bind服务-1

DNS:域名系统(英文:Domain Name System)是一个域名系统,是万维网上作为域名和IP地址相互映射的一个分布式数据库,能够使用户更方便的访问互联网,而不用去记住能够被机器直接读取的IP...

rds pg10 ssd云盘升级磁盘类型方案

rds pg10 ssd云盘升级磁盘类型方案

1、升级方案一原实例变更配置方案优缺点:优点:操作方便,升级后能保证数据和存储过程都是正常的。缺点:停机时间较长,整个变更配置期间业务不可用。变更步骤参考:在配置信息区域单击变更配置。(仅包年包月实例...

达梦数据库初始化

达梦数据库初始化

1、dm数据库初始化认证dm数据库默认开启了ssl,在第一次登录时,使用SYSDBA用户登录,需要通过命令指定对应的ssl文件。不然会出现ssl 初始化失败的报错./disql SYSDBA/SYSD...

Kubernetes安全--基于sa和user的rbac认证机制

前言Kubernetes中的用户K8S中有两种用户(User)——服务账号(ServiceAccount)和普通意义上的用户(User)ServiceAccount是由K8S管理的,而User通常是在...

Flume使用详解

Flume使用详解

一、Flume概念Flume 是 Cloudera 提供的日志收集系统,具有分布式、高可靠、高可用性等特点,对海量 日志采集、聚合和传输,Flume 支持在日志系统中定制各类数据发送方,同时,Flum...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。