Hudi集成Spark

浩客2年前技术文章1374

环境准备


安装Spark

1)Hudi支持的Spark版本

Hudi

Supported Spark 3 version

0.12.x

3.3.x3.2.x3.1.x

0.11.x

3.2.xdefault build, Spark bundle only),3.1.x

0.10.x

3.1.x(default build), 3.0.x

0.7.0-0.9.0

3.0.x

0.6.0 and prior

Not supported

2)下载Spark并安装配置好
# 拷贝编译好的包到spark的jars目录

cp /opt/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/spark-3.2.2/jars

# 不自己编译,去maven里面下载对应版本的jar包放到spark的jars目录下也可以
https://search.maven.org/artifact/org.apache.hudi/hudi-spark3.3-bundle_2.12/0.13.1/jar


Spark SQL方式


创建表

1)启动spark-sql

spark-sql \ 
  --master yarn --deploy-mode client \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

WPS图片(1).png

2)创建分区表

# 创建一个cow分区外部表,指定primaryKey和preCombineField

create table spark_hudi (
    id int, name string,price double, ts bigint
) using hudi 
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts');

WPS图片2.png



3)向分区表插入数据

# 默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

insert into spark_hudi select 1, 'a1', 20, 1000;

WPS图片3.png

4)时间旅行查询

# 修改id为1的数据

insert into spark_hudi select 1, 'a1_1', 20,1000;

# 再基于第一次提交时间进行时间旅行查询

select * from spark_hudi timestamp as of '20231126202835692' where id = 1;

# 再次查询会发现查询结果为第一次插入的数据

WPS图片4.png

5update

# 更新操作需要指定preCombineField

update spark_hudi set price = price * 2, ts = 1111 where id = 1;

WPS图片7.png

6执行mergeinto

# 准备source表并插入数据

create table merge_source (
   id int, name string, price double, ts bigint
) using hudi 
tblproperties (primaryKey = 'id', preCombineField = 'ts'); 

insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);  

merge into spark_hudi 
  as target using merge_source as source 
  on target.id = source.id 
  when matched then update 
  set * when not matched then insert *;

WPS图片8.png

7)执行delete

delete from spark_hudi where id = 1;

WPS图片9.png

8)执行bulk_insert

set hoodie.sql.bulk.insert.enable=true; 
set hoodie.sql.insert.mode=non-strict; 
insert into spark_hudi select 2, 'a1_2', 20, 1002;

WPS图片11.png

相关文章

nginx分发算法

nginx分发算法

一、分发算法介绍如何将用户请求按照一定的规律分发给业务服务器。主要分为Nginx集群默认算法和基于请求头分发算法。二、nginx集群默认算法nginx的upstream 目前支持4种方式的分配轮询(默...

系统RDSCPU打满问题分析报告

系统RDSCPU打满问题分析报告

1. 问题概述在2023年9月01日09点13分,玳数运维组侧接收到业务侧反馈系统响应缓慢,与此同时运维群内新系统RDS 发出CPU打满的告警,告警通知如下: 2. 问题分析a. 数据库会话管理核查玳...

kafka性能关键参数配置指导

本文为kafka调优过程中主要参数以及参数相关释意,在遇到kafak性能问题时可优先调整一下参数1.Broker参数指导KAFKA_HEAP_OPTS:-Xmx6G   ...

Oracle数据库恢复演练

1、演练目的验证核心系统数据库备份的有效性,在极端数据库故障情况下保证数据库存在一份可用的备份文件,为业务数据的安全提供保障。 2、演练准备提供一台2C16G本地60G的阿里ecs服务器,操...

ES架构模型

ES架构模型

1.整体架构Elasticsearch是一个高度可扩展的开源全文搜索和分析引擎。它允许您快速、近实时地存储、搜索和分析大量数据。它通常用作支持具有复杂搜索功能和需求的应用程序的底层引擎/技术。Elas...

MySQL 8.0 新特性:innodb_dedicated_server

MySQL 8.0 新特性:innodb_dedicated_server

一、前言Innodb Dedicated Server 是 8.0 版本推出的一个参数,开启之后可以根据服务器的配置自适应 innodb 引擎中的一些重要影响性能的参数,默认是关闭的。二、参数测试使用...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。