# Source code for src.equivalence_check

"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)

Description:
    This module checks MD5 hashes for encoded and decoded LFs among codecs.
"""

import json
import os
import sys
from pathlib import Path

from prettytable import PrettyTable

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from lfc_toolkit.src.configuration.configuration_reader import ConfigurationReader, read_config_from_argv


def get_rate_string(rate) -> str:
    """Render a bitrate as a compact string for display in output tables.

    The value is converted with ``str()``; a trailing ``".0"`` is dropped
    first and a leading ``"0."`` is dropped afterwards. For example ``1.0``
    becomes ``"1"`` and ``0.75`` becomes ``"75"``.

    Note that the result is not unique per rate: ``1.0`` and ``0.1`` both
    render as ``"1"``.

    :param rate: Bitrate value to convert
    :type rate: float
    :return: Compact string representation of the bitrate
    :rtype: str
    """
    # Order matters: the suffix is stripped before the prefix, so "0.0" -> "0".
    return str(rate).removesuffix(".0").removeprefix("0.")
[docs] def md5_from_logs(codec_results: Path, lf: str, rate: str, op_type: str) -> str | None: """Extract MD5 hash from codec execution logs. Searches for the most recently modified execution log in the codec results directory and extracts the MD5 hash for a specific light field, bitrate, and operation type (encoded or decoded). :param codec_results: Path to codec results directory :type codec_results: Path :param lf: Light field name :type lf: str :param rate: Target bitrate value :type rate: str :param op_type: Operation type ('encoded' or 'decoded') :type op_type: str :return: MD5 hash string or None if not found :rtype: str | None """ log_candidates = list(codec_results.glob("execution_log*.json")) if not log_candidates: return None log_file = max(log_candidates, key=lambda p: p.stat().st_mtime) try: data = json.loads(log_file.read_text()) except Exception: return None if isinstance(data, list) and len(data) > 0: data = data[-1] if "results" not in data or lf not in data["results"]: return None lf_data = data["results"][lf] key = str(rate) if op_type == "encoded": encoder = lf_data.get("encoder", {}) if key in encoder: return encoder[key].get("md5_of_encoded") elif op_type == "decoded": decoder = lf_data.get("decoder", {}) if key in decoder: return decoder[key].get("decoded_md5") return None
def main(configuration: ConfigurationReader = None) -> None:
    """Check equivalence of MD5 hashes across multiple codec runs.

    For every rule under ``configuration["equivalence_check"]["rules"]`` and
    every light field in ``configuration.lightfield_names``, looks up the MD5
    hash each selected codec logged at each target rate (via
    ``md5_from_logs``), prints a comparison table and writes it under
    ``<equivalence_results>/<rule name>/`` in each requested format
    (``latex``, ``html``, ``json``, ``text``; other format names are ignored).

    The last table row, ``Matched?``, holds per rate: ``Yes`` when all codecs
    have the same hash, ``No`` when they differ, and ``Incomplete`` when at
    least one codec has no hash.

    Returns silently when the configuration has no ``equivalence_check``
    section, and after printing a message when no rules are configured.

    :param configuration: Configuration reader instance (read from argv if None)
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    """
    if not configuration:
        configuration = read_config_from_argv()

    try:
        eq_obj = configuration["equivalence_check"]
    except KeyError:
        # No equivalence section configured: nothing to check.
        return

    results_dir = Path(eq_obj.get("equivalence_results", "results/equivalence_check"))
    rules = eq_obj.get("rules", [])
    table_formats = eq_obj.get("table_formats", ["latex", "json", "text"])

    if not rules:
        print("No equivalence rules configured.")
        return

    # parents=True: the default location is nested ("results/equivalence_check"),
    # so the parent directory may not exist yet.
    results_dir.mkdir(parents=True, exist_ok=True)
    lightfields = configuration.lightfield_names

    for rule in rules:
        op_types = rule["equivalence"]
        # Only codecs that are also selected to run take part in the rule.
        codecs = [c for c in rule["codecs"] if c in configuration["codecs-to-run"]]
        rule_name = rule.get("name", "rule")

        # NOTE(review): when "equivalence" is a list only its first entry is
        # checked -- confirm that this is intended.
        if isinstance(op_types, list) and op_types:
            op_type_str = op_types[0]
        elif isinstance(op_types, str):
            op_type_str = op_types
        else:
            op_type_str = "encoded"

        print(f"\nRule: {rule_name} | Type: {op_type_str}")
        print("Codecs:", codecs)

        rule_dir = results_dir / rule_name
        rule_dir.mkdir(parents=True, exist_ok=True)

        for lf in lightfields:
            rates = configuration.lightfield_configurations[lf]["target-rates"]

            table = PrettyTable()
            table.field_names = ["Codec"] + [str(rate) for rate in rates]

            md5_matrix = {codec: [] for codec in codecs}
            match_row = []

            for rate in rates:
                md5_by_codec = {}
                for codec in codecs:
                    codec_conf = configuration["codecs"]["configuration"][codec]
                    codec_results = Path(codec_conf["results"])
                    md5 = md5_from_logs(codec_results, lf, rate, op_type_str) or "MISSING"
                    md5_by_codec[codec] = md5
                    md5_matrix[codec].append(md5)

                hashes = [h for h in md5_by_codec.values() if h != "MISSING"]
                if len(hashes) < len(codecs):
                    match_row.append("Incomplete")
                elif len(set(hashes)) == 1:
                    match_row.append("Yes")
                else:
                    match_row.append("No")

            # Keep the rows in a plain list so the JSON export does not have to
            # reach into PrettyTable's private state.
            rows = [[codec] + md5_matrix[codec] for codec in codecs]
            rows.append(["Matched?"] + match_row)
            for row in rows:
                table.add_row(row)

            print(f"LF: {lf} | Type: {op_type_str}")
            print(table)

            # The extension is appended with an f-string instead of
            # Path.with_suffix, since the names themselves may contain dots.
            fname_base = rule_dir / f"table_{lf}_{rule_name}_{op_type_str}"
            for fmt in table_formats:
                if fmt == "latex":
                    with open(f"{fname_base}.tex", "w") as f:
                        f.write(table.get_latex_string())
                elif fmt == "html":
                    with open(f"{fname_base}.html", "w") as f:
                        f.write(table.get_html_string())
                elif fmt == "json":
                    table_dict = {
                        "field_names": table.field_names,
                        "rows": rows,
                    }
                    with open(f"{fname_base}.json", "w") as f:
                        json.dump(table_dict, f, indent=2)
                elif fmt == "text":
                    with open(f"{fname_base}.txt", "w") as f:
                        f.write(table.get_string())
# Script entry point: called without arguments, main() reads the configuration from argv.
if __name__ == "__main__": main()