Source code for src.configuration.configuration_reader

"""
Author: Ismael Seidel
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Contributors:
    - André Filipe da Silva Fernandes
    - Leonardo de Sousa Marques

Description:
    Read and resolve toolkit configuration files, aliases and lightfield
    configurations used across the toolkit.

    This module provides the `ConfigurationReader` class which:
    - Loads and merges common and user JSON configuration files;
    - Resolves and deep-merges codec declarations (including inheritance
      and inline overrides) and selects the codecs to run;
    - Loads lightfield configuration files;
    - Reads and merges quality metric declarations.

"""

import itertools
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Pattern, Tuple

from lfc_toolkit.src import LFC_TOOLKIT_PATH

# Pattern for an alias placeholder written as ``${name}``. The name part may
# contain word characters, digits, dots and hyphens; because of the ``*``
# quantifier an empty name (``${}``) is matched as well.
CONTAINS_ALIAS = re.compile(r"\$\{[\w\d.-]*\}")


class ConfigurationReader:
    """
    Read and resolve toolkit configuration files, aliases and lightfield configurations.

    Attributes:
        aliases: Resolved alias map where CLI-provided aliases override file aliases.
        configuration: Merged configuration dictionary.
        lightfield_names: List of lightfield names to process.
        lightfield_configurations: Mapping of lightfield name to its configuration dict.
    """

    def __init__(
        self,
        user_configuration_filename: str,
        overriden_base_path: Optional[Path] = None,
        aliases: Optional[Dict[str, str]] = None,
        lightfields: Optional[List[str]] = None,
    ):
        """
        Initialize the ConfigurationReader.

        Args:
            user_configuration_filename: Path to the user configuration JSON file.
            overriden_base_path: Optional base path to override the default toolkit base path.
            aliases: Optional aliases provided (e.g., from CLI). These take precedence over
                aliases defined in config files.
            lightfields: Optional list of lightfields to process. These take precedence over
                lightfields defined in config files.
        """
        base_path = LFC_TOOLKIT_PATH
        if overriden_base_path:
            base_path = overriden_base_path

        common_configuration_filename = base_path / ".." / "data" / "common_configurations.json"

        # Copy the caller's mapping so adding "base-path" does not mutate it.
        self.aliases = dict(aliases) if aliases else {}
        self.aliases["base-path"] = str(base_path)

        # Read common configurations using self.aliases
        self.configuration = self.read_configurations(configuration_filename=common_configuration_filename)

        initial_configuration_filenames = [
            "codecs-configuration-file",
            "quality-metrics-configuration-file",
        ]

        # Load additional common configuration files if referenced
        for configuration_filename in initial_configuration_filenames:
            if configuration_filename in self.configuration:
                self.configuration.update(
                    self.read_configurations(configuration_filename=Path(self.configuration[configuration_filename]))
                )

        # Load user configuration and merge (top-level keys of the user file win).
        # JSON is read as UTF-8 explicitly instead of relying on the locale default.
        with open(user_configuration_filename, "r", encoding="utf-8") as file:
            self.configuration.update(json.load(file))

        # Merge aliases from configuration file, but do not overwrite CLI-provided aliases.
        aliases_from_configuration = self.configuration.get("aliases", {})
        merged_aliases = dict(aliases_from_configuration)
        merged_aliases.update(self.aliases)  # CLI aliases overwrite file aliases
        self.aliases = merged_aliases

        # Recompile and replace aliases across the entire configuration
        regex_aliases = _precompile_aliases(aliases=self.aliases)
        _replace_aliases_recursive(data=self.configuration, regex_aliases=regex_aliases)

        # Prepare lightfield data
        if lightfields:
            self.lightfield_names = lightfields
        else:
            self.lightfield_names = list(self.get_lightfield_names())

        self.lightfield_configurations = {}
        for lightfield_name in self.lightfield_names:
            self.lightfield_configurations[lightfield_name] = self.get_lightfield_configuration(name=lightfield_name)

        # Configure codecs and quality metrics
        self.configure_codecs()
        self.read_all_quality_metrics_declarations()

    def read_configurations(self, configuration_filename: Path) -> Dict[str, Any]:
        """
        Read a JSON configuration file and replace aliases using self.aliases.

        Args:
            configuration_filename: Path to JSON configuration file.

        Returns:
            Parsed configuration dictionary with aliases replaced.
        """
        with open(configuration_filename, "r", encoding="utf-8") as file:
            configurations = json.load(file)

        regex_aliases = _precompile_aliases(self.aliases)
        _replace_aliases_recursive(data=configurations, regex_aliases=regex_aliases)
        return configurations

    def read_all_codec_declarations(self) -> None:
        """
        Populate self.configuration['codec-declarations'].

        Starts from a copy of the 'ctc-codecs-declaration' entry and then merges the
        'custom-codecs-declaration' section of every file in codecs.custom-filenames and
        the 'example-codec-result' section of every file in codecs.example-filenames.
        Later files overwrite earlier entries with the same codec name.
        """
        self.configuration["codec-declarations"] = self.configuration.get("ctc-codecs-declaration", {}).copy()

        # (list of files, key holding the declarations inside each of those files)
        sources = [
            (self.configuration["codecs"].get("custom-filenames", []), "custom-codecs-declaration"),
            (self.configuration["codecs"].get("example-filenames", []), "example-codec-result"),
        ]

        for filenames, key in sources:
            for filename in filenames:
                decl = self.read_configurations(Path(filename)).get(key, {})
                self.configuration["codec-declarations"].update(decl)

    def read_all_quality_metrics_declarations(self) -> None:
        """
        Read and merge additional quality metric configuration files listed in configuration.

        Each file named in 'other-quality-metrics-configuration-files' contributes its
        'quality' section; categories already present are updated, new ones are copied in.
        Uses self.aliases for alias replacement.
        """
        custom_quality_filenames = self.configuration.get("other-quality-metrics-configuration-files", [])

        for custom_quality_filename in custom_quality_filenames:
            new_quality = self.read_configurations(configuration_filename=Path(custom_quality_filename)).get(
                "quality", {}
            )
            for category, content in new_quality.items():
                if category in self.configuration["quality"]:
                    self.configuration["quality"][category].update(content)
                else:
                    self.configuration["quality"][category] = content.copy()

    def configure_codecs(self) -> None:
        """
        Resolve codec declarations, apply inline codec configurations, handle inheritance
        and select codecs to run.

        Results are stored in self.configuration under 'codec-declarations',
        'resolved_codecs', 'codecs-to-run' and 'sample-codecs-to-run'.
        Uses self.aliases for reading any codec declaration files.

        Raises:
            SystemExit: With code 0 when neither codecs.run nor codecs.examples lists a codec.
            ValueError: If a parent codec is missing or inheritance is circular.
            KeyError: If a codec listed in codecs.run or codecs.examples cannot be resolved.
        """
        codecs_to_run: Dict[str, Any] = dict()
        sample_codecs_to_run: Dict[str, Any] = dict()

        codecs_list = self.configuration["codecs"].get("run", list())
        sample_codecs_list = self.configuration["codecs"].get("examples", list())

        if len(codecs_list) == 0 and len(sample_codecs_list) == 0:
            print("There is no codec listed for running. Please specify in your configuration file.")
            # sys.exit rather than the builtin exit(): the latter is injected by the
            # `site` module and is not guaranteed to exist in every runtime.
            sys.exit(0)

        if len(codecs_list) > 0:
            print("Codecs:", codecs_list)
        if len(sample_codecs_list) > 0:
            print("Sample codecs:", sample_codecs_list)

        # Read all explicit codec declarations from files
        self.read_all_codec_declarations()

        # Merge file declarations with inline configurations, preserving unspecified
        # properties with a deep merge.
        all_codec_definitions = self.configuration["codec-declarations"].copy()
        inline_codec_configs = self.configuration["codecs"].get("configuration", {})

        for codec_name, inline_config in inline_codec_configs.items():
            if codec_name in all_codec_definitions:
                all_codec_definitions[codec_name] = deep_merge_dicts(all_codec_definitions[codec_name], inline_config)
            else:
                all_codec_definitions[codec_name] = inline_config

        # Resolve inheritance dependencies. Every pass resolves the codecs whose parent
        # is already resolved; a pass that makes no progress means a cycle.
        resolved_codecs: Dict[str, Any] = {}
        unresolved_codecs = all_codec_definitions.copy()

        while unresolved_codecs:
            initial_unresolved_count = len(unresolved_codecs)
            newly_resolved: Dict[str, None] = {}

            for codec_name, config in list(unresolved_codecs.items()):
                if "inherit" in config:
                    parent_name = config["inherit"]
                    if parent_name in resolved_codecs:
                        # Child values override the parent; the "inherit" marker is dropped.
                        merged_config = deep_merge_dicts(
                            resolved_codecs[parent_name], {k: v for k, v in config.items() if k != "inherit"}
                        )
                        resolved_codecs[codec_name] = merged_config
                        newly_resolved[codec_name] = None
                    elif parent_name not in all_codec_definitions:
                        raise ValueError(f"Parent codec '{parent_name}' not found for '{codec_name}'")
                    # Otherwise the parent exists but is not resolved yet: retry next pass.
                else:
                    resolved_codecs[codec_name] = config
                    newly_resolved[codec_name] = None

            for codec_name in newly_resolved:
                unresolved_codecs.pop(codec_name, None)

            if len(unresolved_codecs) == initial_unresolved_count and initial_unresolved_count > 0:
                raise ValueError(
                    f"Circular inheritance dependency or missing parent codec detected among: {list(unresolved_codecs.keys())}"
                )

        self.configuration["resolved_codecs"] = resolved_codecs

        # Select only the codecs specified in `codecs.run`
        for codec_name in codecs_list:
            if codec_name not in resolved_codecs:
                raise KeyError(
                    f"Codec '{codec_name}' listed in 'codecs.run' was not found or fully resolved in any declaration or configuration block."
                )
            codecs_to_run[codec_name] = resolved_codecs[codec_name]

        # Select only the codecs specified in `codecs.examples`; report unknown names
        # with the same kind of explicit error used for `codecs.run`.
        for sample_codec in sample_codecs_list:
            if sample_codec not in resolved_codecs:
                raise KeyError(
                    f"Codec '{sample_codec}' listed in 'codecs.examples' was not found or fully resolved in any declaration or configuration block."
                )
            sample_codecs_to_run[sample_codec] = resolved_codecs[sample_codec]

        self.configuration["codecs-to-run"] = codecs_to_run
        self.configuration["sample-codecs-to-run"] = sample_codecs_to_run

    def get_lightfield_names(self) -> Iterator[str]:
        """
        Get all configured lightfield names from CTC and other lists.

        Returns:
            Iterator over the names in lightfields.ctc followed by those in lightfields.other.
        """
        lightfields = self.configuration["lightfields"]
        lightfields_from_the_ctc = lightfields.get("ctc", [])
        other_lightfields = lightfields.get("other", [])
        lightfield_name_lists = [lightfields_from_the_ctc, other_lightfields]
        return itertools.chain(*lightfield_name_lists)

    def get_target_bpps(self, name: str, category: Optional[str]) -> List[Any]:
        """
        Return target bits-per-pixel rates for a given lightfield based on its category.

        The rates are read from the JSON file named by the 'ctc-target-rates-file'
        configuration entry, under the key 'ctc-target-rates-for-<category>'.

        Args:
            name: Lightfield name (only used in the error message).
            category: Category name ('lenslets', 'synthetics' or 'hdcas').

        Returns:
            List of target bpp values (empty if the file has no entry for the category).

        Raises:
            Exception: If the category is invalid or not provided.
        """
        valid_ctc_categories = ["lenslets", "synthetics", "hdcas"]

        if category in valid_ctc_categories:
            key = "ctc-target-rates-for-" + category
            with open(Path(self.configuration["ctc-target-rates-file"]), "r", encoding="utf-8") as file:
                ctc_target_rates: Dict[str, Any] = json.load(file)
            target_rates = ctc_target_rates.get(key, [])
            return target_rates

        if category:
            raise Exception("Invalid category " + category)

        raise Exception(
            "Target bpps not defined for lf "
            + name
            + ". You need to define either the target-rates or the category properties for this lightfield."
        )

    def get_configuration_filename(self, name: str) -> Path:
        """
        Determine and return the path to a lightfield's configuration file by searching
        configured paths.

        Args:
            name: Lightfield name.

        Returns:
            Path to the configuration file. For CTC lightfields the path is returned
            without checking that the file exists.

        Raises:
            Exception: If no configuration file is found for the given lightfield.
        """
        if name in self.configuration["lightfields"].get("ctc", []):
            ctc_configuration_files_path = Path(self.configuration["ctc-lightfield-configuration-files"])
            return ctc_configuration_files_path / (name + ".json")

        if name in self.configuration["lightfields"].get("other", []):
            other_configuration_file_paths = self.configuration.get("other-lightfield-configuration-files", [])
            for path_name in other_configuration_file_paths:
                configuration_filename = Path(path_name) / (name + ".json")
                if configuration_filename.is_file():
                    return configuration_filename

        # Fallback: the name is in neither list (or was not found above), so probe
        # every known directory, CTC first.
        search_paths: List[str] = []
        if "ctc-lightfield-configuration-files" in self.configuration:
            search_paths.append(self.configuration["ctc-lightfield-configuration-files"])
        if "other-lightfield-configuration-files" in self.configuration:
            search_paths.extend(self.configuration.get("other-lightfield-configuration-files", []))

        for path_str in search_paths:
            candidate = Path(path_str) / (name + ".json")
            if candidate.is_file():
                return candidate

        raise Exception(f"Configuration file for lightfield '{name}' not found in listed paths.")

    def get_lightfield_configuration(self, name: str) -> Dict[str, Any]:
        """
        Load a specific lightfield configuration file, filling in target-rates from the
        CTC rates of its category if missing.

        Args:
            name: Lightfield name.

        Returns:
            Loaded lightfield configuration dict (always containing 'target-rates').

        Raises:
            Exception: If the configuration file is not found, or target rates cannot
                be determined.
        """
        cfg_file = self.get_configuration_filename(name)

        if cfg_file.is_file():
            with open(cfg_file, "r", encoding="utf-8") as file:
                lightfield_configuration: Dict[str, Any] = json.load(file)

            if "target-rates" not in lightfield_configuration:
                category = lightfield_configuration.get("category", None)
                lightfield_configuration["target-rates"] = self.get_target_bpps(name=name, category=category)

            return lightfield_configuration

        raise Exception("Lightfield configuration file not found: " + str(cfg_file))

    def get_used_quality_unit_and_tool(self, metric: str) -> Tuple[str, str]:
        """
        Determine the unit and tool used for a given metric.

        The metric is looked up, in order, in quality.metrics, quality.bd-adjusted-metrics
        and quality.weighted-metrics.

        Args:
            metric: Metric name.

        Returns:
            Tuple (unit, used_tool).

        Raises:
            Exception: If the metric is unknown, a BD-adjusted metric has an unknown origin,
                or a weighted metric lists no base metrics or mixes several tools.
        """
        unit = "dB"  # defaults to dB
        used_tool = "vmaf_tool"  # defaults to vmaf_tool

        all_metrics = self.configuration["quality"]["metrics"]
        all_bd_adjusted_metrics = self.configuration["quality"]["bd-adjusted-metrics"]
        all_derived_metrics = self.configuration["quality"]["weighted-metrics"]

        if metric in all_metrics:
            unit = all_metrics[metric].get("unit", unit)
            used_tool = all_metrics[metric].get("quality-wrapper", used_tool)

        elif metric in all_bd_adjusted_metrics:
            bd_metric_info = all_bd_adjusted_metrics[metric]
            unit = bd_metric_info.get("unit", unit)
            # The tool comes from the metric this BD-adjusted metric is derived from.
            origin_metric = bd_metric_info.get("origin")
            if origin_metric not in all_metrics:
                raise Exception(f"Origin metric '{origin_metric}' not found in metrics.")
            used_tool = all_metrics[origin_metric].get("quality-wrapper", used_tool)

        elif metric in all_derived_metrics:
            unit = all_derived_metrics[metric].get("unit", unit)
            # Base metrics come from "metrics", or from the keys of "weights" as a fallback.
            metrics_list = all_derived_metrics[metric].get("metrics", list())
            if len(metrics_list) == 0:
                metrics_list = all_derived_metrics[metric].get("weights", dict()).keys()
            if len(metrics_list) == 0:
                raise Exception("Invalid metrics configuration. No metric or weights listed")

            used_tool_for_each_base_metric = [all_metrics[m].get("quality-wrapper", used_tool) for m in metrics_list]
            used_tool_for_each_base_metric = list(set(used_tool_for_each_base_metric))

            if len(used_tool_for_each_base_metric) > 1:
                raise Exception("Currently we can only compute weighted metrics if they are computed using the same tool")
            if len(used_tool_for_each_base_metric) == 0:
                raise Exception("No used tool was detected for weighted metrics. Please check your configuration file")

            used_tool = used_tool_for_each_base_metric[0]

        else:
            raise Exception(f"Unknown metric {metric}.")

        return unit, used_tool

    def __getitem__(self, item: str) -> Any:
        """Return the top-level configuration entry stored under ``item``."""
        return self.configuration[item]

    def __iter__(self):
        """Iterate over the (key, value) pairs of the configuration dictionary."""
        return iter(self.configuration.items())

    def get_dict(self) -> Dict[str, Any]:
        """
        Return the internal configuration dictionary (the same object, not a copy).
        """
        return self.configuration
def read_config_from_argv(overriden_base_path: Optional[Path] = None) -> ConfigurationReader:
    """
    Build a ConfigurationReader from the command line.

    The required ``--configuration`` / ``-c`` option is parsed with argparse, the
    selected path is printed, and the reader is constructed from it.

    Args:
        overriden_base_path: Optional base path forwarded unchanged to ConfigurationReader.

    Returns:
        ConfigurationReader instance created from CLI args.
    """
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument("--configuration", "-c", required=True, help="Path to the JSON configuration file.")
    parsed = cli.parse_args()

    print("Using the paths from", parsed.configuration)
    return ConfigurationReader(parsed.configuration, overriden_base_path=overriden_base_path)
def _replace_aliases_recursive(data: Any, regex_aliases: Dict[Pattern, Any]) -> Any:
    """
    Recursively replace ``${alias}`` placeholders in data structures (str, dict, list).

    - Strings: every string-valued alias is substituted wherever it occurs in the text.
    - Dicts: values are processed recursively and the dict is updated in place
      (keys are left untouched).
    - Lists: an item that consists solely of a list-valued alias placeholder is replaced
      by the items of that alias; all resulting items are then processed recursively.
      The list is updated in place.
    - Any other type is returned unchanged.

    Args:
        data: The data to process.
        regex_aliases: Mapping of compiled regex patterns to replacement strings or lists.

    Returns:
        Data with aliases replaced: a new string for string input, otherwise the same
        (possibly mutated) object that was passed in.
    """
    if isinstance(data, str):
        # `search`, not `match`: a placeholder may appear anywhere in the string,
        # e.g. "prefix/${base-path}/file", not only at its start.
        if not CONTAINS_ALIAS.search(data):
            return data
        new_string = data
        for regex_alias, replacement in regex_aliases.items():
            if not isinstance(replacement, str):
                continue
            # A function replacement inserts the text literally. Passing the string
            # itself would make `sub` interpret backslashes (e.g. Windows paths) as
            # escape sequences or group references.
            new_string = regex_alias.sub(lambda _match, text=replacement: text, new_string)
        return new_string

    if isinstance(data, dict):
        for key, val in list(data.items()):
            data[key] = _replace_aliases_recursive(val, regex_aliases)
    elif isinstance(data, list):
        expanded: List[Any] = []
        for val in data:
            list_replacement = None
            if isinstance(val, str):
                for regex_alias, replacement in regex_aliases.items():
                    if isinstance(replacement, list) and regex_alias.fullmatch(val):
                        list_replacement = replacement
                        break  # alias names are unique, so no other pattern can match
            if list_replacement is None:
                expanded.append(val)
            else:
                expanded.extend(list_replacement)
        # Slice assignment keeps the caller's list object while replacing its contents.
        data[:] = [_replace_aliases_recursive(val, regex_aliases) for val in expanded]
    return data


def _precompile_aliases(aliases: Dict[str, Any]) -> Dict[Pattern, Any]:
    """
    Compile alias names into regex patterns for fast replacement.

    Each alias name ``key`` becomes a pattern matching the literal text ``${key}``.

    Args:
        aliases: Mapping from alias name to replacement value (string or list).

    Returns:
        Mapping from compiled regex Pattern to the replacement value.

    Raises:
        ValueError: If an alias name is not a string, or a string alias value itself
            contains an alias placeholder anywhere in its text.
    """
    regex_aliases: Dict[Pattern, Any] = dict()
    for key, val in aliases.items():
        if not isinstance(key, str):
            raise ValueError("The aliases names need to be a string")
        # `search` so that a nested placeholder is rejected wherever it occurs.
        if isinstance(val, str) and CONTAINS_ALIAS.search(val):
            raise ValueError("The aliases values should not contain other aliases")
        # Escape the name: characters such as "." are legal in alias names but are
        # regex metacharacters, and must only match themselves.
        compiled_regex = re.compile(r"\$\{" + re.escape(key) + r"\}")
        regex_aliases[compiled_regex] = val
    return regex_aliases
def deep_merge_dicts(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a new dictionary holding dict1 with dict2 merged on top of it.

    When both sides hold a dictionary under the same key, those dictionaries are
    merged recursively; in every other case the value from dict2 replaces the one
    from dict1. Neither input dictionary is modified.
    """
    result: Dict[str, Any] = dict1.copy()
    for key, incoming in dict2.items():
        current = result.get(key)
        if isinstance(current, dict) and isinstance(incoming, dict):
            result[key] = deep_merge_dicts(current, incoming)
            continue
        result[key] = incoming
    return result