diff --git a/src/oebuild/app/plugins/neo_generate/neo_generate.py b/src/oebuild/app/plugins/neo_generate/neo_generate.py new file mode 100644 index 0000000000000000000000000000000000000000..12a79050d4e846fa1cffe514e16986af47bd8071 --- /dev/null +++ b/src/oebuild/app/plugins/neo_generate/neo_generate.py @@ -0,0 +1,497 @@ +""" +Copyright (c) 2023 openEuler Embedded +oebuild is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. +""" + +import argparse +import os +import pathlib +import sys +import textwrap +from shutil import rmtree + +from menuconfig_generator import NeoMenuconfigGenerator +from prettytable import HRuleStyle, PrettyTable, TableStyle, VRuleStyle +from ruamel.yaml.scalarstring import LiteralScalarString + +import oebuild.util as oebuild_util +from oebuild.app.plugins.generate.generate import get_docker_image +from oebuild.app.plugins.generate.parses import parsers +from oebuild.command import OebuildCommand +from oebuild.configure import Configure +from oebuild.m_log import logger +from oebuild.nightly_features import ( + FeatureResolutionError, + FeatureResolver, + NeoFeatureError, + NightlyFeatureRegistry, +) +from oebuild.parse_template import ( + BaseParseTemplate, + FeatureTemplate, + ParseTemplate, +) + + +class NeoGenerate(OebuildCommand): + """Neo-generate handles the refreshed feature workflow starting point.""" + + help_msg = 'neo generate command' + description = textwrap.dedent(""" + Neo-generate manages build preparation with nightly-features state + while keeping other generate options stable. + """) + + def __init__(self): + self.configure = Configure() + self.params = {} + self.yocto_dir = None + self.feature_registry = None + super().__init__('neo-generate', self.help_msg, self.description) + + def do_add_parser(self, parser_adder) -> argparse.ArgumentParser: + parser = self._parser(parser_adder, usage="""%(prog)s""") + parser = parsers(parser, include_features=True) + menu_group = parser.add_mutually_exclusive_group() + menu_group.add_argument( + '--menuconfig', + dest='menuconfig', + action='store_true', + help=""" + Launch an interactive menuconfig to pick nightly features. 
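+            Selections made there replace any platform or feature values
+            given on the command line.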
+ """, + ) + menu_group.add_argument( + '--no-menuconfig', + dest='menuconfig', + action='store_false', + help='Skip the interactive menuconfig step and rely on explicit feature IDs.', + ) + parser.set_defaults(menuconfig=True) + return parser + + def do_run(self, args: argparse.ArgumentParser, unknown=[]): + if self.pre_parse_help(args, unknown): + sys.exit(0) + + parsed_args = args.parse_args(unknown) + + self._validate_environment() + + if parsed_args.list: + self.list_info() + return + + if parsed_args.menuconfig: + menu_selection = self._run_menuconfig(parsed_args) + parsed_args.platform = menu_selection.platform + parsed_args.features = menu_selection.features + + try: + resolution = self._resolve_features( + parsed_args.platform, parsed_args.features or [] + ) + except FeatureResolutionError as err: + logger.error(str(err)) + sys.exit(1) + + build_dir = self._init_build_dir(parsed_args) + if build_dir is None: + sys.exit(1) + + self.params = self._collect_params(parsed_args, build_dir) + self.params['resolved_features'] = resolution.features + self._log_summary(build_dir, parsed_args) + self._generate_compile_conf( + args=parsed_args, + build_dir=build_dir, + resolved_features=resolution.features, + ) + + def _validate_environment(self): + if not self.configure.is_oebuild_dir(): + logger.error('Your current directory had not finished init') + sys.exit(-1) + + yocto_dir = self.configure.source_yocto_dir() + self.yocto_dir = yocto_dir + if not self.check_support_oebuild(yocto_dir): + logger.error( + 'yocto-meta-openeuler does not contain valid oebuild metadata ' + 'Update .oebuild/config and re-run `oebuild update`.' + ) + sys.exit(-1) + + try: + nightly_dir = pathlib.Path(yocto_dir, '.oebuild', 'nightly-features') + self.feature_registry = NightlyFeatureRegistry(nightly_dir) + except NeoFeatureError as err: + logger.error(str(err)) + sys.exit(-1) + + def _run_menuconfig(self, args): + platform_dir = pathlib.Path(self.yocto_dir, '.oebuild', 'platform') + try: + generator = NeoMenuconfigGenerator( + registry=self.feature_registry, + platform_dir=platform_dir, + default_platform=args.platform, + ) + except ValueError as exc: + logger.error('Menuconfig setup failed: %s', exc) + sys.exit(-1) + try: + return generator.run_menuconfig() + except KeyboardInterrupt: + raise + except Exception as exc: + logger.error('Menuconfig failed: %s', exc) + sys.exit(-1) + + def _collect_params(self, args, build_dir): + return { + 'platform': args.platform, + 'build_dir': build_dir, + 'build_in': args.build_in, + 'directory': args.directory, + 'nativesdk_dir': args.nativesdk_dir or None, + 'toolchain_dir': args.toolchain_dir or None, + 'llvm_toolchain_dir': args.llvm_toolchain_dir or None, + 'sstate_mirrors': args.sstate_mirrors, + 'sstate_dir': args.sstate_dir, + 'tmp_dir': args.tmp_dir, + 'cache_src_dir': args.cache_src_dir, + 'datetime': args.datetime, + 'no_fetch': args.no_fetch, + 'no_layer': args.no_layer, + 'auto_build': args.auto_build, + 'nativesdk': args.nativesdk, + 'gcc': args.gcc, + 'gcc_name': args.gcc_name or [], + 'llvm': args.llvm, + 'llvm_lib': args.llvm_lib, + 'resolved_features': [], + } + + def _log_summary(self, build_dir, args): + summary = textwrap.dedent(f""" + neo-generate pre-flight completed. + Build directory: {build_dir} + Platform: {args.platform} + Build mode: {args.build_in} + Auto build: {'enabled' if args.auto_build else 'disabled'} + + Feature selection will follow nightly-features state resolution next. 
+ """) + logger.info(summary) + + def _resolve_features(self, platform, requested): + resolver = FeatureResolver(self.feature_registry, platform) + return resolver.resolve(requested) + + def _generate_compile_conf( + self, args, build_dir, resolved_features + ): + parser_template = ParseTemplate(yocto_dir=self.yocto_dir) + yocto_oebuild_dir = pathlib.Path(self.yocto_dir, '.oebuild') + try: + self._add_platform_template( + args=args, + yocto_oebuild_dir=yocto_oebuild_dir, + parser_template=parser_template, + ) + except BaseParseTemplate as b_t: + logger.error(str(b_t)) + sys.exit(-1) + except ValueError as v_e: + logger.error(str(v_e)) + sys.exit(-1) + + self._add_nightly_features_template( + parser_template, resolved_features + ) + + compile_yaml_path = pathlib.Path(build_dir, 'compile.yaml') + if compile_yaml_path.exists(): + compile_yaml_path.unlink() + + docker_image = get_docker_image( + yocto_dir=self.configure.source_yocto_dir(), + docker_tag='', + configure=self.configure, + ) + + param = parser_template.get_default_generate_compile_conf_param() + param['nativesdk_dir'] = self.params.get('nativesdk_dir') + param['toolchain_dir'] = self.params.get('toolchain_dir') + param['llvm_toolchain_dir'] = self.params.get('llvm_toolchain_dir') + param['build_in'] = args.build_in + param['sstate_mirrors'] = self.params.get('sstate_mirrors') + param['sstate_dir'] = self.params.get('sstate_dir') + param['tmp_dir'] = self.params.get('tmp_dir') + param['datetime'] = args.datetime + param['no_fetch'] = self.params.get('no_fetch') + param['no_layer'] = self.params.get('no_layer') + param['docker_image'] = docker_image + param['src_dir'] = self.configure.source_dir() + param['compile_dir'] = build_dir + param['cache_src_dir'] = self.params.get('cache_src_dir') + oebuild_util.write_yaml( + compile_yaml_path, + parser_template.generate_compile_conf(param), + ) + + self._print_generate(build_dir) + + def _add_platform_template( + self, args, yocto_oebuild_dir, parser_template: ParseTemplate + ): + platform_path = pathlib.Path(yocto_oebuild_dir, 'platform') + platform_files = [f.name for f in platform_path.iterdir() if f.is_file()] + target_file = args.platform + '.yaml' + if target_file in platform_files: + try: + platform_file = platform_path / target_file + parser_template.add_template(platform_file) + except BaseParseTemplate as b_t: + raise b_t + else: + logger.error( + 'Invalid platform. Run `oebuild neo-generate -l` to list supported platforms.' 
+ ) + sys.exit(-1) + + + def _add_nightly_features_template( + self, parser_template: ParseTemplate, resolved_features + ): + for feature in resolved_features: + local_conf = self._local_conf_from_lines( + feature.config.local_conf + ) + parser_template.feature_template.append( + FeatureTemplate( + feature_name=LiteralScalarString(feature.full_id), + repos=( + feature.config.repos if feature.config.repos else None + ), + layers=( + feature.config.layers + if feature.config.layers + else None + ), + local_conf=None + if local_conf is None + else LiteralScalarString(local_conf), + support=feature.machines or [], + ) + ) + + @staticmethod + def _local_conf_from_lines(lines): + if not lines: + return None + return '\n'.join(lines) + + def _init_build_dir(self, args): + build_dir_path = pathlib.Path(self.configure.build_dir()) + if not build_dir_path.exists(): + build_dir_path.mkdir(parents=True) + + if args.directory is None or args.directory == '': + build_dir = build_dir_path / args.platform + else: + build_dir = build_dir_path / args.directory + + if ( + not pathlib.Path(build_dir) + .absolute() + .is_relative_to(build_dir_path.absolute()) + ): + logger.error('Build path must be in oebuild workspace') + return None + + if build_dir.exists(): + logger.warning('the build directory %s already exists', build_dir) + while not args.yes: + in_res = input(f""" + Overwrite {build_dir.name}? This will replace generated assets and delete conf/ + Enter Y=yes, N=no, C=create : """) + if in_res not in [ + 'Y', + 'y', + 'yes', + 'N', + 'n', + 'no', + 'C', + 'c', + 'create', + ]: + print('Invalid input') + continue + if in_res in ['N', 'n', 'no']: + return None + if in_res in ['C', 'c', 'create']: + in_res = input( + 'Enter new build name (will be created under build/):' + ) + build_dir = build_dir_path / in_res + if build_dir.exists(): + continue + break + conf_dir = build_dir / 'conf' + if conf_dir.exists(): + rmtree(conf_dir) + elif build_dir.exists(): + rmtree(build_dir) + build_dir.mkdir(parents=True, exist_ok=True) + return str(build_dir) + + @staticmethod + def _get_terminal_width(): + try: + return os.get_terminal_size().columns + except OSError: + return 80 + + def _build_table(self, headers, terminal_width, title=None): + narrow_charnum, narrow_colnum = 60, 10 + max_width = max(int(terminal_width * 0.9), 20) + table = PrettyTable(headers, max_width=max_width) + table.align = 'l' + table.header = True + + col_width = max(10, max_width // max(len(headers), 1)) + for header in headers: + table.max_width[header] = col_width + + is_narrow = ( + terminal_width < narrow_charnum or col_width < narrow_colnum + ) + if is_narrow: + table.set_style(TableStyle.PLAIN_COLUMNS) + table.hrules = HRuleStyle.NONE + table.vrules = VRuleStyle.NONE + table.left_padding_width = 0 + table.right_padding_width = 0 + else: + table.set_style(TableStyle.SINGLE_BORDER) + table.hrules = HRuleStyle.FRAME + table.vrules = VRuleStyle.FRAME + table.left_padding_width = 1 + table.right_padding_width = 1 + if title: + table.title = title + return table + + def list_info(self): + self._list_platform() + self._list_feature() + + def _list_platform(self): + logger.info( + '\n================= Available Platforms =================' + ) + yocto_oebuild_dir = pathlib.Path( + self.configure.source_yocto_dir(), '.oebuild' + ) + platform_path = pathlib.Path(yocto_oebuild_dir, 'platform') + list_platform = [f for f in platform_path.iterdir() if f.is_file()] + terminal_width = self._get_terminal_width() + table = self._build_table( + 
['Platform Name'], terminal_width, title='Available Platforms' + ) + for platform in list_platform: + if platform.suffix in ['.yml', '.yaml']: + table.add_row([platform.stem]) + table.sortby = 'Platform Name' + print(table) + + def _list_feature(self): + logger.info( + '\n================= Available Features ==================' + ) + + terminal_width = self._get_terminal_width() + + table = self._build_table( + ['Feature Name', 'Supported Arch'], + terminal_width, + title='Available Features', + ) + + def display_feature(feature, depth=0): + indent = ' ' * depth + + if depth == 0: + display_name = feature.full_id + else: + display_name = f"{indent}- {feature.full_id}" + + support = ( + 'all' + if not feature.machines + else ', '.join(feature.machines) + ) + + table.add_row([display_name, support]) + + features_by_category = {} + for feature in self.feature_registry.list_features(): + category = feature.category + if category not in features_by_category: + features_by_category[category] = [] + features_by_category[category].append(feature) + + # For each category, display features in hierarchical order + for category in sorted(features_by_category.keys()): + category_features = features_by_category[category] + + root_feature = None + other_features = [] + + for feature in category_features: + if feature.category == feature.leaf_id and not feature.parent_full_id: + root_feature = feature + else: + other_features.append(feature) + + if root_feature: + display_feature(root_feature, depth=0) + + for feature in sorted(other_features, key=lambda f: f.full_id): + display_feature(feature, depth=1) + else: + table.add_row([category, '']) + for feature in sorted(other_features, key=lambda f: f.full_id): + display_feature(feature, depth=1) + + print(table) + logger.info( + """* 'Supported Arch' defaults to 'all' if not specified in the feature's .yaml file.""" + ) + + def _print_generate(self, build_dir): + format_dir = f""" +generate compile.yaml successful + +Run commands below: +============================================= + +cd {build_dir} +oebuild bitbake + +============================================= +""" + logger.info(format_dir) + + def check_support_oebuild(self, yocto_dir): + return pathlib.Path(yocto_dir, '.oebuild').exists() diff --git a/src/oebuild/bb/utils.py b/src/oebuild/bb/utils.py index feef341a313346ee45fe295a827c401fc43c1e8b..d991f35a9923facdfe0365cfd09e57dcd06bf6a3 100644 --- a/src/oebuild/bb/utils.py +++ b/src/oebuild/bb/utils.py @@ -64,16 +64,16 @@ def approved_variables(): def _prepare_variable_regexes(variables, match_overrides): """Prepare regex patterns for variable matching.""" var_res = {} - override_re = r'(_[a-zA-Z0-9-_$(){}]+)?' if match_overrides else '' + override_re = r'(_[a-zA-Z0-9_$(){}-]+)?' if match_overrides else '' for var in variables: if var.endswith("()"): var_res[var] = re.compile( - fr"^(\{var[:-2].rstrip()}\{override_re})[ \t]*\([ \t]*\)[ \t]*{{" + fr"^(\{var[:-2].rstrip()}{override_re})[ \t]*\([ \t]*\)[ \t]*{{" ) else: var_res[var] = re.compile( - fr'^(\{var}\{override_re})[ \\\\\t]*[?+:.]*=[+.]*[ \\t]*(["\\])' + fr'^({var}{override_re})[ \\\\\t]*[?+:.]*=[+.]*[ \\t]*(["\\])' ) return var_res diff --git a/src/oebuild/nightly_features.py b/src/oebuild/nightly_features.py new file mode 100644 index 0000000000000000000000000000000000000000..a1136fe728f124915451a260a011197a3271f652 --- /dev/null +++ b/src/oebuild/nightly_features.py @@ -0,0 +1,644 @@ +""" +Helper classes for loading and resolving nightly-feature YAML declarations. 
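+
+Full feature IDs take the form "<category>/<leaf-id>"; a feature whose id
+equals its category directory is the category root and is addressed by the
+category name alone, and sub-features append their own id to the parent's
+full ID. As a rough illustration (all names and values below are
+hypothetical, not shipped feature definitions), a file under
+.oebuild/nightly-features/<category>/ might declare:
+
+    id: rt-preempt
+    name: PREEMPT_RT kernel
+    machines: [qemu-aarch64]
+    config:
+      layers: [meta-example-rt]
+      local_conf:
+        - 'EXAMPLE_RT_FLAG = "1"'
+    sub_feats:
+      - id: tracing
+        config:
+          local_conf:
+            - 'EXAMPLE_TRACING_FLAG = "1"'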
+ +The loader builds a global registry keyed by full IDs (/[/]), +and the resolver walks the dependency graph, enforces visibility rules, and +outputs the deterministic set to inject into ParseTemplate. +""" + +from __future__ import annotations + +import pathlib +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Dict, Iterable, List, Optional, Set, Tuple + +import oebuild.util as oebuild_util + + +class NeoFeatureError(Exception): + """Base error for nightly feature parsing and resolution.""" # noqa: D401 + + +class FeatureResolutionError(NeoFeatureError): + """Raised when the resolver cannot satisfy a requested feature merge.""" # noqa: D401 + + +class FeatureConflictError(FeatureResolutionError): + """Raised when conflicting features are enabled simultaneously.""" + + +class FeatureNotFoundError(FeatureResolutionError): + """Raised when a requested feature identifier cannot be resolved.""" + + +class AmbiguousFeatureError(FeatureResolutionError): + """Raised when a feature identifier matches multiple candidates.""" + + +@dataclass +class FeatureConfig: + repos: List[str] = field(default_factory=list) + layers: List[str] = field(default_factory=list) + local_conf: List[str] = field(default_factory=list) + + +@dataclass +class Feature: + category: str + leaf_id: str + full_id: str + name: str + prompt: Optional[str] + machines: Optional[List[str]] + machine_set: Optional[Set[str]] + dependencies: List[str] + selects: List[str] + one_of: List[str] + default_one_of: Optional[str] + choice: List[str] + config: FeatureConfig + parent_full_id: Optional[str] + child_full_ids: List[str] = field(default_factory=list) + is_subfeature: bool = False + + def supports_machine(self, machine: str) -> bool: + normalized = machine.strip().lower() + if self.machine_set and normalized not in self.machine_set: + return False + return True + + +@dataclass +class ResolutionResult: + features: List[Feature] + + +class NightlyFeatureRegistry: + """Indexes features defined under .oebuild/nightly-features.""" + + def __init__(self, nightly_dir: pathlib.Path): + self.features_dir = pathlib.Path(nightly_dir) + if not self.features_dir.exists(): + raise NeoFeatureError( + f'Nightly feature directory not found: {self.features_dir}' + ) + self.features_by_full_id: Dict[str, Feature] = {} + self.leaf_index: Dict[str, List[Feature]] = defaultdict(list) + self.features_with_one_of: List[Feature] = [] + self.category_roots: Dict[str, Feature] = {} + self._load_features() + self._validate_references() + self._apply_machine_constraints() + self._compute_category_roots() + + # a demo logic, + # we need more effective approach to load features + def _load_features(self) -> None: + for category_dir in sorted(self.features_dir.iterdir()): + if not category_dir.is_dir(): + continue + category = category_dir.name.strip() + if not category: + continue + for feature_file in sorted(category_dir.iterdir()): + if not feature_file.is_file(): + continue + if feature_file.suffix not in ('.yaml', '.yml'): + continue + data = oebuild_util.read_yaml(feature_file) + if not isinstance(data, dict): + raise NeoFeatureError( + f'{feature_file} must contain at least one YAML mapping' + ) + self._parse_feature_file(category, data, feature_file) + + def _parse_feature_file( + self, + category: str, + data: dict, + origin: pathlib.Path, + ) -> None: + leaf_id = self._normalize_leaf(data.get('id')) + if not leaf_id: + raise NeoFeatureError( + f'{origin}: missing feature "id" field' + ) + if leaf_id == 
'self': + raise NeoFeatureError( + f'{origin}: feature "id" may not be "self"' + ) + full_id = self._make_full_id(category, leaf_id) + config = self._parse_config(data.get('config')) + machines, machine_set = self._parse_machines(data.get('machines')) + feature = Feature( + category=category, + leaf_id=leaf_id, + full_id=full_id, + name=str(data.get('name') or leaf_id), + prompt=data.get('prompt'), + machines=machines, + machine_set=machine_set, + dependencies=self._normalize_reference_list( + data.get('dependencies'), full_id + ), + selects=self._normalize_reference_list( + data.get('selects'), full_id + ), + one_of=self._normalize_reference_list( + data.get('one_of'), full_id + ), + default_one_of=self._normalize_reference( + data.get('default_one_of'), full_id + ) + if data.get('default_one_of') + else None, + choice=self._normalize_reference_list( + data.get('choice'), full_id + ), + config=config, + parent_full_id=None, + is_subfeature=False, + ) + self._register_feature(feature) + sub_feats = data.get('sub_feats') or [] + if not isinstance(sub_feats, list): + raise NeoFeatureError( + f'{origin}: "sub_feats" must be a sequence' + ) + for sub in sub_feats: + if not isinstance(sub, dict): + raise NeoFeatureError( + f'{origin}: each entry of "sub_feats" must be a mapping' + ) + self._parse_sub_feature(feature, sub, origin) + + def _parse_sub_feature( + self, parent: Feature, data: dict, origin: pathlib.Path + ) -> None: + sub_id = self._normalize_leaf(data.get('id')) + if not sub_id: + raise NeoFeatureError( + f'{origin}: sub-feature is missing "id"' + ) + if sub_id == 'self': + raise NeoFeatureError( + f'{origin}: sub-feature "id" may not be "self"' + ) + # Apply syntax sugar for sub-features: if parent is a category root feature, + # sub-feature full_id should be parent.category/sub_id instead of parent.full_id/sub_id + if parent.category == parent.leaf_id: + full_id = f'{parent.category}/{sub_id}' + else: + full_id = f'{parent.full_id}/{sub_id}' + config = self._parse_config(data.get('config')) + machines, machine_set = self._parse_machines(data.get('machines')) + feature = Feature( + category=parent.category, + leaf_id=sub_id, + full_id=full_id, + name=str(data.get('name') or sub_id), + prompt=data.get('prompt'), + machines=machines, + machine_set=machine_set, + dependencies=self._normalize_reference_list( + data.get('dependencies'), full_id + ), + selects=self._normalize_reference_list( + data.get('selects'), full_id + ), + one_of=self._normalize_reference_list( + data.get('one_of'), full_id + ), + default_one_of=self._normalize_reference( + data.get('default_one_of'), full_id + ) + if data.get('default_one_of') + else None, + choice=self._normalize_reference_list( + data.get('choice'), full_id + ), + config=config, + parent_full_id=parent.full_id, + is_subfeature=True, + ) + self._register_feature(feature) + parent.child_full_ids.append(feature.full_id) + + def _parse_config(self, config_block: Optional[dict]) -> FeatureConfig: + if not isinstance(config_block, dict): + config_block = {} + return FeatureConfig( + repos=self._normalize_sequence(config_block.get('repos')), + layers=self._normalize_sequence(config_block.get('layers')), + local_conf=self._normalize_local_conf( + config_block.get('local_conf') + ), + ) + + def _parse_machines( + self, value + ) -> Tuple[Optional[List[str]], Optional[Set[str]]]: + if value is None: + return None, None + if isinstance(value, str): + normalized = [value.strip()] + elif isinstance(value, Iterable): + normalized = [ + str(item).strip() for 
item in value if item is not None + ] + else: + normalized = [str(value).strip()] + normalized = [m for m in normalized if m] + if not normalized: + return None, None + return normalized, {m.lower() for m in normalized} + + def _normalize_sequence(self, value) -> List[str]: + if value is None: + return [] + if isinstance(value, str): + return [value] + if isinstance(value, Iterable): + return [str(item) for item in value if item is not None] + return [str(value)] + + def _normalize_local_conf(self, value) -> List[str]: + if value is None: + return [] + if isinstance(value, str): + return [value] + if isinstance(value, Iterable): + return [str(item) for item in value if item is not None] + return [str(value)] + + def _normalize_reference_list( + self, entries, parent_full_id: str + ) -> List[str]: + if entries is None: + return [] + if isinstance(entries, str): + entries = [entries] + if not isinstance(entries, Iterable): + return [] + normalized = [] + for entry in entries: + if entry is None: + continue + normalized.append( + self._normalize_reference(entry, parent_full_id) + ) + return normalized + + def _normalize_reference( + self, entry, parent_full_id: str + ) -> str: + value = str(entry).strip() + if not value: + raise NeoFeatureError( + f'Empty reference found in {parent_full_id}' + ) + if value == 'self': + return parent_full_id + if value.startswith('self/'): + remainder = value[5:].strip() + if not remainder: + raise NeoFeatureError( + f'Invalid self reference "{value}" in {parent_full_id}' + ) + value = f'{parent_full_id}/{remainder}' + return self._normalize_identifier(value) + + def _register_feature(self, feature: Feature) -> None: + if feature.full_id in self.features_by_full_id: + raise NeoFeatureError( + f'Duplicate feature id detected: {feature.full_id}' + ) + self.features_by_full_id[feature.full_id] = feature + self.leaf_index[feature.leaf_id].append(feature) + if feature.one_of: + self.features_with_one_of.append(feature) + + def _make_full_id(self, category: str, leaf_id: str) -> str: + # Apply syntax sugar rule: when category equals leaf_id, + # the feature becomes the category root feature with full_id = category + if category == leaf_id: + return category + return f'{category}/{leaf_id}' + + def _normalize_leaf(self, value) -> str: + if value is None: + return '' + return str(value).strip().lower() + + def _normalize_identifier(self, value: str) -> str: + return value.strip().lower() + + def _validate_references(self) -> None: + for feature in self.features_by_full_id.values(): + feature.dependencies = self._canonicalize_reference_list( + feature, feature.dependencies + ) + feature.selects = self._canonicalize_reference_list( + feature, feature.selects + ) + feature.one_of = self._canonicalize_reference_list( + feature, feature.one_of + ) + feature.choice = self._canonicalize_reference_list( + feature, feature.choice + ) + if feature.default_one_of: + feature.default_one_of = self._canonicalize_reference( + feature, feature.default_one_of + ) + + def _canonicalize_reference_list( + self, feature: Feature, entries: List[str] + ) -> List[str]: + return [ + self._canonicalize_reference(feature, entry) for entry in entries + ] + + def _canonicalize_reference( + self, feature: Feature, entry: str + ) -> str: + if entry in self.features_by_full_id: + return entry + try: + resolved = self.resolve_identifier(entry) + except FeatureResolutionError as err: + raise NeoFeatureError( + f'{feature.full_id} references unknown feature {entry}' + ) from err + return 
resolved.full_id + + def _apply_machine_constraints(self) -> None: + cache: Dict[str, Optional[Set[str]]] = {} + + def compute(feature: Feature) -> Optional[Set[str]]: + if feature.full_id in cache: + return cache[feature.full_id] + candidate_sets: List[Optional[Set[str]]] = [] + if feature.machine_set is not None: + candidate_sets.append(set(feature.machine_set)) + if feature.parent_full_id: + parent_feature = self.features_by_full_id[feature.parent_full_id] + candidate_sets.append(compute(parent_feature)) + for dep_id in feature.dependencies: + dep_feature = self.features_by_full_id[dep_id] + candidate_sets.append(compute(dep_feature)) + result = self._intersect_machine_sets(candidate_sets) + cache[feature.full_id] = result + feature.machine_set = result + if result is None: + feature.machines = feature.machines if feature.machines else None + else: + feature.machines = sorted(result) + return result + + for feature_obj in list(self.features_by_full_id.values()): + compute(feature_obj) + + def _intersect_machine_sets( + self, sets: List[Optional[Set[str]]] + ) -> Optional[Set[str]]: + result: Optional[Set[str]] = None + for entry in sets: + if entry is None: + continue + if result is None: + result = set(entry) + else: + result &= entry + if result is not None and not result: + break + if result is None: + return None + return set(result) + + def _compute_category_roots(self) -> None: + for feature in self.features_by_full_id.values(): + if not feature.is_subfeature and feature.category == feature.leaf_id: + self.category_roots[feature.category] = feature + + def _match_leaf_candidates( + self, identifier: str, leaf_candidates: List[Feature] + ) -> Optional[Feature]: + if not leaf_candidates: + return None + if len(leaf_candidates) == 1: + return leaf_candidates[0] + top_level = [ + feat for feat in leaf_candidates if not feat.is_subfeature + ] + if len(top_level) == 1: + return top_level[0] + if top_level: + raise AmbiguousFeatureError( + f"Ambiguous feature ID: '{identifier}'. Candidates: " + + ', '.join(f.full_id for f in top_level) + ) + sub_candidates = [ + feat for feat in leaf_candidates if feat.is_subfeature + ] + if len(sub_candidates) == 1: + return sub_candidates[0] + if sub_candidates: + raise AmbiguousFeatureError( + f"Ambiguous feature ID: '{identifier}'. 
Candidates: " + + ', '.join(f.full_id for f in sub_candidates) + ) + return None + + def _resolve_category_alias(self, normalized: str) -> Optional[Feature]: + if not normalized: + return None + segments = normalized.split('/') + if not segments: + return None + root_id = segments[0] + root_feature = self.category_roots.get(root_id) + if not root_feature: + return None + if len(segments) == 1: + return root_feature + alias_full_id = '/'.join([root_id, root_id, *segments[1:]]) + return self.features_by_full_id.get(alias_full_id) + + def resolve_identifier(self, identifier: str) -> Feature: + normalized = self._normalize_identifier(identifier) + + if normalized in self.features_by_full_id: + return self.features_by_full_id[normalized] + + alias_feature = self._resolve_category_alias(normalized) + has_context = '/' in normalized + + if alias_feature and has_context: + return alias_feature + + leaf_key = normalized.split('/')[-1] + leaf_candidates = self.leaf_index.get(leaf_key, []) + leaf_match = self._match_leaf_candidates(identifier, leaf_candidates) + if leaf_match: + if alias_feature and not has_context and ( + alias_feature.full_id != leaf_match.full_id + ): + raise AmbiguousFeatureError( + f"Ambiguous feature ID: '{identifier}'. Candidates: " + f"{alias_feature.full_id}, {leaf_match.full_id}" + ) + return leaf_match + + if alias_feature: + return alias_feature + + raise FeatureNotFoundError( + f"Unknown feature '{identifier}'. Use --list to see available features." + ) + + def list_features(self) -> List[Feature]: + return sorted( + self.features_by_full_id.values(), key=lambda feat: feat.full_id + ) + + +class FeatureResolver: + """Resolves machine-aware dependency trees for nightly features.""" + + def __init__(self, registry: NightlyFeatureRegistry, machine: str): + self.registry = registry + self.machine = machine.strip() + self.enabled: Dict[str, Feature] = {} + self.enabled_order: List[str] = [] + self.explicit_features: Set[str] = set() + self._context_stack: List[tuple[Feature, str]] = [] + + def resolve(self, requested: Iterable[str]) -> ResolutionResult: + for identifier in requested or []: + feature = self.registry.resolve_identifier(identifier) + self._enable_feature(feature, source='user') + self._resolve_one_of_groups() + return ResolutionResult( + features=[self.enabled[full_id] for full_id in self.enabled_order] + ) + + def _enable_feature(self, feature: Feature, source: str) -> None: + if feature.full_id in self.enabled: + return + frame = (feature, source) + self._context_stack.append(frame) + try: + self._ensure_machine_support(feature) + self.enabled[feature.full_id] = feature + self.enabled_order.append(feature.full_id) + if source == 'user': + self.explicit_features.add(feature.full_id) + for dependency in feature.dependencies: + dep_feature = self.registry.features_by_full_id[dependency] + self._enable_feature(dep_feature, source='dependency') + for selection in feature.selects: + sel_feature = self.registry.features_by_full_id[selection] + self._enable_feature(sel_feature, source='select') + if feature.is_subfeature and feature.parent_full_id: + parent_feature = self.registry.features_by_full_id[ + feature.parent_full_id + ] + self._enable_feature(parent_feature, source='parent') + finally: + self._context_stack.pop() + + def _ensure_machine_support(self, feature: Feature) -> None: + current = feature + while current: + if current.machine_set and self.machine.lower() not in current.machine_set: + self._raise_machine_error(current) + if not current.parent_full_id: 
+ break + current = self.registry.features_by_full_id[current.parent_full_id] + + def _raise_machine_error(self, feature: Feature) -> None: + trace_lines = [] + reason_labels = { + 'user': 'Requested', + 'dependency': 'Dependency', + 'select': 'Selected', + 'parent': 'Parent', + 'default': 'Default', + } + detail_labels = { + 'user': ' (User Input)', + 'select': ' (Auto Select)', + 'dependency': ' (Dependency)', + 'parent': ' (Parent Auto)', + 'default': ' (Default)', + } + for frame_feature, source in self._context_stack: + label = reason_labels.get(source, 'Activated') + detail = detail_labels.get(source, '') + trace_lines.append( + f' - {label}: {frame_feature.full_id}{detail}' + ) + machines = ( + f'[{", ".join(feature.machines)}]' + if feature.machines + else '[]' + ) + lines = [ + f"[Error] Feature '{feature.full_id}' is not supported on machine " + f"'{self.machine}'.", + 'Trace:', + *trace_lines, + f' - Constraint: {feature.leaf_id} requires machine {machines}', + ] + raise FeatureResolutionError('\n'.join(lines)) + + def _resolve_one_of_groups(self) -> None: + for feature in self.registry.features_with_one_of: + if feature.full_id not in self.enabled: + continue + if not feature.one_of: + continue + selected = [ + option for option in feature.one_of if option in self.enabled + ] + if len(selected) > 1: + self._raise_one_of_conflict(feature, selected) + if not selected and feature.default_one_of: + default_id = feature.default_one_of + if default_id not in self.enabled: + default_feature = self.registry.features_by_full_id[ + default_id + ] + self._enable_feature(default_feature, source='default') + for feature in self.registry.features_with_one_of: + if feature.full_id not in self.enabled: + continue + selected = [ + option for option in feature.one_of if option in self.enabled + ] + if len(selected) > 1: + self._raise_one_of_conflict(feature, selected) + + def _raise_one_of_conflict( + self, feature: Feature, selected: List[str] + ) -> None: + leaf_names = [ + self.registry.features_by_full_id[opt].leaf_id for opt in selected + ] + if len(leaf_names) == 2 and all( + opt in self.explicit_features for opt in selected + ): + detail = 'You requested both ' + else: + detail = 'These options ' + detail += ( + "'" + "' and '".join(leaf_names) + "'" + if leaf_names + else '' + ) + raise FeatureConflictError( + f"[Error] Conflict in feature '{feature.full_id}':\n" + f"{detail} cannot be enabled together (one_of)." + )
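+
+
+# Minimal usage sketch; yocto_dir, the machine name and the feature id below
+# are illustrative placeholders, not shipped defaults:
+#
+#     registry = NightlyFeatureRegistry(
+#         pathlib.Path(yocto_dir, '.oebuild', 'nightly-features')
+#     )
+#     resolver = FeatureResolver(registry, machine='qemu-aarch64')
+#     result = resolver.resolve(['rt-preempt'])
+#     for feature in result.features:
+#         print(feature.full_id, feature.machines)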