CDH实操--CDH集成flink 1.13.6(二)

耀灵2年前技术文章971

一、编译flink

1、下载flink1.13.6源码包

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.6/flink-1.13.6-src.tgz

2、修改flink的pom.xml文件

#修改hadoop版本:

1.png

#修改hive版本:

2.png

3.png

#添加Cloudera源头

<profile>
    <id>vendor-repos</id>
    <activation>
        <property>
            <name>vendor-repos</name>
        </property>
    </activation>
 
    <!-- Add vendor maven repositories -->
    <repositories>
        <!-- Cloudera -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
        <!-- MapR -->
        <repository>
            <id>mapr-releases</id>
            <url>https://repository.mapr.com/maven/</url>
            <snapshots><enabled>false</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
    </repositories>
</profile>

4.png

编译flink:

#编译后文件在 cd /mnt/flink-1.13.6/flink-dist/target/flink-1.13.6-bin/flink-1.13.6/

[root@cdh03 flink]# mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhaoop.version=3.0.0-cdh6.3.2 -Dinclude-hadoop -Pvendor-repos -Dscala-2.11 -T4C

5.png

修改flink-sql-commector-hive-2.2.0的pom.xml

[root@cdh03 flink-1.13.6]# vim flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml

修改hive配置:

6.png

新增Cloudera源:

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
</repositories>

7.png

#编译flink-connector-hive-2.2.0(sql_on_hive需要做这步)
#编译后文件在cd flink-connectors/flink-sql-connector-hive-2.2.0/target/
[root@cdh03 flink-sql-connector-hive-2.2.0]# mvn clean install -DskipTests -Dscala-2.11 -T4C

8.png

3、拷贝相关jar包到flink/lib

# 拷贝flink-sql-connector-hive到flink的lib目录下
# 拷贝hive-exec-2.1.1-cdh6.3.2.jar、libfb303-0.9.3.jar
cp -rp flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar /data/flink-1.13.6/lib/
cp /opt/cloudera/parcels/CDH/jars/hive-exec-2.1.1-cdh6.3.2.jar /data/flink-1.13.6/lib/
cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /data/flink-1.13.6/lib/

上传oracle CDC连接器包到flink/lib下:

9.png

4、打包准备好的flink源码:

tar -czvf flink-1.13.6-bin-scala_2.12.tgz flink-1.13.6  ##最好包直接打包到带httpd服务的/var/www/html下

二、制作Flink的parcel包和csd文件

1、下载制作脚本

# 克隆源码
git clone https://github.com/pkeropen/flink-parcel.git
cd flink-parcel

2、修改参数:

#FLINK 下载地址
#FLINK_URL=https://downloads.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
FLINK_URL=http://172.16.106.151/flink-1.13.6-bin-scala_2.12.tgz

#flink版本号
FLINK_VERSION=1.13.6

#扩展版本号
EXTENS_VERSION=BIN-SCALA_2.12

#操作系统版本,以centos为例
OS_VERSION=7

#CDH 小版本
CDH_MIN_FULL=5.7
CDH_MAX_FULL=6.3.3

#CDH大版本
CDH_MIN=5
CDH_MAX=6

3、复制安装包

这里把之前编译打包好的flink的tar包上复制到flink-parcel项目的根目录。flink-parcel在制作parcel时如果根目录没有flink包会从配置文件里的地址下载flink的tar包到项目根目录。如果根目录已存在安装包则会跳过下载,使用已有tar包。

提示:注意:这里一定要用自己编译的包,不要用从链接下载的包!!!

10.png

4、编译parcel

# 赋予执行权限
chmod +x ./build.sh
# 执行编译脚本
./build.sh parcel

编译完会在flink-parcel项目根目录下生成FLINK-1.12.0-BIN-SCALA_2.11_build文件夹

11.png

5、编译csd

# 编译flink on yarn版本
./build.sh  csd_on_yarn

csd文件是组件的导航文件

编译完成后在flink-parcel项目根目录下会生成1个jar包 FLINK_ON_YARN-1.13.6.jar

6、上传文件

将编译parcel后生成的FLINK-1.13.6-BIN-SCALA_2.11_build文件夹内的3个文件复制到CDH Server所在节点的/opt/cloudera/parcel-repo目录。将编译csd生成后的FLINK_ON_YARN-1.13.6.jar复制到CDH Server所在节点的/opt/cloudera/csd目录

7、重启CDH Server

# 重启server(仅server节点执行)
systemctl restart cloudera-scm-server



FAQ:

错误1:

/opt/cloudera/parcels/FLINK/lib/flink/bin/flink-yarn.sh: line 17: rotateLogFilesWithPrefix: command not found
完整日志

12.png

[root@cdh02 ~]# vim /opt/cloudera/parcels/FLINK/lib/flink/bin/config.sh  ##391行新增如下内容
rotateLogFilesWithPrefix() {
    dir=$1
    prefix=$2
    while read -r log ; do
        rotateLogFile "$log"
    # find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
    done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq)
}
# 旋转日志文件
rotateLogFile() {
    log=$1;
    num=$MAX_LOG_FILE_NUMBER
    if [ -f "$log" -a "$num" -gt 0 ]; then
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}

错误2:

解决:
Flink-yarn -> 配置 -> 高级 -> Flink-yarn 服务环境高级配置代码段(安全阀)Flink-yarn(服务范围)加入以下内容即可

HADOOP_USER_NAME=flink
HADOOP_CONF_DIR=/etc/hadoop/conf
HADOOP_HOME=/opt/cloudera/parcels/CDH
HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*
HBASE_CONF_DIR=/etc/hbase/conf

三、验证Flink服务

1、运行一个WordCount测试

13.png

# 在环境变量中加入
export HADOOP_CLASSPATH=`hadoop classpath`
[root@cdh03 bin]# ./flink run -t yarn-per-job /opt/cloudera/parcels/FLINK/lib/flink/examples/batch/WordCount.jar


相关文章

PostgreSQL 命令行工具介绍

前言psql 是 PostgreSQL 自带的命令行交互客户端工具,类似于 MySQL 的 mysql -u -p 不过相当于 MySQL 的命令行工具 psql 功能更丰富些,例如单击 tab 自动...

压测实操--TestDFSIO压测hdfs读写方案

压测实操--TestDFSIO压测hdfs读写方案

TestDFSIO主要是对hdfs的I/O性能进行测试,通过使用MapReduce作业来完成测试,作为并行读写文件进行I/O性能测试。每个map任务用于读或写每个文件,map的输出用于收集与处理文件相...

NameSpaces状态一直为Terminating

NameSpaces状态一直为Terminating

问题描述删除ingress-nginx后发现ingress-nginx的命名空间一直为销毁中,大致查看了下发现命名空间中已经没有其他资源。该状态已经持续了十几个小时强制删除命名空间```Plain T...

MySQL运维实战(2.2)忘记密码如何处理

如果忘记了一个普通用户的密码,可以使用管理员账号登录,修改其他用户的密码。但是如果所有管理员账号的密码都忘记了,应该怎么处理呢?如果忘记root密码,可以使用skip-grant-tables参数启动...

HBase导出表和备份表操作

HBase导出表和备份表操作

HBase提供了几种导出数据的方式,包括使用HBase自带的工具和使用HBase的API。本文主要是讲的使用HBase自带的工具进行导出首先我们创建一个表 插入一些数据hbase shelllistc...

网络数据链路层-MAC帧(1)

网络数据链路层-MAC帧(1)

1.数据链路层数据链路层是网络协议栈中最底层的内容,而在之前对其他层次的学习让我们知道传输层可以保证数据的可靠性问题,网络层保证数据跨网络转发的路由问题,而数据链路层解决的就是局域网内两台主机间通信的...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。