CDH实操--CDH集成flink 1.13.6(二)
一、编译flink
1、下载flink1.13.6源码包
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.6/flink-1.13.6-src.tgz
2、修改flink的pom.xml文件
#修改hadoop版本:
#修改hive版本:
#添加Cloudera源头
<profile> <id>vendor-repos</id> <activation> <property> <name>vendor-repos</name> </property> </activation> <!-- Add vendor maven repositories --> <repositories> <!-- Cloudera --> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> <!-- MapR --> <repository> <id>mapr-releases</id> <url>https://repository.mapr.com/maven/</url> <snapshots><enabled>false</enabled></snapshots> <releases><enabled>true</enabled></releases> </repository> </repositories> </profile>
编译flink:
#编译后文件在 cd /mnt/flink-1.13.6/flink-dist/target/flink-1.13.6-bin/flink-1.13.6/
[root@cdh03 flink]# mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhaoop.version=3.0.0-cdh6.3.2 -Dinclude-hadoop -Pvendor-repos -Dscala-2.11 -T4C
修改flink-sql-commector-hive-2.2.0的pom.xml
[root@cdh03 flink-1.13.6]# vim flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml
修改hive配置:
新增Cloudera源:
<repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos</url> </repository> </repositories>
#编译flink-connector-hive-2.2.0(sql_on_hive需要做这步) #编译后文件在cd flink-connectors/flink-sql-connector-hive-2.2.0/target/ [root@cdh03 flink-sql-connector-hive-2.2.0]# mvn clean install -DskipTests -Dscala-2.11 -T4C
3、拷贝相关jar包到flink/lib
# 拷贝flink-sql-connector-hive到flink的lib目录下 # 拷贝hive-exec-2.1.1-cdh6.3.2.jar、libfb303-0.9.3.jar cp -rp flink-sql-connector-hive-2.2.0_2.12-1.13.6.jar /data/flink-1.13.6/lib/ cp /opt/cloudera/parcels/CDH/jars/hive-exec-2.1.1-cdh6.3.2.jar /data/flink-1.13.6/lib/ cp /opt/cloudera/parcels/CDH/jars/libfb303-0.9.3.jar /data/flink-1.13.6/lib/
上传oracle CDC连接器包到flink/lib下:
4、打包准备好的flink源码:
tar -czvf flink-1.13.6-bin-scala_2.12.tgz flink-1.13.6 ##最好包直接打包到带httpd服务的/var/www/html下
二、制作Flink的parcel包和csd文件
1、下载制作脚本
# 克隆源码 git clone https://github.com/pkeropen/flink-parcel.git cd flink-parcel
2、修改参数:
#FLINK 下载地址 #FLINK_URL=https://downloads.apache.org/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.11.tgz FLINK_URL=http://172.16.106.151/flink-1.13.6-bin-scala_2.12.tgz #flink版本号 FLINK_VERSION=1.13.6 #扩展版本号 EXTENS_VERSION=BIN-SCALA_2.12 #操作系统版本,以centos为例 OS_VERSION=7 #CDH 小版本 CDH_MIN_FULL=5.7 CDH_MAX_FULL=6.3.3 #CDH大版本 CDH_MIN=5 CDH_MAX=6
3、复制安装包
这里把之前编译打包好的flink的tar包上复制到flink-parcel项目的根目录。flink-parcel在制作parcel时如果根目录没有flink包会从配置文件里的地址下载flink的tar包到项目根目录。如果根目录已存在安装包则会跳过下载,使用已有tar包。
提示:注意:这里一定要用自己编译的包,不要用从链接下载的包!!!
4、编译parcel
# 赋予执行权限 chmod +x ./build.sh # 执行编译脚本 ./build.sh parcel
编译完会在flink-parcel项目根目录下生成FLINK-1.12.0-BIN-SCALA_2.11_build文件夹
5、编译csd
# 编译flink on yarn版本 ./build.sh csd_on_yarn
csd文件是组件的导航文件
编译完成后在flink-parcel项目根目录下会生成1个jar包 FLINK_ON_YARN-1.13.6.jar
6、上传文件
将编译parcel后生成的FLINK-1.13.6-BIN-SCALA_2.11_build文件夹内的3个文件复制到CDH Server所在节点的/opt/cloudera/parcel-repo目录。将编译csd生成后的FLINK_ON_YARN-1.13.6.jar复制到CDH Server所在节点的/opt/cloudera/csd目录
7、重启CDH Server
# 重启server(仅server节点执行) systemctl restart cloudera-scm-server
FAQ:
错误1:
/opt/cloudera/parcels/FLINK/lib/flink/bin/flink-yarn.sh: line 17: rotateLogFilesWithPrefix: command not found
完整日志
[root@cdh02 ~]# vim /opt/cloudera/parcels/FLINK/lib/flink/bin/config.sh ##391行新增如下内容
rotateLogFilesWithPrefix() { dir=$1 prefix=$2 while read -r log ; do rotateLogFile "$log" # find distinct set of log file names, ignoring the rotation number (trailing dot and digit) done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq) } # 旋转日志文件 rotateLogFile() { log=$1; num=$MAX_LOG_FILE_NUMBER if [ -f "$log" -a "$num" -gt 0 ]; then while [ $num -gt 1 ]; do prev=`expr $num - 1` [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num" num=$prev done mv "$log" "$log.$num"; fi }
错误2:
解决:
Flink-yarn -> 配置 -> 高级 -> Flink-yarn 服务环境高级配置代码段(安全阀)Flink-yarn(服务范围)加入以下内容即可
HADOOP_USER_NAME=flink HADOOP_CONF_DIR=/etc/hadoop/conf HADOOP_HOME=/opt/cloudera/parcels/CDH HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/* HBASE_CONF_DIR=/etc/hbase/conf
三、验证Flink服务
1、运行一个WordCount测试
# 在环境变量中加入 export HADOOP_CLASSPATH=`hadoop classpath` [root@cdh03 bin]# ./flink run -t yarn-per-job /opt/cloudera/parcels/FLINK/lib/flink/examples/batch/WordCount.jar