FlinkOnYarn 监控 flink任务

Flink任务一般为实时不断运行的任务,如果没有任务监控,
任务异常时无法第一时间处理会比较麻烦。
这里通过调用API接口方式来获取参数,实现任务监控。

Flink任务监控(基于API接口编写shell脚本)
一 flink-on-yarn 模式
二 编写shell 脚本 

获取所有application

curl -s http://XXX:8088/ws/v1/cluster/apps

获取 state值为 RUNNING 的application任务

curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING 

获取这个任务单个信息 

curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state

jq,是linux一个很方便的json处理工具

通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。

安装起来也非常的方便,直接使用yum即可安装。linux下离线安装jq工具 - 代码天地 (codetd.com)

yum install jq

编写shell脚本

由于公司离线yarn和实时yarn 采用是分开的方式。
只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的
这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控
shell脚本水平有限,大家多多谅解,欢迎指导

shell脚本实现功能:
获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。
不是RUNNING,就告警同时重新记录日志。

#!/bin/bash

Joblist=`cat /opt/shell/logs/flink_job.log`    #获取记录job的log文件
let i=0  #获取任务数
let log_count=0  #获取日志中的任务数
start_count=RUNNING  #判断任务是否存在异常

############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########
if [ -z "$Joblist" ]
then
	while :
	do
		job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

		if [ ${job_id[$i]} = "null" ];then
			break
		else
			echo ${job_id[$i]}
			echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
			let i++
		fi
	done
fi


############## 2 读取文件中JOB任务 ##################

let i=0
while read line
do
	JOB[$i]=$line
	let i++
done</opt/shell/logs/flink_job.log

log_count=$i #获取日志中的任务数


########### 3  判断任务状态,是否为RUNNIG,不是则邮件告警   ###############
for ((j=0;j<i;j++))
do
	JOB_ID=${JOB[$j]//\"}
	JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID  | jq .app.state`
	JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID  | jq .app.name`
	START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq  .app.startedTime` / 1000]

#	echo "JOB_NAME: "$JOB_NAME
#	echo 启动时间: `date -d @$START +"%F %H:%M:%S"`
#	echo "JOB_status: " ${JOB_status//\"}

#echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"
#echo "=============================================="

	if [ ${JOB_status//\"} != "RUNNING" ];then
		SUBJECT="【异常告警】Flink任务异常"
		TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"`  \n\n目前状态: $JOB_status"
		echo -e $TEXT | mail -s $SUBJECT     邮箱地址
		start_count=erron
	fi
done


########### 4  出现任务异常,重新读取job 任务记录到日志文件   ###############

let i=0
if [ $start_count == "erron" ];then


echo '重新写入日志文件'
	while :
	do
		job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

		if [ ${job_id[$i]} = "null" ];then
			break
		elif  [ $i == 0 ]; then
			echo ${job_id[$i]}>/opt/shell/logs/flink_job.log

		else
			echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log
		fi
		let i++
	done
	start_count=RUNNING
fi

########### 5  判断线上任务数是否一致,是否有新任务增加   ###############



let i=0
while :
do
	job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

	if [ ${job_id[$i]} = "null" ];then
		break
	else

		let i++
	fi
done
let count=$i #线上任务数
echo "==========================线上最新RUNNING状态任务数: "$count
echo "==========================日志RUNNING状态任务数: "$log_count



if [ ! $count -eq $log_count ]; then
	echo "现有RUNNING状态任务数不相等于已记录的任务数"
	echo  ${job_id[0]} >/opt/shell/logs/flink_job.log
	for ((i=1;i<count;i++))
	do
		echo "重新写入JOB: "${job_id[$i]}
		echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log

	done

fi

echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="
echo  ================================================================================================
echo  =====================================本次crontab监控结束========================================
echo  ================================================================================================

Yarn REST API 使用指南-阿里云开发者社区

相关推荐

  1. FlinkOnYarn 监控 flink任务

    2024-01-11 15:52:02       58 阅读
  2. Flink 任务指标监控

    2024-01-11 15:52:02       60 阅读
  3. 总结:Flink任务执行

    2024-01-11 15:52:02       22 阅读
  4. Flink面试整理-Flink监控和日志收集

    2024-01-11 15:52:02       32 阅读
  5. Flink命令行启动Job任务

    2024-01-11 15:52:02       36 阅读
  6. Flink 任务调度策略:Eager 模式详解

    2024-01-11 15:52:02       31 阅读
  7. Flink 任务启动常用命令

    2024-01-11 15:52:02       27 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-11 15:52:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-11 15:52:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-01-11 15:52:02       82 阅读
  4. Python语言-面向对象

    2024-01-11 15:52:02       91 阅读

热门阅读

  1. Docker 网络

    2024-01-11 15:52:02       50 阅读
  2. html面试题

    2024-01-11 15:52:02       37 阅读
  3. Android亮度调节的几种实现方法

    2024-01-11 15:52:02       62 阅读
  4. Android - 串口通讯(SerialPort)

    2024-01-11 15:52:02       47 阅读
  5. CNCF之CoreDNS

    2024-01-11 15:52:02       54 阅读
  6. R语言【base】——apply():在数组边距上应用函数

    2024-01-11 15:52:02       47 阅读
  7. Polars使用指南(一)

    2024-01-11 15:52:02       50 阅读
  8. 【Machine Learning】Supervised Learning

    2024-01-11 15:52:02       34 阅读
  9. 服务器需要做哪方面的维护?

    2024-01-11 15:52:02       59 阅读
  10. OpenSSL升级版本

    2024-01-11 15:52:02       57 阅读
  11. 网络安全导论知识要点

    2024-01-11 15:52:02       63 阅读