hadoop集群集成Iceberg操作指导

南墨2年前技术文章2468

hadoop集群集成Iceberg操作指导书

一、    准备工作

1.       大数据集群运行正常,完成hive、trino、spark、flink部署,并可以正常使用相关组件引擎提交任务

2.     通过https://iceberg.apache.org/releases/下载相关iceberg jar包,主要jar包为spark、hive、flink三种,请根据需要集成的版本进行下载(spark需要注意scala版本)

3.       Flink可以使用yarn-session模式和pre-job模式成功提交任务

Trino可对接hive正常进行查询操作

二、    Hive集成iceberg

1.       下载相关jar包,具体链接参考1.2

2.       拷贝libfb303-0.9.3.jar至hadoop目录下

cp $HIVE_HOME/lib/libfb303-0.9.3.jar /opt/hadoop/share/hadoop/mapreduce/lib

3.       修改hive-site.xml相关配置

cd $HIVE_HOME/conf
vim hive-site.xml
#添加如下配置
<!--以下配置为应用程序全局启用Hive支持-->
<property>
    <name>iceberg.engine.hive.enabled</name>
    <value>true</value>
</property>
<!--设置白名单可在客户端set iceberg和yarn相关参数-->
<property>
  <name>hive.security.authorization.sqlstd.confwhitelist</name>
  <value>mapred.*|hive.*|mapreduce.*|spark.*|iceberg.*i|yarn.*</value>
</property>
<property>
 <name>hive.security.authorization.sqlstd.confwhitelist.append</name>
  <value>mapred.*|hive.*|mapreduce.*|spark.*|iceberg.*|yarn.*</value>
</property>
<!--Tez相关参数设置-->
<property>
     <name>tez.mrreader.config.update.properties</name>
     <value>hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids</value>
  </property>
<!--关闭向量化-->
<property>
     <name>hive.vectorized.execution.enabled</name>
     <value>false</value>
  </property>
<!--设置iceberg引用jar包路径-->
<property>
    <name>hive.aux.jars.path</name>
    <value>/opt/hive/iceberg-hive-runtime-1.0.0.jar</value>
</property>

4.       重启hive和metastore服务

bash $HIVE_HOME/bin/stop_hive.sh
bash $HIVE_HOME/bin/start_hive.sh

5.       通过beeline进入

beeline -u 'jdbc:hive2://host:10000/default;principal=hive/principal@realm'

6.       执行如下命令选择catalog

注:Hive本身没有Catalog的概念,但是Iceberg有Catalog。所以Hive将Catalog的信息用键值对的属性来实现,这样建表的时候就可以直接使用创建的Catalog,Hive集成Iceberg支持Hive Catalog和Hadoop Catalog。

 

注册名为hadoop的HadoopCatalog(标红部分注册名可自定义)


此处请修改cluster_nameservice为实际集群的nameservice

SET iceberg.catalog.hadoop.type=hadoop;
SET iceberg.catalog.hadoop.warehouse=hdfs://cluster_nameservice/user/hive/warehouse;

注册名为another_hive的HiveCatalog(标红部分注册名可自定义)

注:此处请修改第二配置值和第四配置值为实际集群值

SET iceberg.catalog.another_hive.type=hive;
SET iceberg.catalog.another_hive.uri=thrift://example.com:9083;
SET iceberg.catalog.another_hive.clients=10;
SET iceberg.catalog.another_hive.warehouse=hdfs://example.com:8020/warehouse;

7.       在hive里创建iceberg表(此处以hadoop catalog为例进行操作)

 #内表

CREATE TABLE table_g (
  id bigint,
name string
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location '/user/hive/warehouse/default/table_g'
TBLPROPERTIES ('iceberg.catalog'='hadoop');
 
#外表
CREATE EXTERNAL TABLE z (
i int
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location '/user/hive/warehouse/z'
TBLPROPERTIES ('iceberg.catalog'='hadoop');

8.       查看创建的表,确认为iceberg表

1.png

9.       测试插入和查询

#设置计算引擎为mr,tez引擎无法在hive3版本调用iceberg,在hive4版本可以
set hive.execution.engine=mr;
insert into table_g values(1,'aaa');
select * from table_g;

     10.    验证截图如下:

插入

查询:

1.png

 三、    Spark集成iceberg

1.       将iceberg-spark-runtime-3.3_2.12-x.x.x.jar放入spark目录下的jars目录中,并修改属组和权限,和其他jar包保持一致

(请根据实际情形修改标红部分)

cp /opt/iceberg-spark-runtime-3.3_2.12-*.jar $SPARK_HOME/jars
chown spark:spark iceberg-spark-runtime-*.jar
chmod xxx iceberg-spark-runtime-*.jar

2.       运行spark-sql进入sparksql客户端

标红部分修改即可为hive即可加载hive类型的catalog

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.0.0\
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hadoop \
    --conf spark.sql.catalog.spark_catalog.warehouse=/user/hive/warehouse \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=/user/hive/warehouse

此命令含义是为表在/user/hive/warehouse路径下的表注册一个名为local的基于路径的catalog,catalog的类型为hadoop

 

3.       查询刚才在hive中创建的hadoop catalog的表

1.png

 

四、    Flink集成iceberg

1.       从以下地址下载flinksql-hive驱动包

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.6/

具体版本连接地址

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.6/flink-sql-connector-hive-3.1.2_2.12-1.14.6.jar

 

2.       将以下jar包添加到/opt/flink/lib下


hive-exec-3.1.0.jar(hive的jar包目录中存在)

iceberg-flink-runtime-1.14-1.0.0.jar

flink-sql-connector-hive-3.1.2_2.11-1.14.4.jar

libfb303-0.9.3.jar(hive的jar包目录中存在)


3.       登陆flinksql客户端

 ./sql-client.sh embedded shell

4.       加载catalog(每次重新连接客户端均需重新加载)

CREATE CATALOG hadoop WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='/user/hive/warehouse',
  'property-version'='1'
);

5.       进入名为hadoop的catalog

 use catalog hadoop;

6.       设置flinksql提交模式

 set 'execution.target' = 'yarn-per-job';

7.       可以查询到刚才在hive中创建的表

1.png

8.       执行查询语句,等待job完成后即可查询出结果(此处需要等待1-2min)

1.png

五、    Trino集成iceberg

1.       在/opt/trino/etc/catalog下添加文件iceberg.properties

配置如下:(部分参数需要根据实际情况修改

Trino不支持类型为hadoop的catalog

connector.name=iceberg
hive.metastore.uri=thrift://hadoop1.dtstack.com:9083
hive.config.resources=/opt/hadoop/etc/hadoop/core-site.xml,/opt/hadoop/etc/hadoop/hdfs-site.xml
hive.metastore.authentication.type=kerberos
hive.metastore.thrift.impersonation.enabled=false
hive.metastore.service.principal=hive/hadoop1.dtstack.com@DTSTACK.COM
hive.metastore.client.principal=hive/hadoop1.dtstack.com@DTSTACK.COM
hive.metastore.client.keytab=/opt/keytab/hive.keytab
hive.hdfs.authentication.type=KERBEROS
hive.hdfs.impersonation.enabled=false
hive.hdfs.trino.principal=trino/_HOST@DTSTACK.COM
hive.hdfs.trino.keytab=/opt/keytab/trino.keytab
iceberg.security=ALLOW_ALL

2.       查询在其他引擎创建的类型为hive的catalog表

1.png

相关文章

IDC:疫情下,第三方云管理服务市场逆势增长!

IDC:疫情下,第三方云管理服务市场逆势增长!

IDC预测,中国第三方云管理服务在2019年到2023年间将保持54.7%的增长率,2023年市场规模预计达到32.1亿美元。2019年第三方云管理服务市场呈现出如下特点:云管理服务成为众多服务商的战...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。