CDH实操--CDH集成flink 1.13.6(二)

耀灵1年前技术文章697

一、编译flink

1、下载flink1.13.6源码包

wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.6/flink-1.13.6-src.tgz

2、修改flink的pom.xml文件

#修改hadoop版本:

1.png

#修改hive版本:

2.png

3.png

#添加Cloudera源头

<profile>
    <id>vendor-repos</id>
    <activation>
        <property>
            <name>vendor-repos</name>
        </property>
    </activation>
 
    <!-- Add vendor maven repositories -->
    <repositories>
        <!-- Cloudera -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
        <!-- MapR -->
        <repository>
            <id>mapr-releases</id>
            <url>https://repository.mapr.com/maven/</url>
            <snapshots><enabled>false</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
    </repositories>
</profile>

4.png

编译flink:

#编译后文件在 cd /mnt/flink-1.13.6/flink-dist/target/flink-1.13.6-bin/flink-1.13.6/

[root@cdh03 flink]# mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhaoop.version=3.0.0-cdh6.3.2 -Dinclude-hadoop -Pvendor-repos -Dscala-2.11 -T4C

5.png

修改flink-sql-commector-hive-2.2.0的pom.xml

[root@cdh03 flink-1.13.6]# vim flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml

修改hive配置:

6.png

新增Cloudera源:

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
</repositories>

7.png

#编译flink-connector-hive-2.2.0(sql_on_hive需要做这步)
#编译后文件在cd flink-connectors/flink-sql-connector-hive-2.2.0/target/
[root@cdh03 flink-sql-connector-hive-2.2.0]# mvn clean install -DskipTests -Dscala-2.11 -T4C

8.png

3、拷贝相关jar包到flink/lib

# 拷贝flink-sql-connector-hive到flink的lib目录下
# 拷贝hive-exec-2.1.1-cdh6.3.2.jar、libfb303-0.9.3.jar
cp -rp flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar /data/flink-1.13.6/lib/
cp /opt/cloudera/parcels/CDH/jars/hive-exec-2.1.1-cdh6.3.2.jar /data/flink-1.13.6/lib/
cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /data/flink-1.13.6/lib/

上传oracle CDC连接器包到flink/lib下:

9.png

4、打包准备好的flink源码:

tar -czvf flink-1.13.6-bin-scala_2.12.tgz flink-1.13.6  ##最好包直接打包到带httpd服务的/var/www/html下

二、制作Flink的parcel包和csd文件

1、下载制作脚本

# 克隆源码
git clone https://github.com/pkeropen/flink-parcel.git
cd flink-parcel

2、修改参数:

#FLINK 下载地址
#FLINK_URL=https://downloads.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz
FLINK_URL=http://172.16.106.151/flink-1.13.6-bin-scala_2.12.tgz

#flink版本号
FLINK_VERSION=1.13.6

#扩展版本号
EXTENS_VERSION=BIN-SCALA_2.12

#操作系统版本,以centos为例
OS_VERSION=7

#CDH 小版本
CDH_MIN_FULL=5.7
CDH_MAX_FULL=6.3.3

#CDH大版本
CDH_MIN=5
CDH_MAX=6

3、复制安装包

这里把之前编译打包好的flink的tar包上复制到flink-parcel项目的根目录。flink-parcel在制作parcel时如果根目录没有flink包会从配置文件里的地址下载flink的tar包到项目根目录。如果根目录已存在安装包则会跳过下载,使用已有tar包。

提示:注意:这里一定要用自己编译的包,不要用从链接下载的包!!!

10.png

4、编译parcel

# 赋予执行权限
chmod +x ./build.sh
# 执行编译脚本
./build.sh parcel

编译完会在flink-parcel项目根目录下生成FLINK-1.12.0-BIN-SCALA_2.11_build文件夹

11.png

5、编译csd

# 编译flink on yarn版本
./build.sh  csd_on_yarn

csd文件是组件的导航文件

编译完成后在flink-parcel项目根目录下会生成1个jar包 FLINK_ON_YARN-1.13.6.jar

6、上传文件

将编译parcel后生成的FLINK-1.13.6-BIN-SCALA_2.11_build文件夹内的3个文件复制到CDH Server所在节点的/opt/cloudera/parcel-repo目录。将编译csd生成后的FLINK_ON_YARN-1.13.6.jar复制到CDH Server所在节点的/opt/cloudera/csd目录

7、重启CDH Server

# 重启server(仅server节点执行)
systemctl restart cloudera-scm-server



FAQ:

错误1:

/opt/cloudera/parcels/FLINK/lib/flink/bin/flink-yarn.sh: line 17: rotateLogFilesWithPrefix: command not found
完整日志

12.png

[root@cdh02 ~]# vim /opt/cloudera/parcels/FLINK/lib/flink/bin/config.sh  ##391行新增如下内容
rotateLogFilesWithPrefix() {
    dir=$1
    prefix=$2
    while read -r log ; do
        rotateLogFile "$log"
    # find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
    done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq)
}
# 旋转日志文件
rotateLogFile() {
    log=$1;
    num=$MAX_LOG_FILE_NUMBER
    if [ -f "$log" -a "$num" -gt 0 ]; then
        while [ $num -gt 1 ]; do
            prev=`expr $num - 1`
            [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
            num=$prev
        done
        mv "$log" "$log.$num";
    fi
}

错误2:

解决:
Flink-yarn -> 配置 -> 高级 -> Flink-yarn 服务环境高级配置代码段(安全阀)Flink-yarn(服务范围)加入以下内容即可

HADOOP_USER_NAME=flink
HADOOP_CONF_DIR=/etc/hadoop/conf
HADOOP_HOME=/opt/cloudera/parcels/CDH
HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*
HBASE_CONF_DIR=/etc/hbase/conf

三、验证Flink服务

1、运行一个WordCount测试

13.png

# 在环境变量中加入
export HADOOP_CLASSPATH=`hadoop classpath`
[root@cdh03 bin]# ./flink run -t yarn-per-job /opt/cloudera/parcels/FLINK/lib/flink/examples/batch/WordCount.jar


相关文章

Python functools 模块

1、reduce 方法reduce 方法,顾名思义就是减少,map reduce 应用:大数据语法: reduce(function, sequence[, initial]) -> value...

使用Sqoop将数据从Hive导入MySQL(一)

使用Sqoop将数据从Hive导入MySQL(一)

使用Sqoop将数据从Hive导入MySQL首先查看csv数据类型创建类似的hive表并导入数据CREATE TABLE data (    province STRING,    code INT,...

DBeaver连接Trino

DBeaver连接Trino

1、背景trino 开启https,需要通过dbeaver进行连接DBeaver版本:21.2.02、解决办法下载安装dbeaver打开选择选择trino填写主机如果trino开启https,则可以使...

Hadoop3.2.4纠删码实操(三)

Hadoop3.2.4纠删码实操(三)

1、纠删码实操1.在HDFS中建立以下三个目录,并都设置为XOR-2-1-1024k策略。[root@hd2 hadoop]# hadoop fs -mkdir /ec_xor_s SLF4J: C...

数据库经验之谈-数据库join时必须使用索引

数据库join时必须使用索引,否则效率急剧下降。当执行数据库 JOIN 操作时,如果没有使用索引,则数据库需要执行全表扫描(Full Table Scan)来查找匹配的行。这意味着数据库将检查表中的每...

ES运维(四)扩容方式迁移

ES运维(四)扩容方式迁移

1 迁移概述本次模拟es在线迁移方式:集群扩容-->数据迁移-->老节点下线-->服务重启刷新配置。 中间master替换的时候会有短暂的不可用。 另外业务测需注意:老节点下线前...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。