Flink SQL: Consuming Kafka Stream Data and Printing It to the Console


Purpose
Use Flink SQL to create a streaming job that reads data from the Kafka topic "test" into a source table and writes it to a "print_table" sink, which prints every record to the console.
Note: a yarn-session must already be running before you launch sql-client.
First, create the source table with CREATE TABLE:
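A sketch of the session setup, assuming FLINK_HOME points at the Flink installation and the machine has a working Hadoop/YARN client configuration (paths and flags may differ in your environment):

```shell
# Start a long-running, detached YARN session for Flink jobs
$FLINK_HOME/bin/yarn-session.sh -d

# Launch the SQL client; it submits statements to the running session
$FLINK_HOME/bin/sql-client.sh
```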
CREATE TABLE test_source (
  objId STRING,
  data STRING,
  capTime STRING,
  dataType STRING,
  channelCode STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '172.16.121.194:9092',
  'properties.group.id' = 'test-dataq-01',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);
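If malformed messages might appear on the topic, the JSON format can be told to skip them instead of failing the whole job. This is an optional variation on the DDL above, not part of the original setup:

```sql
CREATE TABLE test_source_tolerant (
  objId STRING,
  data STRING,
  capTime STRING,
  dataType STRING,
  channelCode STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '172.16.121.194:9092',
  'properties.group.id' = 'test-dataq-01',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',  -- drop rows that are not valid JSON
  'scan.startup.mode' = 'earliest-offset'
);
```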

Next, create the "print_table" sink table:
CREATE TABLE print_table (
  objId STRING,
  data STRING,
  capTime STRING,
  dataType STRING,
  channelCode STRING
) WITH (
  'connector' = 'print'
);
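The print connector also accepts a couple of optional settings; for example, it can prefix every printed row with an identifier, which helps when several print sinks share one TaskManager log. The table name and prefix below are illustrative, not from the original article:

```sql
CREATE TABLE print_table_tagged (
  objId STRING,
  data STRING,
  capTime STRING,
  dataType STRING,
  channelCode STRING
) WITH (
  'connector' = 'print',
  'print-identifier' = 'kafka-out',  -- prefix added to every printed row
  'standard-error' = 'false'         -- 'true' would print to stderr instead
);
```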
Now insert the data from test_source into print_table:
INSERT INTO print_table
SELECT objId, data, capTime, dataType, channelCode
FROM test_source;
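Submitting this INSERT starts a continuous streaming job that keeps running until cancelled. The SELECT can also transform or filter the stream on the way to the sink; for instance, a hypothetical variation that only forwards rows of one dataType:

```sql
INSERT INTO print_table
SELECT objId, data, capTime, dataType, channelCode
FROM test_source
WHERE dataType = 'exampleType';  -- illustrative filter, not in the original job
```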

Next, check the YARN application list and click into the running Flink job to see its details.
Now write some JSON records to the test topic with the console producer:
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 172.16.121.194:9092 --topic test
{"objId":"12345","data":"example data 1","capTime":"2023-11-07T08:00:00","dataType":"exampleType","channelCode":"ABCDEF"}
{"objId":"54321","data":"example data 2","capTime":"2023-11-07T08:15:00","dataType":"anotherType","channelCode":"GHIJKL"}
{"objId":"99999","data":"more example data","capTime":"2023-11-07T08:30:00","dataType":"additionalType","channelCode":"ZYXWVU"}
{"objId":"11111","data":"extra data","capTime":"2023-11-07T08:45:00","dataType":"extraType","channelCode":"QRSTUV"}
{"objId":"77777","data":"additional example data","capTime":"2023-11-07T09:00:00","dataType":"moreType","channelCode":"MNBVCX"}
{"objId":"88888","data":"more and more data","capTime":"2023-11-07T09:15:00","dataType":"typeX","channelCode":"POIUYT"}
{"objId":"22222","data":"different data","capTime":"2023-11-07T09:30:00","dataType":"typeY","channelCode":"LAKSDJ"}
{"objId":"66666","data":"sample data","capTime":"2023-11-07T09:45:00","dataType":"testType","channelCode":"QWERTY"}
{"objId":"44444","data":"new data","capTime":"2023-11-07T10:00:00","dataType":"newType","channelCode":"ZXCVBN"}
{"objId":"55555","data":"fresh data","capTime":"2023-11-07T10:15:00","dataType":"freshType","channelCode":"EDCRFV"}
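Each producer line must be a single JSON object whose keys match the columns declared in test_source, or the json format cannot map it to a row. A small Python sketch that checks a line against that schema before sending it (the validator and its names are illustrative, not part of the original pipeline):

```python
import json

# The five fields declared in the test_source DDL
EXPECTED_FIELDS = {"objId", "data", "capTime", "dataType", "channelCode"}

def validate_record(line: str) -> dict:
    """Parse one producer line and check it carries every expected field."""
    record = json.loads(line)
    missing = EXPECTED_FIELDS - record.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return record

sample = ('{"objId":"12345","data":"example data 1",'
          '"capTime":"2023-11-07T08:00:00",'
          '"dataType":"exampleType","channelCode":"ABCDEF"}')
print(validate_record(sample)["objId"])  # → 12345
```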
In the Flink web UI we can see the records flowing through the job, and the output appears on the TaskManager's stdout. Done.


Tags: big data operations
