Flink关于HiveCatalog
HiveCatalog
HiveCatalog
有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。
配置
在flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar包上传到lfink 的lib目录下
开启hive的元数据服务
nohup hive --service metastore >> metastore.log 2>&1 &
删除flink-table-planner-loader-1.15.0.jar
rm -rf flink-table-planner-loader-1.15.0.jar
将flink opt目录下的link-table-planner_2.12-1.15.0.jar 复制到flink的lib目录下
cp /usr/local/soft/flink-1.15.0/opt/flink-table-planner_2.12-1.15.0.jar /usr/local/soft/flink-1.15.0/lib/
重启yarn-session和sql-client
yarn-session.sh -d
sql-client.sh
如何创建 Flink 表并将其注册到 Catalog
创建hive catalog
'hive-conf-dir hive-site.xml文件所在的位置
CREATE CATALOG hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/opt/hive-3.1.2/conf'
);
切换catalog
use catalog hive_catalog
创建表
create table student
(
id string,
name string,
age int,
gender string,
clazz string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS textfile
location '/data/student/';
在flink中就可以读取这一张表
在flink创建表,可以将表的元数据存放在hive中
CREATE TABLE student_kafka_proc (
id STRING,
name STRING,
age INT,
gender STRING,
clazz STRING,
user_action_time as PROCTIME() -- 处理时间
) WITH (
'connector' = 'kafka',
'topic' = 'student',
'properties.bootstrap.servers' = 'master:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
表的元数据被保存在中的hive元数据中,所以在hive中可以看到这个表,但是在hive中不能对这个表进行查询,
使用hive catalog保存flink的元数据,元数据不会自动删除