Source code for boa.config.config

from __future__ import annotations

import copy
import pathlib
from dataclasses import asdict as dc_asdict
from dataclasses import is_dataclass
from enum import Enum
from typing import ClassVar, Optional, Union

import attr
import ruamel.yaml
import ruamel.yaml.comments
from attr import asdict
from attrs import Factory, converters, define, field, fields_dict
from ax.modelbridge.dispatch_utils import choose_generation_strategy
from ax.service.utils.instantiation import TParameterRepresentation
from ax.service.utils.scheduler_options import SchedulerOptions
from ax.storage.json_store.encoder import object_to_json
from ax.utils.common.base import Base as AxBase

from boa.config.converters import (
    _convert_noton_type,
    _gen_strat_converter,
    _metric_converter,
    _parameter_normalization,
    _scheduler_converter,
)
from boa.definitions import PathLike
from boa.utils import StrEnum, deprecation
from boa.wrappers.wrapper_utils import load_jsonlike

__all__ = [
    "BOAConfig",
    "BOAObjective",
    "BOAScriptOptions",
    "BOAMetric",
    "MetricType",
    # "SchedulerOptions",
    # "GenerationStep",
]


NL = "\n"


def strip_white_space(s: str, strip_all=True):
    if not s:
        return s
    if strip_all:
        return NL.join(ln.strip() for ln in s.splitlines())
    s_ls = s.splitlines()
    min_indent = min([len(ln) - len(ln.lstrip()) for ln in s_ls if ln])
    return NL.join(ln[min_indent:] for ln in s_ls)


# def strip_white_space(s: str):
#     return NL.join(ln.strip() for ln in s.splitlines())


@define
class _Utils:
    _filtered_dict_fields: ClassVar[list[str]] = None

    def to_dict(self) -> dict:
        def vs(inst, attrib, val):
            if is_dataclass(val):
                return dc_asdict(val)
            elif not attr.has(val) and hasattr(val, "to_dict"):
                return val.to_dict()
            elif isinstance(val, Enum):
                return val.value
            elif isinstance(val, pathlib.Path):
                return str(val)
            elif isinstance(val, AxBase):
                return object_to_json(val)
            return val

        def remove_type_recurse(d):
            if isinstance(d, dict):
                return {k: remove_type_recurse(v) for k, v in d.items() if k != "__type"}
            elif isinstance(d, list):
                return [remove_type_recurse(v) for v in d]
            return d

        # d = object_to_json(self)
        # if not add_type:
        #     return remove_type_recurse(d)
        # return d

        d = {
            **asdict(
                self,
                filter=lambda attr, value: True
                if not self._filtered_dict_fields
                else attr.name not in (self._filtered_dict_fields + ["_generating_default_doc_config"]),
                value_serializer=vs,
            ),
        }

        return remove_type_recurse(d)


[docs]class MetricType(StrEnum): METRIC = "metric" BOA_METRIC = "boa_metric" SKLEARN_METRIC = "sklearn_metric" SYNTHETIC_METRIC = "synthetic_metric" PASSTHROUGH = "pass_through"
[docs]@define(kw_only=True) class BOAMetric(_Utils): metric: Optional[str] = field( default=None, metadata={ "doc": """metrics to be used for optimization. You can use list any metric in built into BOA. Those metrics can be found here: :mod:`Metrics <boa.metrics.metrics>`. If no metric is specified, a :class:`pass through<.PassThrough>` metric will be used. Which means that the metric will be computed by the user and passed to BOA. You can also use any metric from sklearn by passing in the name of the metric and metric type as `sklearn_metric`. You can also use any metric from the Ax's or BoTorch's synthetic metrics modules by passing in the name of the metric and metric type as `synthetic_metric`.""" }, ) name: Optional[str] = field( default=None, converter=converters.optional(str), metadata={"doc": "Name of the metric. This is used to identify the metric in your wrapper script."}, ) metric_type: Optional[MetricType | str] = field( default=MetricType.METRIC, converter=converters.optional(MetricType.from_str_or_enum), metadata={ "doc": """Type of metric. In built BOA metrics are of type `metric`, by using `sklearn_metric` you can use any metric from sklearn.metrics module, by using `synthetic_metric` you can use any synthetic function from Ax's or BoTorch's synthetic metrics modules. You can also specify `pass_through` to use a metric that is computed by the user.""" }, ) noise_sd: Optional[float] = field( default=0, metadata={ "doc": """Standard deviation of the noise to be added to the metric. This is useful when you want to simulate noisy metrics. If None, interpret the function as noisy with unknown noise level. Defaults to `0` (noiseless).""" }, ) minimize: Optional[bool] = field( default=True, metadata={ "doc": """Whether to minimize or maximize the metric. Defaults to `True` (minimize) for a general metric, but every in built metric in BOA (Mean, RMSE, etc.) has its own default value.""" }, ) info_only: bool = field( default=False, metadata={ "doc": """Whether the metric is only used for information purposes only but will still be reported. This means that the metric will not be used for optimization. """ }, ) weight: Optional[float] = field( default=None, metadata={ "doc": """Weight of the metric. Used in scalarized optimization, which is combining multiple metrics into one metric. Scalarized optimization is a way to cheat a multi-objective optimization problem into a single objective optimization problem and significantly reduce the computational cost.""" }, ) properties: Optional[dict] = field( default=None, metadata={ "doc": """Arbitrary properties of the metric. This is used to pass additional information about the metric to your wrapper. You can pass whatever information you want to your wrapper and use it in your wrapper functions.""" }, ) def __init__(self, *args, lower_is_better: Optional[bool] = None, **kwargs): if lower_is_better is not None: if "minimize" in kwargs: raise TypeError("Specify either `lower_is_better` or `minimize` but not both") kwargs["minimize"] = lower_is_better self.__attrs_init__(*args, **kwargs) def __attrs_post_init__(self): if not self.metric and not self.name: raise TypeError("Must specify at least metric name or metric") if self.name is None: self.name = self.metric elif self.metric is None: self.metric_type = MetricType.PASSTHROUGH
[docs]@define class BOAObjective(_Utils): """ Your objective to be optimized by BOA. This can be a single objective, scalarized objective, or a multi-objective (pareto objective). For a single objective, list a single metric in the metrics field. For a multi-objective, list multiple metrics in the metrics field. For a scalarized objective, list multiple metrics in the metrics field and specify the weights for each metric in each metrics weight field. """ metrics: list[BOAMetric] = field( converter=_metric_converter, metadata={"doc": "A list of BOAMetric objects that represent the metrics to be used in the objective."}, ) outcome_constraints: Optional[list[str]] = field( factory=list, metadata={ "doc": """String representation of outcome constraint of metrics. This bounds a metric (or linear combination of metrics) by some bound (>= or <=). (ex. ['metric1 >= 0.0', 'metric2 <= 1.0', '2*metric1 + .5*metric2 <= 1.0'])""" }, ) objective_thresholds: Optional[list[str]] = field( factory=list, metadata={ "doc": """String representation of Objective Thresholds for multi-objective optimization. An objective threshold represents the threshold for an objective metric to contribute to hypervolume calculations. A list containing the objective threshold for each metric collectively form a reference point. Because the objective thresholds are used to calculate hypervolume, they can only be used for multi-objective optimization. (ex. ['metric1 >= 0.0', 'metric2 <= 1.0'])""" }, ) minimize: Optional[bool] = field( default=None, metadata={ "doc": """A boolean that indicates whether the scalarized objective should be minimized or maximized. Only used for scalarized objectives because each metric can have its own minimize flag. Will be ignored for non scalarized objectives.""" }, )
[docs]@define class BOAScriptOptions(_Utils): rel_to_config: Optional[bool] = field( default=True, metadata={ "doc": """ Whether to use the config file as the base path for all relative paths. If True, all relative paths will be relative to the config file directory. Defaults to True if not specified. If launched through BOA CLI, this will be set to True automatically. rel_to_config and rel_to_launch cannot both be specified.""" }, ) rel_to_launch: Optional[bool] = field( default=None, metadata={ "doc": """ Whether to use the CLI launch directory as the base path for all relative paths. If True, all relative paths will be relative to the CLI launch directory. Defaults to `rel_to_config` argument if not specified. rel_to_config and rel_to_launch cannot both be specified.""" }, ) wrapper_name: str = field( default="Wrapper", metadata={ "doc": """Name of the python wrapper class. Used for python interface only. Defaults to `Wrapper` if not specified.""" }, ) wrapper_path: str = field( default="wrapper.py", metadata={ "doc": """Path to the python wrapper file. Used for python interface only. Defaults to `wrapper.py` if not specified. """ }, ) working_dir: str = field( default=".", metadata={ "doc": """Path to the working directory. Defaults to `.` (Current working directory) if not specified. """ }, ) experiment_dir: Optional[PathLike] = field( default=None, metadata={ "doc": """Path to the directory for the output of the experiment You may specify this or output_dir in your configuration file instead.""" }, ) output_dir: Optional[PathLike] = field( default=None, metadata={ "doc": """Output directory of project, If you specify output_dir, then output will be saved in output_dir / experiment_name Because of this only either experiment_dir or output_dir may be specified. (if neither experiment_dir nor output_dir are specified, output_dir defaults to whatever pwd returns (and equivalent on windows))""" }, ) exp_name: Optional[str] = field( default="boa_runs", metadata={ "doc": """name of the experiment. Used with output_dir to create the experiment directory if experiment_dir is not specified.""" }, ) append_timestamp: bool = field( default=True, metadata={ "doc": """Whether to append a timestamp to the output directory to ensure uniqueness. Defaults to `True` if not specified. """ }, ) run_model: Optional[str] = field( default=None, metadata={ "doc": """Shell command to run the model. Used for the language-agnostic interface only. this is what BOA will do to launch your script. it will also pass as a command line argument the current trial directory that is be parameterized by BOA. `run_model` is the only needed shell command of these 4, because you can use it also to write your config, run your model, set your trial status, and fetch your trial data all in one script if you so choose. The other scripts are provided as a convenience to segment out your logic. This can either be a relative path or absolute path. """ }, ) write_configs: Optional[str] = field( default=None, metadata={"doc": "Shell command to write your configs out. See `run_model` for more details. "}, ) set_trial_status: Optional[str] = field( default=None, metadata={"doc": "Shell command to set your trial status. See `run_model` for more details. "}, ) fetch_trial_data: Optional[str] = field( default=None, metadata={"doc": "Shell command to fetch your trial data. See `run_model` for more details. "}, ) base_path: Optional[PathLike] = field( default=".", ) def __init__(self, **config): rel_to_config = config.get("rel_to_config", None) rel_to_launch = config.get("rel_to_launch", None) if rel_to_config and rel_to_launch: raise TypeError("Must specify exactly one of rel_to_here or rel_to_config") if rel_to_launch: config["rel_to_config"] = False if rel_to_config: config["rel_to_launch"] = False self.__attrs_init__(**config) def __attrs_post_init__(self): if not self.base_path: if self.rel_to_config: raise TypeError( "Must specify path to config directory in `BOAScriptOptions` dataclass when using rel-to-config" " as the `base_path` argument." " If you are an external user, the easiest way to ensure this, is to use the Config.from_jsonlike" " class constructor, with either `rel_to_config` set to true in the config file or rel_to_config" " set to true as an argument (to override what is in the config). This will automatically set " " the `base_path` argument for `BOAScriptOptions`." ) self.base_path = pathlib.Path.cwd() self.wrapper_path = self._make_path_absolute(self.base_path, self.wrapper_path) self.working_dir = self._make_path_absolute(self.base_path, self.working_dir) self.experiment_dir = self._make_path_absolute(self.base_path, self.experiment_dir) @staticmethod def _make_path_absolute(base_path, path): if not path: return path path = pathlib.Path(path) if not path.is_absolute(): path = base_path / path return path.resolve()
[docs]@define(kw_only=True) class BOAConfig(_Utils): """Base doc string""" objective: dict | BOAObjective = field( converter=_convert_noton_type(lambda d: BOAObjective(**d), type_=BOAObjective), metadata={ "default_doc": dict(metrics=[dict(name="metric1", metric="RMSE")]), "doc_strip_all": False, "doc": BOAObjective.__doc__, }, ) parameters: dict[str, dict] | list[TParameterRepresentation] = field( converter=_parameter_normalization, metadata={ "default_doc": { "x1": {"type": "range", "bounds": [0, 1], "value_type": "float"}, "x2": {"type": "choice", "values": ["a", "b", "c"]}, "x3": 4.0, }, "doc_strip_all": False, "doc": """ Parameters to optimize over. This can be expressed in two ways. The first is a list of dictionaries, where each dictionary represents a parameter. The second is a dictionary of dictionaries, where the key is the name of the parameter and the value is the dictionary representing the parameter. .. code-block:: yaml ## Dictionary of dictionaries x1: type: range bounds: [0, 1] value_type: float x2: type: range bounds: [0.0, 1.0] # value_type is inferred from bounds .. code-block:: yaml ## List of dictionaries - name: x1 type: range bounds: [0, 1] value_type: float .. code-block:: yaml ## Fixed Types x3: 4.0 # Fixed type, value is 4.0 x4: type: fixed value: "some string" # Fixed type, value is "some string" ## Choice Options x5: type: choice values: ["a", "b"] """, # noqa: W291 }, ) generation_strategy: Optional[dict] = field( factory=dict, converter=_gen_strat_converter, metadata={ "doc_strip_all": False, "doc": f""" Your generation strategy is how new trials will be generated, that is, what acquisition function will be used to select the next trial, what kernel will be used to model the objective function, as well as other options such as max parallelism. This is an optional section. If not specified, Ax will choose a generation strategy for you. Based on your objective, parameters, and other options. You can pass options to how Ax chooses a generation strategy by passing options under `generation_strategy`. Taken from Ax's documentation: {strip_white_space(choose_generation_strategy.__doc__, strip_all=False)} See https://ax.dev/tutorials/generation_strategy.html and https://ax.dev/api/modelbridge.html#ax.modelbridge.dispatch_utils.choose_generation_strategy For specific options. If you want to specify your own generation strategy, you can do so by passing a list of steps under `generation_strategy.steps` .. code-block:: yaml generation_strategy: # Use Ax's SAASBO algorithm, which is particularly well suited for high dimensional problems use_saasbo: true max_parallelism_cap: 10 # Maximum number of trials allowed to run in parallel Other options are possible, see https://ax.dev/tutorials/generation_strategy.html#1A.-Manually-configured-generation-strategy and Models from ax.modelbridge.registry.py for more options Some options include SOBOL, GPEI, Thompson, GPKG (knowledge gradient), and others. See https://ax.dev/api/modelbridge.html#ax.modelbridge.generation_node.GenerationStep For specific options you can pass to each step .. code-block:: yaml generation_strategy: steps: - model: SOBOL num_trials: 20 - model: GPEI # Gaussian Process with Expected Improvement num_trials: -1 max_parallelism: 10 # Maximum number of trials allowed to run in parallel """, # noqa: W291 }, ) scheduler: Optional[dict | SchedulerOptions] = field( default=None, converter=_convert_noton_type(_scheduler_converter, type_=SchedulerOptions, default_if_none=SchedulerOptions), metadata={ "default_doc": dict(n_trials=100), "doc_strip_all": False, "doc": SchedulerOptions.__doc__ + ( """ n_trials: Only run this many trials, in contrast to `total_trials` which is a hard limit, even after reloading the scheduler, this will run n_trials trials every time you reload the scheduler. Making it easier to use when reloading the scheduler and continuing to run trials. """ ), }, ) parameter_constraints: list[str] = Factory(list) model_options: Optional[dict | list] = None script_options: Optional[dict | BOAScriptOptions] = field( default=None, converter=_convert_noton_type( lambda d: BOAScriptOptions(**d), type_=BOAScriptOptions, default_if_none=BOAScriptOptions ), ) parameter_keys: Optional[str | list[Union[str, list[str], list[Union[str, int]]]]] = None config_path: Optional[PathLike] = None n_trials: Optional[int] = None mapping: Optional[dict[str, str]] = field(init=False) # we don't use this key for eq checks because with serialize and deserialize, it then gets all # default options as well orig_config: dict = field(init=False, eq=False) _filtered_dict_fields: ClassVar[str] = ["mapping", "orig_config"] def __init__(self, parameter_keys=None, **config): scheduler = config.get("scheduler", {}) n_trials = config.get("n_trials", None) if isinstance(scheduler, dict): n_trials = scheduler.pop("n_trials", n_trials) # n_trials is not a valid scheduler option so we pop it total_trials = scheduler.get("total_trials", None) else: total_trials = scheduler.total_trials if total_trials and n_trials: raise TypeError("You can specify either n_trials or total_trials, but not both") if not total_trials and not n_trials: raise TypeError("You must specify either n_trials or total_trials") if n_trials: config["n_trials"] = n_trials if "scheduler" in config: if isinstance(config["scheduler"], dict): config["scheduler"].pop("total_trials", None) elif isinstance(config["scheduler"], SchedulerOptions): if config["scheduler"].total_trials is not None: d = dc_asdict(config["scheduler"]) d.pop("total_trials", None) config["scheduler"] = SchedulerOptions(**d) else: raise TypeError(f"Scheduler must be a dict or SchedulerOptions, but is {type(config['scheduler'])}") self.orig_config = copy.deepcopy(config) # we instantiate it as None since all defined attributes from above need to exist self.mapping = None if parameter_keys: parameters, mapping = self.wpr_params_to_boa(config, parameter_keys) config["parameters"] = parameters self.mapping = mapping self.__attrs_init__(**config, parameter_keys=parameter_keys)
[docs] @classmethod def from_jsonlike(cls, file, rel_to_config: Optional[bool] = None): config_path = pathlib.Path(file).resolve() config = load_jsonlike(config_path) config = cls.convert_deprecated(configd=config) script_options = config.get("script_options") or {} rel_to_config = rel_to_config or script_options.get( "rel_to_config", fields_dict(BOAScriptOptions)["rel_to_config"].default ) if rel_to_config: if not config.get("script_options"): config["script_options"] = {} # we set rel_to_config to True in case it was passed in to override config config["script_options"]["rel_to_config"] = True config["script_options"]["rel_to_launch"] = False config["script_options"]["base_path"] = config_path.parent return cls(**config, config_path=file)
# @classmethod # def generate_default_config(cls): # ...
[docs] @classmethod def convert_deprecated(cls, configd: dict) -> dict: if "optimization_options" not in configd: return configd deprecation( "Config format is deprecated, consult documentation for current format to write your configuration file.", "1.0", ) config = {} opt_ops = configd["optimization_options"] ##################################################### # copy over config things that didn't change for key in ["parameters", "parameter_constraints", "model_options", "script_options", "parameter_keys"]: if key in configd: config[key] = configd[key] ##################################################### # copy objective over and then process slight changes config["objective"] = opt_ops["objective_options"] # rename old objectives key to metrics key config["objective"]["metrics"] = config["objective"].pop("objectives") # convert old way of doing metric types to new way for metric in config["objective"]["metrics"]: for member in MetricType: if member.value in metric and member.value != "metric": metric_type = member.value metric["metric"] = metric.pop(metric_type) metric["metric_type"] = metric_type # if weights key is present, then they need to be added to ind metrics if "weights" in config["objective"]: if len(config["objective"]["weights"]) != len(config["objective"]["metrics"]): raise ValueError( "Weights need to be the same length as objectives! You must have 1 weight for each objective" ) for metric, weight in zip(config["objective"]["metrics"], config["objective"]["weights"]): metric["weight"] = weight # delete leftover weights key del config["objective"]["weights"] ##################################################### # copy over generation strategy if "generation_strategy" in opt_ops: config["generation_strategy"] = opt_ops["generation_strategy"] ##################################################### # copy over scheduler if "scheduler" in opt_ops: config["scheduler"] = opt_ops["scheduler"] if "trials" in opt_ops or "n_trials" in opt_ops: if "scheduler" not in config: config["scheduler"] = {} config["scheduler"]["n_trials"] = opt_ops.get("n_trials") or opt_ops.get("trials") ##################################################### # copy over experiment name if "name" in opt_ops.get("experiment", {}): if not config.get("script_options"): config["script_options"] = {} config["script_options"]["exp_name"] = opt_ops["experiment"]["name"] return config
[docs] @classmethod def from_deprecated(cls, configd: dict): return cls(**cls.convert_deprecated(configd=configd))
@property def trials(self): return self.n_trials or self.scheduler.total_trials
[docs] @staticmethod def wpr_params_to_boa( params: dict, parameter_keys: str | list[Union[str, list[str], list[Union[str, int]]]] ) -> tuple[dict, dict]: """ Parameters ---------- params dictionary containing parameters parameter_keys str of key to parameters, or list of json paths to key(s) of parameters. """ # if only one key is passed in as a str, wrap it in a list if isinstance(parameter_keys, str): parameter_keys = [parameter_keys] new_params = {} mapping = {} for maybe_key in parameter_keys: path_type = [] if isinstance(maybe_key, str): key = maybe_key d = params[key] elif isinstance(maybe_key, (list, tuple)): d = params[maybe_key[0]] if len(maybe_key) > 1: for k in maybe_key[1:]: if isinstance(d, dict): path_type.append("dict") else: path_type.append("list") d = d[k] path_type.append("dict") # the last key is always a dict to the param info key = "_".join(str(k) for k in maybe_key) else: raise TypeError( "wpr_params_to_boa accepts str, a list of str, or a list of lists of str " "\nfor the keys (or paths of keys) to the AX parameters you wish to prepend." ) for parameter_name, dct in d.items(): new_key = f"{key}_{parameter_name}" key_index = 0 while new_key in new_params: new_key += f"_{key_index}" if new_key in new_params: key_index += 1 new_key = new_key[:-2] new_params[new_key] = dct mapping[new_key] = dict(path=maybe_key, original_name=parameter_name, path_type=path_type) for mapped, info in mapping.items(): # remove all old keys params.pop(info["path"][0], None) return new_params, mapping
[docs] @staticmethod def boa_params_to_wpr(params: list[dict], mapping, from_trial=True): new_params = {} for parameter in params: if from_trial: name = parameter else: name = parameter["name"] path = mapping[name]["path"] original_name = mapping[name]["original_name"] path_type = mapping[name]["path_type"] p1 = path[0] pt1 = path_type[0] if path[0] not in new_params: if pt1 == "dict": new_params[p1] = {} else: new_params[p1] = [] d = new_params[p1] if len(path) > 1: for key, typ in zip(path[1:], path_type[1:]): if (isinstance(d, list) and key + 1 > len(d)) or (isinstance(d, dict) and key not in d): if isinstance(d, list): d.extend([None for _ in range(key + 1 - len(d))]) if typ == "dict": d[key] = {} else: d[key] = [] d = d[key] if from_trial: d[original_name] = params[parameter] else: d[original_name] = {k: v for k, v in parameter.items() if k != "name"} return new_params
def generate_default_doc_config(): config = BOAConfig( **{ k: copy.deepcopy(v.metadata.get("default_doc")) for k, v in fields_dict(BOAConfig).items() if v.metadata.get("default_doc") } ) d = config.to_dict() def set_metadata_default_doc_recurse(d: dict, config): for name, attribute in d.items(): if attr.has(config) and ( (fields := fields_dict(type(config))) and fields[name].metadata.get("default_doc") ): if isinstance(d[name], dict): dd = copy.deepcopy(fields[name].metadata["default_doc"]) for k, v in d[name].items(): if k in dd: d[name][k] = dd[k] else: d[name][k] = "..." elif isinstance(d[name], list): d[name] = fields[name].metadata["default_doc"] else: if isinstance(d[name], dict): d[name] = set_metadata_default_doc_recurse(d[name], getattr(config, name)) elif isinstance(d[name], list): for i, item in enumerate(d[name]): if isinstance(item, dict): d[name][i] = set_metadata_default_doc_recurse(item, getattr(config, name)) else: d[name] = "..." return d d = set_metadata_default_doc_recurse(d, config) return d, config def add_comment_recurse(d: ruamel.yaml.comments.CommentedMap, config=None, where="before", depth=0, indent=2): fields = fields_dict(type(config)) if attr.has(config) else {} if isinstance(d, dict): for key in d: if key in fields: doc = strip_white_space( fields[key].metadata.get("doc", ""), strip_all=fields[key].metadata.get("doc_strip_all", True) ) if depth == 0: intro = "\n##" + "#" * len(key) + "\n" + key + " #\n" + "#" * len(key) + "##\n" doc = intro + doc if doc else intro + "\n" if doc: if where == "end" and len(doc.splitlines()) == 1: d.yaml_add_eol_comment(comment=doc, key=key) elif where == "before" or where == "end": d.yaml_set_comment_before_after_key(key=key, before=doc, indent=indent * depth) else: raise ValueError("where must be 'before' or 'end'") d[key] = add_comment_recurse(d[key], getattr(config, key), depth=depth + 1, indent=indent) elif isinstance(d, list): for (i, item), new_config in zip(enumerate(d), config): if isinstance(item, dict): d[i] = add_comment_recurse(item, new_config, where="end", depth=depth + 1, indent=indent) return d if __name__ == "__main__": from tests.conftest import TEST_CONFIG_DIR c = BOAConfig.from_jsonlike(pathlib.Path(TEST_CONFIG_DIR / "test_config_generic.yaml"))