flink sql 批处理

浩客1年前技术文章1243

进入flink sql命令行

sql-client.sh

Source 表 

       与所有 SQL 引擎一样,Flink 查询操作是在表上进行。与传统数据库不同,Flink 不在本地管理静态数据;相反,它的查询在外部表上连续运行。

       Flink 数据处理流水线开始于 source 表。source 表产生在查询执行期间可以被操作的行;它们是查询时 FROM 子句中引用的表。这些表可能是 Kafka 的 topics,数据库,文件系统,或者任何其它 Flink 知道如何消费的系统。

       可以通过 SQL 客户端或使用环境配置文件来定义表。SQL 客户端支持类似于传统 SQL 的 SQL DDL 命令。标准 SQL DDL 用于创建修改删除表。Flink 支持不同的连接器格式相结合以定义表。

# Source表  -- hdfs source

-- 将数据上传到hdfs
hadoop dfs -mkdir -p /data/peoples
hadoop dfs -put student.txt /data/peoples

CREATE TABLE student (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH ( 
    'connector' = 'filesystem',
    'path' = 'hdfs://master:9000/data/peoples',
    'format' = 'csv'
);

连续查询 

       虽然最初设计时没有考虑流语义,但 SQL 是用于构建连续数据流水线的强大工具。Flink SQL 与传统数据库查询的不同之处在于,Flink SQL 持续消费到达的行并对其结果进行更新。

        一个连续查询永远不会终止,并会产生一个动态表作为结果。动态表是 Flink 中 Table API 和 SQL 对流数据支持的核心概念。连续流上的聚合需要在查询执行期间不断地存储聚合的结果。

select clazz,count(1) as c  
from student
group by clazz;

Sink 表 

        当运行此查询时,SQL 客户端实时但是以只读方式提供输出。存储结果,作为报表或仪表板的数据来源,需要写到另一个表。这可以使用 INSERT INTO 语句来实现。本节中引用的表称为 sink 表。INSERT INTO 语句将作为一个独立查询被提交到 Flink 集群中。

# sink表(保存查询数据) --  hdfs sink 

CREATE TABLE clazz_num (
    clazz STRING,
    c BIGINT
) WITH ( 
    'connector' = 'filesystem',
    'path' = 'hdfs://master:9000/data/clazz_num',
    'format' = 'csv'
);


# 将连续查询的结果插入到sink表中
batch是输出最终的结果,streamg模式输出连续结果

-- 如果连续查询的返回的动态表是一个更新的表
-- 插入语句的返回的字段和类型和sink表一致
SET 'execution.runtime-mode' = 'batch';   使用批处理模式写入hdfs

insert into clazz_num
select clazz,count(1) as c  
from student
group by clazz;

hadoop dfs -ls /data/clazz_num
hadoop dfs -cat /data/clazz_num/*

提交后,它将运行并将结果直接存储到 sink 表中,而不是将结果加载到系统内存中。

相关文章

数仓主流架构简介之一

数仓主流架构简介之一

一、Lambda架构Apache Storm的创建者Nathan Marz于 2011 年开发,旨在解决大规模实时数据处理的挑战。Lambda数据架构提供了一个可扩展、容错且灵活的系统来处理大量数据。...

MySQL运维实战(3.1) MySQL官方客户端使用介绍

mysql是mysql安装包默认的客户端。位于二进制安装包的bin目录。或者通过rpm安装包安装mysql-community-client。使用mysql程序linux终端下,输入mysql命令登陆...

MySQL优化器特性(四)表关联之BNL(Block Nested Loop)和Hash Join

MySQL优化器特性(四)表关联之BNL(Block Nested Loop)和Hash Join

什么是BNLMySQL表关联时,如果关联条件上没有合适的索引,则join时,对于驱动表的每一条记录,都需要全表扫描被驱动表。如果驱动表有多条数据,则需要多次全表扫描被驱动表,查询性能很差。对于这种情况...

MySQL Group Replication(二)监控篇

MySQL Group Replication(二)监控篇

说明组复制搭建成功后,为保证其正常运行,用户需要对组复制进行监控。MySQL 的 performance_schema 库中提供一些表,用于监控组复制的复制过程。[performance_schema...

大数据平台袋鼠云托管运维与自建集群运维对比

对比维度袋鼠云托管运维自建hadoop集群运维成本根据业务需求定制架构,预估业务增涨合理规划,低成本高性价比需自行预估资源,规划架构性能大量集群优化经验,根据业务需求进行点对点优化采用开源社区版本,性...

EMR-flinksql运行失败问题

EMR-flinksql运行失败问题

运行flinksqlsql-client.sh报错:[root@emr1 bin]# ./sql-client.shSLF4J: Class path contains multiple SLF4J...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。