diff --git a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
index cd0ab825628942c5be72cc35ee57656c3bcbb968..dd8ecba2d25aa2ac1d021bad6354d3f15d62cac0 100644
--- a/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
+++ b/omniadvisor/src/omniadvisor/interface/hijack_recommend.py
@@ -260,6 +260,8 @@ def hijack_recommend(argv: list) -> None:
     :param argv: execution command of the Spark task
     :return:
     """
+    # Pre-process the driver-java-options argument in the Spark command line
+    SparkCMDParser.pre_handler_driver_java_options(argv)
     # Non-SUBMIT actions (kill task / query status / query version) fall back to the native spark-submit script and are not hijacked by this feature
     SparkCMDParser.validate_submit_arguments(argv)

diff --git a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
index 0143aa4ddb87ab8a0dfb9f41f0239e9315853839..47162de1042d261fcb646192b71d26010da9b4cf 100644
--- a/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
+++ b/omniadvisor/src/omniadvisor/service/spark_service/spark_cmd_parser.py
@@ -14,6 +14,10 @@ _CMD_PREFIX_KEY = 'omniadvisor-cmd-prefix'
 _CMD_PRIMARY_RESOURCE = 'primary_resource'
 # The class field on the command line
 _CMD_CLASS_KEY = 'class'
+# Class name that the spark-sql command maps to in spark-submit
+_SPARK_SQL_CLASS = 'org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver'
+# The Spark --driver-java-options parameter
+_DRIVER_JAVA_OPTIONS = '--driver-java-options'
 # Fields passed with a single dash
 _SINGLE_HORIZONTAL_BAR_KEYS = ['e', 'f', 'i']
 # Boolean-type fields
@@ -28,12 +32,18 @@ _SPARK_CONF_SUPPLEMENT_MAP = {
 }


+class SilentArgumentParser(argparse.ArgumentParser):
+    def error(self, message):
+        # Do not print the error; raise it so the caller can handle it
+        raise argparse.ArgumentError(None, message)
+
+
 class SparkCMDParser:
     """
     Parser for the Spark engine
     """
     # When the user passes -h or --help we must fall back to native execution, so disable argparse's built-in --help/-h parsing
-    _parser = argparse.ArgumentParser(add_help=False)
+    _parser = SilentArgumentParser(add_help=False)

     # For parameters whose names contain '-', set the dest attribute to keep '-' from being converted to '_' during parsing
     # Parse the head of the Spark CMD; requires cooperation from the wrapper script
@@ -88,8 +98,20 @@
     _parser.add_argument('-f', type=str, help='File containing SQL script.')
     _parser.add_argument('-i', help='Initialization SQL file')
     _parser.add_argument('-d', '--define')
-    _parser.add_argument('--database')
+    _parser.add_argument('-database', '--database')
+    @classmethod
+    def pre_handler_driver_java_options(cls, argv):
+        """
+        :param argv: Spark submit argument list; each value following --driver-java-options is wrapped in double quotes in place so that argparse can recognize it later
+        :return:
+        """
+        global_logger.debug("Before pre_handler_driver_java_options: %s", argv)
+        driver_java_options_indices = [i for i, arg in enumerate(argv) if arg == _DRIVER_JAVA_OPTIONS]
+        for idx in driver_java_options_indices:
+            if idx + 1 < len(argv):
+                argv[idx + 1] = f'"{argv[idx + 1]}"'
+        global_logger.debug("After pre_handler_driver_java_options: %s", argv)


     @classmethod
     def validate_submit_arguments(cls, args: List) -> None:
@@ -107,9 +129,12 @@
         # Argument list to be checked
         params, unknown = cls._parser.parse_known_args(args=args)
         primary_resource = unknown
-
+        # Reject tasks that were not submitted as a spark-submit SUBMIT action
         if not primary_resource and vars(params)['class'] is None:
-            raise TypeError("This is not a SUBMIT type spark task.")
+            raise TypeError("This is not a spark-submit SUBMIT type spark task.")
+        # Check whether this is an executable task submitted via spark-sql
+        if vars(params)['class'] == _SPARK_SQL_CLASS and vars(params)['e'] is None and vars(params)['f'] is None:
+            raise TypeError("This is not a spark-sql SUBMIT type spark task.")

     @staticmethod
     def _normalize_value(value: Any) -> Any:
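
Illustrative sketch (not part of the diff): assuming SparkCMDParser is importable from omniadvisor.service.spark_service.spark_cmd_parser and using a hypothetical main class name, the new pre-handler mutates argv in place, wrapping the value that follows --driver-java-options in double quotes so that a value beginning with '-' (e.g. JVM flags) is not mistaken for a new option when argparse parses the command later.

    from omniadvisor.service.spark_service.spark_cmd_parser import SparkCMDParser

    # 'com.example.Main' and 'app.jar' are placeholder values for illustration only
    argv = ['--class', 'com.example.Main',
            '--driver-java-options', '-Xms2g -Xmx4g',
            'app.jar']
    SparkCMDParser.pre_handler_driver_java_options(argv)
    # argv is mutated in place; the JVM flags are now a single double-quoted token:
    # ['--class', 'com.example.Main', '--driver-java-options', '"-Xms2g -Xmx4g"', 'app.jar']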