"""
Author: Ismael Seidel
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Contributors:
- André Filipe da Silva Fernandes
- Leonardo de Sousa Marques
Description:
Read and resolve toolkit configuration files, aliases and lightfield
configurations used across the toolkit.
This module provides the `ConfigurationReader` class which:
- Loads and merges common and user JSON configuration files;
- Resolves and deep-merges codec declarations (including inheritance
and inline overrides) and selects the codecs to run;
- Loads lightfield configuration files;
- Reads and merges quality metric declarations.
"""
import itertools
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Pattern, Tuple
from lfc_toolkit.src import LFC_TOOLKIT_PATH
CONTAINS_ALIAS = re.compile(r"\$\{[\w\d.-]*\}")
[docs]
class ConfigurationReader:
"""
Read and resolve toolkit configuration files, aliases and lightfield configurations.
Attributes:
aliases: Resolved alias map where CLI-provided aliases override file aliases.
configuration: Merged configuration dictionary.
lightfield_names: List of lightfield names to process.
lightfield_configurations: Mapping of lightfield name to its configuration dict.
"""
[docs]
def __init__(
self,
user_configuration_filename: str,
overriden_base_path: Optional[Path] = None,
aliases: Optional[Dict[str, str]] = None,
lightfields: Optional[List[str]] = None,
):
"""
Initialize the ConfigurationReader.
Args:
user_configuration_filename: Path to the user configuration JSON file.
overriden_base_path: Optional base path to override the default toolkit base path.
aliases: Optional aliases provided (e.g., from CLI). These take precedence over aliases defined in config files.
lightfields: Optional list of lightfields to process. These take precedence over lightfields defined in config files.
"""
base_path = LFC_TOOLKIT_PATH
if overriden_base_path:
base_path = overriden_base_path
common_configuration_filename = base_path / ".." / "data" / "common_configurations.json"
self.aliases = dict(aliases) if aliases else {}
self.aliases["base-path"] = str(base_path)
# Read common configurations using self.aliases
self.configuration = self.read_configurations(configuration_filename=common_configuration_filename)
initial_configuration_filenames = [
"codecs-configuration-file",
"quality-metrics-configuration-file",
]
# Load additional common configuration files if referenced
for configuration_filename in initial_configuration_filenames:
if configuration_filename in self.configuration:
self.configuration.update(
self.read_configurations(configuration_filename=Path(self.configuration[configuration_filename]))
)
# Load user configuration and merge
with open(user_configuration_filename, "r") as file:
self.configuration.update(json.load(file))
# Merge aliases from configuration file, but do not overwrite CLI-provided aliases.
aliases_from_configuration = self.configuration.get("aliases", {})
merged_aliases = dict(aliases_from_configuration)
merged_aliases.update(self.aliases) # CLI aliases overwrite file aliases
self.aliases = merged_aliases
# Recompile and replace aliases across the entire configuration
regex_aliases = _precompile_aliases(aliases=self.aliases)
_replace_aliases_recursive(data=self.configuration,
regex_aliases=regex_aliases)
# Prepare lightfield data
if lightfields:
self.lightfield_names = lightfields
else:
self.lightfield_names = list(self.get_lightfield_names())
self.lightfield_configurations = {}
for lightfield_name in self.lightfield_names:
self.lightfield_configurations[lightfield_name] = self.get_lightfield_configuration(name=lightfield_name)
# Configure codecs and quality metrics
self.configure_codecs()
self.read_all_quality_metrics_declarations()
[docs]
def read_configurations(self, configuration_filename: Path) -> Dict[str, Any]:
"""
Read a JSON configuration file and replace aliases using self.aliases.
Args:
configuration_filename: Path to JSON configuration file.
Returns:
Parsed configuration dictionary with aliases replaced.
"""
with open(configuration_filename, "r") as file:
configurations = json.load(file)
regex_aliases = _precompile_aliases(self.aliases)
_replace_aliases_recursive(data=configurations,
regex_aliases=regex_aliases)
return configurations
[docs]
def read_all_codec_declarations(self) -> None:
"""
Populate self.configuration['codec-declarations'] merging CTC declarations
and any custom or example codec files.
"""
self.configuration["codec-declarations"] = self.configuration.get("ctc-codecs-declaration", {}).copy()
sources = [
(self.configuration["codecs"].get("custom-filenames", []), "custom-codecs-declaration"),
(self.configuration["codecs"].get("example-filenames", []), "example-codec-result"),
]
for filenames, key in sources:
for filename in filenames:
decl = self.read_configurations(Path(filename)).get(key, {})
self.configuration["codec-declarations"].update(decl)
[docs]
def read_all_quality_metrics_declarations(self) -> None:
"""
Read and merge additional quality metric configuration files listed in configuration.
Uses self.aliases for alias replacement.
"""
custom_quality_filenames = self.configuration.get("other-quality-metrics-configuration-files", [])
for custom_quality_filename in custom_quality_filenames:
new_quality = self.read_configurations(configuration_filename=Path(custom_quality_filename)).get(
"quality", {}
)
for category, content in new_quality.items():
if category in self.configuration["quality"]:
self.configuration["quality"][category].update(content)
else:
self.configuration["quality"][category] = content.copy()
[docs]
def get_lightfield_names(self) -> Iterator[str]:
"""
Get all configured lightfield names from CTC and other lists.
Returns:
Iterator over lightfield names.
"""
lightfields = self.configuration["lightfields"]
lightfields_from_the_ctc = lightfields.get("ctc", [])
other_lightfields = lightfields.get("other", [])
lightfield_name_lists = [lightfields_from_the_ctc, other_lightfields]
return itertools.chain(*lightfield_name_lists)
[docs]
def get_target_bpps(self, name: str, category: Optional[str]) -> List[Any]:
"""
Return target bits-per-pixel rates for a given lightfield based on its category or explicit target-rates.
Args:
name: Lightfield name.
category: Category name (e.g., 'lenslets', 'synthetics', 'hdcas').
Returns:
List of target bpp values.
Raises:
Exception if category is invalid or not provided and no target-rates were defined.
"""
valid_ctc_categories = ["lenslets", "synthetics", "hdcas"]
if category in valid_ctc_categories:
key = "ctc-target-rates-for-" + category
with open(Path(self.configuration["ctc-target-rates-file"]), "r") as file:
ctc_target_rates: Dict[str, Any] = json.load(file)
target_rates = ctc_target_rates.get(key, [])
return target_rates
if category:
raise Exception("Invalid category " + category)
raise Exception(
"Target bpps not defined for lf " + name + ". You need to define either the target-rates or the category properties for this lightfield."
)
[docs]
def get_configuration_filename(self, name: str) -> Path:
"""
Determine and return the path to a lightfield's configuration file by searching configured paths.
Args:
name: Lightfield name.
Returns:
Path to the configuration file.
Raises:
Exception if no configuration file is found for the given lightfield.
"""
if name in self.configuration["lightfields"].get("ctc", []):
ctc_configuration_files_path = Path(self.configuration["ctc-lightfield-configuration-files"])
return ctc_configuration_files_path / (name + ".json")
if name in self.configuration["lightfields"].get("other", []):
other_configuration_file_paths = self.configuration.get("other-lightfield-configuration-files", [])
for path_name in other_configuration_file_paths:
configuration_filename = Path(path_name) / (name + ".json")
if configuration_filename.is_file():
return configuration_filename
search_paths: List[str] = []
if "ctc-lightfield-configuration-files" in self.configuration:
search_paths.append(self.configuration["ctc-lightfield-configuration-files"])
if "other-lightfield-configuration-files" in self.configuration:
search_paths.extend(self.configuration.get("other-lightfield-configuration-files", []))
for path_str in search_paths:
candidate = Path(path_str) / (name + ".json")
if candidate.is_file():
return candidate
raise Exception(f"Configuration file for lightfield '{name}' not found in listed paths.")
[docs]
def get_lightfield_configuration(self, name: str) -> Dict[str, Any]:
"""
Load a specific lightfield configuration file, filling in target-rates from CTC if missing.
Args:
name: Lightfield name.
Returns:
Loaded lightfield configuration dict.
Raises:
Exception if configuration file not found.
"""
cfg_file = self.get_configuration_filename(name)
if cfg_file.is_file():
with open(cfg_file, "r") as file:
lightfield_configuration: Dict[str, Any] = json.load(file)
if "target-rates" not in lightfield_configuration:
category = lightfield_configuration.get("category", None)
lightfield_configuration["target-rates"] = self.get_target_bpps(name=name, category=category)
return lightfield_configuration
raise Exception("Lightfield configuration file not found: " + str(cfg_file))
def __getitem__(self, item: str) -> Any:
return self.configuration[item]
def __iter__(self):
return iter(self.configuration.items())
[docs]
def get_dict(self) -> Dict[str, Any]:
"""
Return the internal configuration dictionary.
"""
return self.configuration
[docs]
def read_config_from_argv(overriden_base_path: Optional[Path] = None) -> ConfigurationReader:
"""
Helper to create a ConfigurationReader from sys.argv parameters.
Returns:
ConfigurationReader instance created from CLI args.
"""
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--configuration", "-c",
required=True,
help="Path to the JSON configuration file."
)
args = parser.parse_args()
configuration_path = args.configuration
print("Using the paths from", configuration_path)
return ConfigurationReader(configuration_path, overriden_base_path=overriden_base_path)
def _replace_aliases_recursive(data: Any, regex_aliases: Dict[Pattern, str]) -> Any:
"""
Recursively replace aliases in data structures (str, dict, list).
Args:
data: The data to process.
regex_aliases: Mapping of compiled regex patterns to replacement strings or lists.
Returns:
Data with aliases replaced.
"""
if isinstance(data, str) and CONTAINS_ALIAS.match(data):
new_string = data
for regex_alias, replacement in regex_aliases.items():
if not isinstance(replacement, str):
continue
new_string = regex_alias.sub(replacement, new_string)
return new_string
elif isinstance(data, dict):
for key, val in list(data.items()):
data[key] = _replace_aliases_recursive(val, regex_aliases)
elif isinstance(data, list):
new_data: List[Any] = []
for val in data:
has_replacement = False
if isinstance(val, str):
for regex_alias, replacement in regex_aliases.items():
if not isinstance(replacement, list):
continue
if regex_alias.fullmatch(val):
has_replacement = True
new_data.extend(replacement)
continue
if not has_replacement:
new_data.append(val)
data.clear()
data.extend(new_data)
for i, val in enumerate(data):
data[i] = _replace_aliases_recursive(val, regex_aliases)
return data
def _precompile_aliases(aliases: Dict[str, str]) -> Dict[Pattern, str]:
"""
Compile alias names into regex patterns for fast replacement.
Args:
aliases: Mapping from alias name to replacement string.
Returns:
Mapping from compiled regex Pattern to replacement string.
"""
regex_aliases: Dict[Pattern, str] = dict()
for key, val in aliases.items():
if not isinstance(key, str):
raise ValueError("The aliases names need to be a string")
if isinstance(val, str) and CONTAINS_ALIAS.match(val):
raise ValueError("The aliases values should not contain other aliases")
compiled_regex = re.compile(r"\$\{" + key + r"\}")
regex_aliases[compiled_regex] = val
return regex_aliases
[docs]
def deep_merge_dicts(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> Dict[str, Any]:
"""
Deep merge dict2 into dict1, returning a new merged dictionary.
Nested dictionaries are merged recursively; other values are overwritten by dict2.
"""
merged_dict: Dict[str, Any] = dict1.copy()
for k, v in dict2.items():
if k in merged_dict and isinstance(merged_dict[k], dict) and isinstance(v, dict):
merged_dict[k] = deep_merge_dicts(merged_dict[k], v)
else:
merged_dict[k] = v
return merged_dict