flink sql 批处理

浩客2年前技术文章2230

进入flink sql命令行

sql-client.sh

Source 表 

       与所有 SQL 引擎一样,Flink 查询操作是在表上进行。与传统数据库不同,Flink 不在本地管理静态数据;相反,它的查询在外部表上连续运行。

       Flink 数据处理流水线开始于 source 表。source 表产生在查询执行期间可以被操作的行;它们是查询时 FROM 子句中引用的表。这些表可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。

       可以通过 SQL 客户端或使用环境配置文件来定义表。SQL 客户端支持类似于传统 SQL 的 SQL DDL 命令。标准 SQL DDL 用于创建修改删除表。Flink 支持不同的连接器格式相结合以定义表。

# Source表  -- hdfs source

-- 将数据上传到hdfs
hadoop dfs -mkdir -p /data/peoples
hadoop dfs -put student.txt /data/peoples

CREATE TABLE student (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH ( 
    'connector' = 'filesystem',
    'path' = 'hdfs://master:9000/data/peoples',
    'format' = 'csv'
);

连续查询 

       虽然最初设计时没有考虑流语义,但 SQL 是用于构建连续数据流水线的强大工具。Flink SQL 与传统数据库查询的不同之处在于,Flink SQL 持续消费到达的行并对其结果进行更新。

        一个连续查询永远不会终止,并会产生一个动态表作为结果。动态表是 Flink 中 Table API 和 SQL 对流数据支持的核心概念。连续流上的聚合需要在查询执行期间不断地存储聚合的结果。

select clazz,count(1) as c  
from student
group by clazz;

Sink 表 

        当运行此查询时,SQL 客户端实时但是以只读方式提供输出。存储结果,作为报表或仪表板的数据来源,需要写到另一个表。这可以使用 INSERT INTO 语句来实现。本节中引用的表称为 sink 表。INSERT INTO 语句将作为一个独立查询被提交到 Flink 集群中。

# sink表(保存查询数据) --  hdfs sink 

CREATE TABLE clazz_num (
    clazz STRING,
    c BIGINT
) WITH ( 
    'connector' = 'filesystem',
    'path' = 'hdfs://master:9000/data/clazz_num',
    'format' = 'csv'
);


# 将连续查询的结果插入到sink表中
batch是输出最终的结果,streamg模式输出连续结果

-- 如果连续查询的返回的动态表是一个更新的表
-- 插入语句的返回的字段和类型和sink表一致
SET 'execution.runtime-mode' = 'batch';   使用批处理模式写入hdfs

insert into clazz_num
select clazz,count(1) as c  
from student
group by clazz;

hadoop dfs -ls /data/clazz_num
hadoop dfs -cat /data/clazz_num/*

提交后,它将运行并将结果直接存储到 sink 表中,而不是将结果加载到系统内存中。

相关文章

RabbitMQ 集群部署

RabbitMQ 集群部署

1. 两种模式说到集群,小伙伴们可能第一个问题是,如果我有一个 RabbitMQ 集群,那么是不是我的消息集群中的每一个实例都保存一份呢?这其实就涉及到 RabbitMQ 集群的两种模式:1)普通集群...

PG的pg_stat_statements插件

pg_stat_statements可追踪一个服务器所执行的所有 SQL 语句的执行统计信息,可以用于统计数据库的资源开销,分析TOP SQL。一、插件安装1、编译安装进入postgresql源码目录...

win内存使用率过高但是资源监视器查看不到进程,排查思路

win内存使用率过高但是资源监视器查看不到进程,排查思路

问题现象:服务器:某云服务器 内存使用率持续打高,但是通过任务管理器查不到占用内存很高的进程排查步骤:1、通过任务管理器分析核查目标主机的内存使用趋势情况,近7天内存使用情况如下:通过任务管理器排查内...

Kafka日志管理

Kafka在运行时会生成大量的日志记录信息,包含了运行状态、错误信息、性能指标等。这些日志文件会占用很大的磁盘空间,过多的日志文件也会影响Kafka的性能,因此需要采取一些日志管理措施来清理无用的日志...

Kubernetes安全--securityContext介绍

securityContext是用来控制容器内的用户权限,你想用什么用户去执行程序或者执行操作等等。1. securityContext介绍安全上下文(Security Context)定义 Pod...

在kubernetes中,让某个node成为专属节点

如何让node 去”选择”只有谁(pod)能部署到自身上面?看了下现有的Node Selectors、Node Affinity、Node Taints, 经过比对,发现Node Taints 更适合...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。