flink源码分析 - 命令行参数解析-CommandLineParser

flink版本: flink-1.11.2

调用位置:   

org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#main

代码位置:

flink核心命令行解析器:

        org.apache.flink.runtime.entrypoint.parser.CommandLineParser 

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.entrypoint.parser;

import org.apache.flink.runtime.entrypoint.FlinkParseException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import javax.annotation.Nonnull;

/**
 * Command line parser which produces a result from the given
 * command line arguments.
 */
public class CommandLineParser<T> {

	@Nonnull
	private final ParserResultFactory<T> parserResultFactory;

	public CommandLineParser(@Nonnull ParserResultFactory<T> parserResultFactory) {
		// TODO_MA 注释: parserResultFactory = EntrypointClusterConfigurationParserFactory
		this.parserResultFactory = parserResultFactory;
	}

	public T parse(@Nonnull String[] args) throws FlinkParseException {
		final DefaultParser parser = new DefaultParser();
		final Options options = parserResultFactory.getOptions();

		final CommandLine commandLine;
		try {

			/*************************************************
			 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
			 *  注释: 解析参数
			 */
			commandLine = parser.parse(options, args, true);

		} catch (ParseException e) {
			throw new FlinkParseException("Failed to parse the command line arguments.", e);
		}

		// TODO_MA 注释: 创建 EntrypointClusterConfiguration 返回
		return parserResultFactory.createResult(commandLine);
	}

	public void printHelp(@Nonnull String cmdLineSyntax) {
		final HelpFormatter helpFormatter = new HelpFormatter();
		helpFormatter.setLeftPadding(5);
		helpFormatter.setWidth(80);
		helpFormatter.printHelp(cmdLineSyntax, parserResultFactory.getOptions(), true);
	}
}

        其中核心方法是构造方法及parse方法。

        构造方法主要用于从外界获取parserResultFactory变量,用于后期解析;

        parse(@Nonnull String[] args) 方法用于解析参数。 其中args即为从外部命令行传入的参数。

解析过程中用到的核心对象是 

final DefaultParser parser = new DefaultParser();

该对象来源于 Apache Common Cli包,具体用法参考(内部包含官方文档地址):

使用Apache commons-cli包进行命令行参数解析的示例代码-CSDN博客

核心解析步骤是:

  commandLine = parser.parse(options, args, true);  以及

return parserResultFactory.createResult(commandLine);。

其中parser对象进行参数解析。  parserResultFactory主要为parser对象提供需要解析的命令行选项options,及通过 parserResultFactory.createResult(commandLine) 从parser的解析结果commandLine中拿到命令行选项对应值,并构造出相应结果。

ParserResultFactory的接口定义:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.entrypoint.parser;

import org.apache.flink.runtime.entrypoint.FlinkParseException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nonnull;

/**
 * Parser result factory used by the {@link CommandLineParser}.
 *
 * @param <T> type of the parsed result
 */
public interface ParserResultFactory<T> {

	/**
	 * Returns all relevant {@link Options} for parsing the command line
	 * arguments.
	 *
	 * @return Options to use for the parsing
	 */
	Options getOptions();

	/**
	 * Create the result of the command line argument parsing.
	 *
	 * @param commandLine to extract the options from
	 * @return Result of the parsing
	 * @throws FlinkParseException Thrown on failures while parsing command line arguments
	 */
	T createResult(@Nonnull CommandLine commandLine) throws FlinkParseException;
}

其下具体实现类如图所示:

截取两个具体实现供参考:

ClusterConfigurationParserFactory:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.entrypoint;

import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nonnull;

import java.util.Properties;

import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;

/**
 * Parser factory which generates a {@link ClusterConfiguration} from the given
 * list of command line arguments.
 */
public class ClusterConfigurationParserFactory implements ParserResultFactory<ClusterConfiguration> {

	public static Options options() {
		final Options options = new Options();
		options.addOption(CONFIG_DIR_OPTION);
		options.addOption(DYNAMIC_PROPERTY_OPTION);

		return options;
	}

	@Override
	public Options getOptions() {
		return options();
	}

	@Override
	public ClusterConfiguration createResult(@Nonnull CommandLine commandLine) {
		final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());

		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());

		return new ClusterConfiguration(configDir, dynamicProperties, commandLine.getArgs());
	}
}
EntrypointClusterConfigurationParserFactory:
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.entrypoint;

import org.apache.flink.runtime.entrypoint.parser.ParserResultFactory;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nonnull;

import java.util.Properties;

import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.CONFIG_DIR_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.DYNAMIC_PROPERTY_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.EXECUTION_MODE_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.HOST_OPTION;
import static org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.REST_PORT_OPTION;

/**
 * Parser factory for {@link EntrypointClusterConfiguration}.
 */
public class EntrypointClusterConfigurationParserFactory implements ParserResultFactory<EntrypointClusterConfiguration> {

	@Override
	public Options getOptions() {
		final Options options = new Options();
		options.addOption(CONFIG_DIR_OPTION);
		options.addOption(REST_PORT_OPTION);
		options.addOption(DYNAMIC_PROPERTY_OPTION);
		options.addOption(HOST_OPTION);
		options.addOption(EXECUTION_MODE_OPTION);

		return options;
	}

	@Override
	public EntrypointClusterConfiguration createResult(@Nonnull CommandLine commandLine) {

		// TODO_MA 注释: 解析 --configDir  -c
		final String configDir = commandLine.getOptionValue(CONFIG_DIR_OPTION.getOpt());

		// TODO_MA 注释: 解析程序的 -Dkey-value参数
		final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());

		// TODO_MA 注释: 解析 --webui-port -r
		final String restPortStr = commandLine.getOptionValue(REST_PORT_OPTION.getOpt(), "-1");
		final int restPort = Integer.parseInt(restPortStr);

		// TODO_MA 注释: 解析 --host -h
		final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());

		/*************************************************
		 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
		 *  注释: 返回一个 EntrypointClusterConfiguration 对象
		 */
		return new EntrypointClusterConfiguration(
			configDir,
			dynamicProperties,
			commandLine.getArgs(),
			hostname,
			restPort);
	}
}

        

相关推荐

  1. flink分析 - 简单解析命令参数

    2023-12-07 07:28:06       32 阅读
  2. flink分析 - flink命令启动分析

    2023-12-07 07:28:06       25 阅读
  3. SpringMVC分析(八)--参数解析

    2023-12-07 07:28:06       20 阅读
  4. SpringMVC分析(六)--参数名称解析

    2023-12-07 07:28:06       19 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-07 07:28:06       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-07 07:28:06       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-07 07:28:06       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-07 07:28:06       20 阅读

热门阅读

  1. 【前端设计模式】之单例模式

    2023-12-07 07:28:06       42 阅读
  2. 单例模式(Singleton Pattern)

    2023-12-07 07:28:06       43 阅读
  3. Golang实践录:读取toml配置

    2023-12-07 07:28:06       34 阅读
  4. ElasticSearch 了解文本相似度 TF-IDF吗?

    2023-12-07 07:28:06       30 阅读
  5. Go语言语法上的一些主要特点(1)

    2023-12-07 07:28:06       31 阅读
  6. 人工智能对现代生活的影响

    2023-12-07 07:28:06       33 阅读
  7. Apache Ofbiz XML-RPC RCE漏洞复现(CVE-2023-49070)

    2023-12-07 07:28:06       31 阅读
  8. 超越节点引擎临界:华为云NES颠覆游戏规则

    2023-12-07 07:28:06       34 阅读
  9. Linux网卡命名规则

    2023-12-07 07:28:06       32 阅读
  10. 继承与派生(1)

    2023-12-07 07:28:06       28 阅读
  11. MATLAB基础应用精讲-【数模应用】深度学习杂谈

    2023-12-07 07:28:06       38 阅读