flink1.14.5使用CDH6.3.2的yarn提交作业

使用CDH6.3.2安装了hadoop集群,但是CDH不支持flink的安装,网上有CDH集成flink的文章,大都比较麻烦;但其实我们只需要把flink的作业提交到yarn集群即可,接下来以CDH yarn为基础,flink on yarn模式的配置步骤。

一、部署flink
1、下载解压

官方下载地址:Downloads | Apache Flink

注意:CDH6.3.2是使用的scala版本是2.11(可以去CHD中spark目录lib下,看一下scala版本),所以下载的flink也要scala_2.11版本的。

2、解压

cd /data/softs tar -zxvf flink-1.14.5-bin-scala_2.11.tgz

#修改名称

mv softs/flink-1.14.5 /data/flink-yarn

3、修改flink配置

vim conf/flink-conf.yaml

#配置java环境变量

env.java.home: /usr/local/jdk1.8.0_281/

#以下为高可用配置

yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://master1:8020/flink/yarn/ha
high-availability.zookeeper.quorum: master1:2181,node1:2181,node2:2181
high-availability.zookeeper.path.root: /flink-yarn
high-availability.cluster-id: /cluster_flink_yarn

4、修改操作用户(针对以session模式启动flink)

vim bin/yarn-session.sh

#操作hdfs的用户

export HADOOP_USER_NAME=hdfs

5、分发到其它节点

将配置好的flink分发到其它两个节点(我的集群是三个节点)

scp -r flink-yarn node1:/data/

scp -r flink-yarn node2:/data/

6、配置全局环境变量

想要让 Flink 服务运行与 YARN 之上,首先需要让 Flink 能够发现 YARN 和 HDFS 的相关配置,因此,需要通过HADOOP_CLASSPATH、HADOOP_CONF_DIR 属性来指定 Hadoop 配置文件所在目录;

因此需要在各个节点配置这两个属性的去全局变量。

vim /etc/profile

#添加如下两行

export HADOOP_CLASSPATH=`hadoop classpath`

export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.yarn/

#刷新

source /etc/profile

7、设置归属用户

因为flink需要将作业提交到yarn集群上,即需要访问或者操作hadoop集群,所以需要有hdfs用户的权限(CDH集群默认hdfs用户有操作hadoop的权限),所以要将flink的归属用户设置为hdfs,且后续都必须用hdfs用户提交flink的作业。在各个节点执行如下操作:

chown -R hdfs:hdfs flink-yarn

二、提交flink作业
1、上传作业jar包

这里使用的是一个单词统计的jar包,使用时需要传入一个服务器IP作为监听的对象

rz flink-on-k8s-demo-1.0-SNAPSHOT.jar

2、在被监听服务器上发送消息

#在172.16.12.103 这台服务器上执行,并输入单词

nc -lk 7777

3、使用application模式启动flink作业

 ./bin/flink run-application -t yarn-application \   #指定flink作业的启动方式
 -c com.yale.StreamWordCount  \                      #指定程序的入口类
 ../softs/flink-on-k8s-demo-1.0-SNAPSHOT.jar  \      #程序jar包
 172.16.12.103                                                         #入参(被监听的服务器IP)

4、查看作业执行情况

打开yarn的webUI

可以看到一个正在运行的任务,点击 applicationId 进去,可以看到有两个容器,

点击logs进去

再点击taskmanager.out,可以看到单词统计的结果,说明成功了!!

三、遇到的问题
1、org.apache.flink.client.deployment.ClusterDeploymentException

答:flink的scala版本和CDH的scala版本不一致,将flink换成scala_2.11版本。

2、Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME

答:在flink-conf.yaml文件中添加env.java.home属性指定java home。

相关推荐

  1. flink启动错误(使用YARN

    2024-01-17 15:00:02       38 阅读
  2. flink-cdc使用小结

    2024-01-17 15:00:02       52 阅读
  3. flink 作业动态维护更新,不重启flink,不提交作业

    2024-01-17 15:00:02       34 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-17 15:00:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-17 15:00:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-17 15:00:02       82 阅读
  4. Python语言-面向对象

    2024-01-17 15:00:02       91 阅读

热门阅读

  1. ubuntu开发技巧总结

    2024-01-17 15:00:02       43 阅读
  2. 微信小程序---封装uni.$showMsg()方法

    2024-01-17 15:00:02       57 阅读
  3. 产品经理与产品运营的区别和联系

    2024-01-17 15:00:02       53 阅读
  4. C++嵌入式编程:硬件控制与物联网

    2024-01-17 15:00:02       57 阅读
  5. 基于WebFlux的websocket的分组和群发实现

    2024-01-17 15:00:02       51 阅读
  6. 深入浅出 Golang 中的参数传递机制

    2024-01-17 15:00:02       41 阅读