Flink关于HiveCatalog

浩客4个月前技术文章199

HiveCatalog

HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。

配置

  1. 在flink-sql-connector-hive-1.2.2_2.12-1.15.0.jar包上传到lfink 的lib目录下


  2. 开启hive的元数据服务

    nohup  hive --service metastore >> metastore.log 2>&1 &
  3. 删除flink-table-planner-loader-1.15.0.jar

    rm -rf flink-table-planner-loader-1.15.0.jar
  4. 将flink opt目录下的link-table-planner_2.12-1.15.0.jar 复制到flink的lib目录下

    cp /usr/local/soft/flink-1.15.0/opt/flink-table-planner_2.12-1.15.0.jar /usr/local/soft/flink-1.15.0/lib/
  5. 重启yarn-session和sql-client

    yarn-session.sh -d
    sql-client.sh

如何创建 Flink 表并将其注册到 Catalog

创建hive catalog 

'hive-conf-dir  hive-site.xml文件所在的位置

 CREATE CATALOG hive_catalog WITH (
 'type' = 'hive',
 'hive-conf-dir' = '/opt/hive-3.1.2/conf'
);

切换catalog

use catalog hive_catalog

创建表

create table student
(
id  string,
name string,
age int,
gender string,
clazz string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS textfile
location '/data/student/';

在flink中就可以读取这一张表

在flink创建表,可以将表的元数据存放在hive中

CREATE TABLE student_kafka_proc (
   id STRING,
   name STRING,
   age INT,
   gender STRING,
   clazz STRING,
   user_action_time  as PROCTIME() -- 处理时间
) WITH (
 'connector' = 'kafka',
 'topic' = 'student',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'testGroup',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv'
);

表的元数据被保存在中的hive元数据中,所以在hive中可以看到这个表,但是在hive中不能对这个表进行查询,

使用hive catalog保存flink的元数据,元数据不会自动删除

相关文章

PostgreSQL 锁等待排查

PostgreSQL 锁等待排查

说明在数据库中,常用 锁 和 MVCC 来保障事务的一致性及提高并发性。锁问题的定位和排查也是数据库运维人员必会的技能,本篇文章介绍 PostgreSQL 如何排查定位锁堵塞问题。1. Postgre...

Apache hive 对接达梦数据库

Apache hive 对接达梦数据库

1、背景由于国产化需求,客户需要使用dm数据库作为hive的元数据库。需要进行对应适配2、配置本次使用的环境hive 3.1.3 hadoop 3.2.4 ranger 2.3.0 Spark ...

WAF 透明接入模式

WAF 透明接入模式

透明接入模式只需将需要防护的网站信息添加到WAF,无需修改域名的DNS解析设置,即可实现WAF防护。如果您的源站服务器为ECS服务器或者部署在阿里云公网SLB上,那么除了使用CNAME接入模式,还可以...

Docker镜像是有仓库

在Docker中,当我们执行 docker pull xxx 的时候 ,它实际上是从 hub.docker.com 这个地址去查找,这就是 Docker 公司为我们提供的公共仓库。在工作中,我们不可能...

使用 cgroups为impala设置 CPU 限制

使用 cgroups为impala设置 CPU 限制

有时应用会占用大量 CPU 时间,这可能会对环境的整体健康状况造成负面影响。使用 /sys/fs/ 虚拟文件系统,利用 控制组版本 (cgroups) 为应用配置 CPU 限制。先决条件您有 roo...

MySQL性能优化(二)优化排序操作

MySQL性能优化(二)优化排序操作

排序是数据库的基本功能。一个例子SELECT * FROM audit_log  WHERE user_id = xxx AND&nb...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。