Hudi集成Spark

浩客2年前技术文章1099

环境准备


安装Spark

1)Hudi支持的Spark版本

Hudi

Supported Spark 3 version

0.12.x

3.3.x3.2.x3.1.x

0.11.x

3.2.xdefault build, Spark bundle only),3.1.x

0.10.x

3.1.x(default build), 3.0.x

0.7.0-0.9.0

3.0.x

0.6.0 and prior

Not supported

2)下载Spark并安装配置好
# 拷贝编译好的包到spark的jars目录

cp /opt/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/spark-3.2.2/jars

# 不自己编译,去maven里面下载对应版本的jar包放到spark的jars目录下也可以
https://search.maven.org/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.1/jar


Spark SQL方式


创建表

1)启动spark-sql

spark-sql \ 
  --master yarn --deploy-mode client \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

WPS图片(1).png

2)创建分区表

# 创建一个cow分区外部表,指定primaryKey和preCombineField

create table spark_hudi (
    id int, name string,price double, ts bigint
) using hudi 
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts');

WPS图片2.png



3)向分区表插入数据

# 默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

insert into spark_hudi select 1, 'a1', 20, 1000;

WPS图片3.png

4)时间旅行查询

# 修改id为1的数据

insert into spark_hudi select 1, 'a1_1', 20,1000;

# 再基于第一次提交时间进行时间旅行查询

select * from spark_hudi timestamp as of '20231126202835692' where id = 1;

# 再次查询会发现查询结果为第一次插入的数据

WPS图片4.png

5update

# 更新操作需要指定preCombineField

update spark_hudi set price = price * 2, ts = 1111 where id = 1;

WPS图片7.png

6执行mergeinto

# 准备source表并插入数据

create table merge_source (
   id int, name string, price double, ts bigint
) using hudi 
tblproperties (primaryKey = 'id', preCombineField = 'ts'); 

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);  

merge into spark_hudi 
  as target using merge_source as source 
  on target.id = source.id 
  when matched then update 
  set * when not matched then insert *;

WPS图片8.png

7)执行delete

delete from spark_hudi where id = 1;

WPS图片9.png

8)执行bulk_insert

set hoodie.sql.bulk.insert.enable=true; 
set hoodie.sql.insert.mode=non-strict; 
insert into spark_hudi select 2, 'a1_2', 20, 1002;

WPS图片11.png

相关文章

Kubernetes原理分析--Kube-controller list&watch原理解析

Kubernetes原理分析--Kube-controller list&watch原理解析

1.list&watch流程:这里有三个 List-Watch,分别是 Controller Manager(运行在 Master),Scheduler(运行在Master),kubelet(...

使用 systemd 管理 MySQL 服务

前言systemd 是 Linux 系统推出的初始化(init)系统,MySQL 使用 RPM 或者 Debian 包安装 MySQL 会默认使用 systemd 来管理 MySQL 服务,不需要额外...

ES运维(五)聚合分析流程及精准度

ES运维(五)聚合分析流程及精准度

1、 概述ES是一个近实时的搜索引擎,提供近实时海量数据的聚合分析功能,但这个海量数据聚合分析是会损失一定的精准度来满足实时性能需要的。 2、 分布式系统的近似统计算法如下图,在分布式数据分...

es68 应用异常排查

es68 应用异常排查

问题描述es68应用拉取镜像缓慢问题解决直接从docker仓库拉取,配置了镜像加速。docker的阿里云镜像地址 (推荐使用)yum-config-manager \     --add-repo ...

emr部署hive并适配达梦数据库

emr部署hive并适配达梦数据库

一、达梦 用户、数据库初始化1、创建hive的元数据库create tablespace hive_meta datafile '/dm8/data/DAMENG/hive_meta.dbf' siz...

MySQL mgr部署文档

MySQL mgr部署文档

一、环境说明1.1服务器信息1.2目录规划1.2目录规划二、环境配置2.1 关闭防火墙和selinuxservice iptabls stop /etc/selinux/conf...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。