flink源码分析 - standalone模式下jobmanager启动过程配置文件加载

flink版本: flink-1.11.2

代码位置: org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#main

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.SignalHandler;

/*************************************************
 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
 *  注释: flink有三种方式执行应用程序:session mode, per-job mode, application mode
 *  模型的区别主要包含:
 *  1. 集群生命周期和资源隔离保证
 *  2. 应用程序的main()方法是在客户机上执行还是在集群上执行
 */

/**
 * Entry point for the standalone session cluster.
 *
 * <p>{@link #main(String[])} performs the JVM-level bootstrap (environment logging, signal
 * handler registration, shutdown safeguard), parses the command line into an
 * {@link EntrypointClusterConfiguration}, loads the Flink configuration from it and finally hands
 * a new instance of this class to {@link ClusterEntrypoint#runClusterEntrypoint}.
 */
public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint {

	/**
	 * Creates the entrypoint for an already loaded configuration.
	 *
	 * @param configuration the cluster configuration
	 */
	public StandaloneSessionClusterEntrypoint(Configuration configuration) {
		super(configuration);
	}

	/**
	 * Builds the session-mode dispatcher/resource-manager component factory, wired with the
	 * standalone resource manager.
	 *
	 * @param configuration the cluster configuration (not consulted by this implementation)
	 * @return a session component factory backed by {@link StandaloneResourceManagerFactory}
	 */
	@Override
	protected DefaultDispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(
		Configuration configuration) {
		return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
			StandaloneResourceManagerFactory.getInstance());
	}

	/**
	 * JVM entry point of the standalone session cluster.
	 *
	 * <p>If the command line arguments cannot be parsed, the parse error is logged, the usage
	 * help is printed and the JVM exits with status {@code 1}.
	 *
	 * @param args the command line arguments
	 */
	public static void main(String[] args) {
		final String entrypointName = StandaloneSessionClusterEntrypoint.class.getSimpleName();

		// startup checks and logging (executing user, JVM version, startup options, ...)
		EnvironmentInformation.logEnvironmentInfo(LOG, entrypointName, args);
		// register handlers for OS signals
		SignalHandler.register(LOG);
		// a shutdown request may arrive while the cluster is starting up or already running,
		// so install the safeguard as a JVM shutdown hook
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		final CommandLineParser<EntrypointClusterConfiguration> parser =
			new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

		EntrypointClusterConfiguration clusterConfiguration = null;
		try {
			// parse the raw arguments into an EntrypointClusterConfiguration
			clusterConfiguration = parser.parse(args);
		} catch (FlinkParseException e) {
			LOG.error("Could not parse command line arguments {}.", args, e);
			parser.printHelp(entrypointName);
			System.exit(1);
		}

		// load the Flink configuration (flink-conf.yaml) based on the parsed arguments
		final Configuration configuration = loadConfiguration(clusterConfiguration);

		// runClusterEntrypoint takes the ClusterEntrypoint base type
		ClusterEntrypoint.runClusterEntrypoint(new StandaloneSessionClusterEntrypoint(configuration));
	}
}

加载配置文件主要分两步:

1.  解析命令行传入参数。核心代码:

entrypointClusterConfiguration = commandLineParser.parse(args);

原理参考:

flink源码分析 - 命令行参数解析-CommandLineParser-CSDN博客

2. 加载 flink 配置文件 flink-conf.yaml:

核心代码:

Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

其他部分略过,仅记录最关键yaml文件解析部分:  注意下方: org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.configuration;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Locale;

/**
 * Global configuration object for Flink. Similar to Java properties configuration
 * objects it includes key-value pairs which represent the framework's configuration.
 */
@Internal
public final class GlobalConfiguration {

	private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);

	public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";

	// the keys whose values should be hidden (matched case-insensitively as substrings of the key)
	private static final String[] SENSITIVE_KEYS = new String[] {"password", "secret", "fs.azure.account.key"};

	// the hidden content to be displayed
	public static final String HIDDEN_CONTENT = "******";

	// --------------------------------------------------------------------------------------------

	private GlobalConfiguration() {}

	// --------------------------------------------------------------------------------------------

	/**
	 * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
	 * empty configuration object if the environment variable is not set. In production this variable is set but
	 * tests and local execution/debugging don't have this environment variable set. That's why we should fail
	 * if it is not set.
	 * @return Returns the Configuration
	 */
	public static Configuration loadConfiguration() {
		return loadConfiguration(new Configuration());
	}

	/**
	 * Loads the global configuration and adds the given dynamic properties
	 * configuration.
	 *
	 * <p>The configuration directory is taken from the environment variable
	 * {@link ConfigConstants#ENV_FLINK_CONF_DIR}. If that variable is not set, only a copy of the
	 * dynamic properties is returned.
	 *
	 * @param dynamicProperties The given dynamic properties
	 * @return Returns the loaded global configuration with dynamic properties
	 */
	public static Configuration loadConfiguration(Configuration dynamicProperties) {
		final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
		if (configDir == null) {
			return new Configuration(dynamicProperties);
		}

		return loadConfiguration(configDir, dynamicProperties);
	}

	/**
	 * Loads the configuration files from the specified directory.
	 *
	 * <p>YAML files are supported as configuration files.
	 *
	 * @param configDir
	 *        the directory which contains the configuration files
	 * @return The configuration loaded from the given configuration directory
	 */
	public static Configuration loadConfiguration(final String configDir) {
		return loadConfiguration(configDir, null);
	}

	/**
	 * Loads the configuration files from the specified directory. If the dynamic properties
	 * configuration is not null, then it is added to the loaded configuration.
	 *
	 * @param configDir directory to load the configuration from
	 * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
	 * @return The configuration loaded from the given configuration directory
	 * @throws IllegalArgumentException if {@code configDir} is null
	 * @throws IllegalConfigurationException if {@code configDir} is not an existing directory or
	 *         does not contain a {@value #FLINK_CONF_FILENAME} file
	 */
	public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {

		if (configDir == null) {
			throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
		}

		final File confDirFile = new File(configDir);
		// isDirectory() rather than exists(): the message below promises a directory, and a plain
		// file given here would otherwise only surface later as a confusing "config file missing" error
		if (!confDirFile.isDirectory()) {
			throw new IllegalConfigurationException(
				"The given configuration directory name '" + configDir +
					"' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
		}

		// get Flink yaml configuration file
		final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

		if (!yamlConfigFile.exists()) {
			// report the absolute path of the missing file itself, not of its parent directory
			throw new IllegalConfigurationException(
				"The Flink config file '" + yamlConfigFile +
					"' (" + yamlConfigFile.getAbsolutePath() + ") does not exist.");
		}

		// parse flink-conf.yaml
		Configuration configuration = loadYAMLResource(yamlConfigFile);

		// dynamic properties are added last so they take precedence over values from the file
		if (dynamicProperties != null) {
			configuration.addAll(dynamicProperties);
		}

		return configuration;
	}

	/**
	 * Loads a YAML-file of key-value pairs.
	 *
	 * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a single-line comment.
	 *
	 * <p>Example:
	 *
	 * <pre>
	 * jobmanager.rpc.address: localhost # network address for communication with the job manager
	 * jobmanager.rpc.port   : 6123      # network port to connect to for communication with the job manager
	 * taskmanager.rpc.port  : 6122      # network port the task manager expects incoming IPC connections
	 * </pre>
	 *
	 * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML key-value pairs (see issue
	 * #113 on GitHub). If at any point in time, there is a need to go beyond simple key-value pairs syntax
	 * compatibility will allow to introduce a YAML parser library.
	 *
	 * <p>The file is decoded as UTF-8. Lines without a valid key-value pair are skipped with a warning.
	 *
	 * @param file the YAML file to read from
	 * @return a configuration containing every valid key-value pair of the file
	 * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
	 * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
	 */
	private static Configuration loadYAMLResource(File file) {

		// container for the parsed key-value pairs
		final Configuration config = new Configuration();

		// YAML is Unicode text: decode explicitly as UTF-8 instead of the platform default charset
		try (BufferedReader reader = new BufferedReader(
				new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {

			String line;
			int lineNo = 0;

			while ((line = reader.readLine()) != null) {
				lineNo++;
				// 1. check for comments: everything from the first '#' on is dropped, so a trailing
				//    comment such as "key: value ## comment" never ends up in the value.
				//    Note: this also truncates values that themselves contain '#'.
				String[] comments = line.split("#", 2);
				String conf = comments[0].trim();

				// 2. get key and value
				if (conf.length() > 0) {
					String[] kv = conf.split(": ", 2);

					// skip line with no valid key-value pair
					if (kv.length == 1) {
						LOG.warn("Error while trying to split key and value in configuration file {}:{}: \"{}\"", file, lineNo, line);
						continue;
					}

					String key = kv[0].trim();
					String value = kv[1].trim();

					// sanity check
					if (key.length() == 0 || value.length() == 0) {
						LOG.warn("Error after splitting key and value in configuration file {}:{}: \"{}\"", file, lineNo, line);
						continue;
					}

					LOG.info("Loading configuration property: {}, {}", key, isSensitive(key) ? HIDDEN_CONTENT : value);
					config.setString(key, value);
				}
			}
		} catch (IOException e) {
			throw new RuntimeException("Error parsing YAML configuration.", e);
		}

		return config;
	}

	/**
	 * Check whether the key is a hidden key.
	 *
	 * <p>A key is sensitive if its lower-cased form contains any of the configured sensitive key
	 * fragments. Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's
	 * default locale.
	 *
	 * @param key the config key
	 * @return {@code true} if the value of this key should be hidden in logs
	 * @throws NullPointerException if {@code key} is null
	 */
	public static boolean isSensitive(String key) {
		Preconditions.checkNotNull(key, "key is null");
		final String keyInLower = key.toLowerCase(Locale.ROOT);
		for (String hideKey : SENSITIVE_KEYS) {
			if (keyInLower.contains(hideKey)) {
				return true;
			}
		}
		return false;
	}
}

相关推荐

  1. flink分析 - flink命令启动分析

    2023-12-07 23:22:02       41 阅读
  2. Flink分析 | 读取HBase配置

    2023-12-07 23:22:02       75 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-07 23:22:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll’

    2023-12-07 23:22:02       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-07 23:22:02       82 阅读
  4. Python语言-面向对象

    2023-12-07 23:22:02       91 阅读

热门阅读

  1. pcl-2 pcl结合opencv做svm分类(高程数据)

    2023-12-07 23:22:02       55 阅读
  2. jdbc4.MySQLSyntaxErrorException: Query was empty

    2023-12-07 23:22:02       56 阅读
  3. TCP通讯

    TCP通讯

    2023-12-07 23:22:02      53 阅读
  4. Mysql事务隔离级别及其底层原理

    2023-12-07 23:22:02       64 阅读
  5. chrome issue -- list

    2023-12-07 23:22:02       64 阅读
  6. 【android开发-13】android中RecycleView的详细用法介绍

    2023-12-07 23:22:02       49 阅读