# Source code for src.bundle.rd_plot_from_bundle

"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)

Description:
    This module decodes and generates RD plots from a bundle of encoded Light Field files.

Observation:
     It is crucial to have the repository cloned beforehand in the correct path so that
     the encoded file can be decoded.
     The bundle follows a hierarchical structure: bundle -> codec name -> lf name -> encoded files.
     RD report files follow the naming convention: {lf_name}_{codec_name}_rd_reports.json
"""

import json
import shutil
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np

from lfc_toolkit.src.codecwrappers.codec_wrapper import CodecWrapper
from lfc_toolkit.src.compute_quality_metrics import get_wrapper_instances
from lfc_toolkit.src.configuration.configuration_reader import (
    ConfigurationReader,
    read_config_from_argv,
)
from lfc_toolkit.src.converters.lightfield_converter import LightfieldConverter
from lfc_toolkit.src.ctc.lightfield_factory import LightFieldFactory
from lfc_toolkit.src.data_handlers.lightfield import (
    EncodedLightField,
    RAWLightFieldData,
)
from lfc_toolkit.src.quality.compound_metrics import (
    CompoundMetrics,
    resolve_bd_adjusted_metric,
)
from lfc_toolkit.src.quality.rd_curve import RDCurve
from lfc_toolkit.src.quality.rd_plot import RDCurveMatplotlibView, RDPlotMatplotlib
from lfc_toolkit.src.run_codecs import (
    clone_and_build_repo,
    create_codec_wrapper_instance,
)
from lfc_toolkit.src.yuv_and_pgx_from_ppm import convert_lightfield_to_yuv_and_pgx
from lfc_toolkit.src.data_handlers.formatters import get_formatted_filename_for_rd_report


def get_codec_wrapper(
    codec_name: str,
    configuration: ConfigurationReader,
    codec_configuration: Dict,
) -> CodecWrapper:
    """Build the codec wrapper used to drive ``codec_name``.

    The repository object is obtained from :func:`clone_and_build_repo` and
    then handed to :func:`create_codec_wrapper_instance`.

    :param codec_name: Name of the codec
    :param configuration: Configuration reader (passed as the global configuration)
    :param codec_configuration: Codec-specific configuration dictionary
    :return: Instantiated codec wrapper
    """
    repository = clone_and_build_repo(
        codec_configuration=codec_configuration,
        global_configuration=configuration,
    )
    wrapper = create_codec_wrapper_instance(
        codec=codec_name,
        codec_configuration=codec_configuration,
        repository_obj=repository,
    )
    return wrapper
def get_codec_configuration(
    configuration: ConfigurationReader, codec_name: str
) -> Dict:
    """Look up the configuration dictionary of a codec.

    The ``resolved_codecs`` section is consulted first when it is present in
    the configuration. If the codec is not found there, a warning is printed
    and the entry under ``codecs.configuration`` is used instead.

    :param configuration: Configuration reader instance
    :param codec_name: Name of the codec
    :return: Codec configuration dictionary
    :raises ValueError: If the codec is not defined under codecs.configuration
    """
    if "resolved_codecs" in configuration.configuration:
        resolved = configuration["resolved_codecs"]
        if codec_name in resolved:
            return resolved[codec_name]

    print(
        f"Warning: Fetching raw configuration for {codec_name}. Inheritance might be missing."
    )

    raw_codecs = configuration["codecs"]["configuration"]
    if codec_name in raw_codecs:
        return raw_codecs[codec_name]

    raise ValueError(
        f"Codec '{codec_name}' found in bundle but not defined under codecs.configuration"
    )
def _read_json_report(report_path: Optional[str]) -> Optional[Dict]:
    """Read a JSON report file, returning None when it cannot be loaded.

    A missing or empty path, a non-existent file, an I/O error and malformed
    content all yield ``None``; read/parse errors are reported on stdout.

    :param report_path: Path to the JSON report file
    :return: Parsed report data or None
    """
    if not report_path or not Path(report_path).exists():
        return None

    try:
        # JSON is UTF-8 by specification; do not depend on the platform's
        # default text encoding.
        with open(report_path, "r", encoding="utf-8") as report_file:
            return json.load(report_file)
    except (OSError, ValueError) as error:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Error reading report {report_path}: {error}")
        return None


def _iter_codec_folders(bundle_root: Path, allowed_codecs: List[str]):
    """Yield (codec_name, codec_folder) filtered by allowed_codecs (empty = all).

    Entries of ``bundle_root`` are visited in sorted order; anything that is
    not a directory is ignored, and skipped codecs are reported on stdout.

    :param bundle_root: Root directory containing per-codec folders
    :param allowed_codecs: Codec names to include; empty list means all
    """
    for candidate in sorted(bundle_root.iterdir()):
        if not candidate.is_dir():
            continue

        codec_name = candidate.name
        if allowed_codecs and codec_name not in allowed_codecs:
            print(f"Skipping '{codec_name}' (not in codecs list: {allowed_codecs})")
            continue

        yield codec_name, candidate
def compute_metrics(
    original_lf: RAWLightFieldData,
    decoded_lf: RAWLightFieldData,
    wrapper_instances: Dict,
) -> Dict[str, Dict]:
    """Run every quality wrapper on a reference/decoded light field pair.

    Each wrapper's ``get_metrics`` call yields a report path, which is then
    loaded with :func:`_read_json_report`. Wrappers whose report cannot be
    loaded are left out of the result and reported on stdout.

    :param original_lf: Reference light field
    :param decoded_lf: Decoded light field
    :param wrapper_instances: Pre-instantiated quality tool wrappers keyed by short-name
    :return: Quality metric reports keyed by tool short-name
    """
    collected: Dict[str, Dict] = {}

    for tool_name, tool in wrapper_instances.items():
        print(f"Computing metrics with {tool_name}...")
        parsed_report = _read_json_report(
            tool.get_metrics(
                original_lightfield=original_lf,
                decoded_lightfield=decoded_lf,
                remove_converted_lfs_after_using=True,
            )
        )

        if parsed_report is None:
            print(f"Warning: no report generated by {tool_name}")
            continue

        collected[tool_name] = parsed_report
        print(f"Done ({tool_name})")

    return collected
def _extract_metric_from_report_data(
    bpp: float,
    report_data: Dict,
    metric: str,
    configuration: ConfigurationReader,
) -> Optional[float]:
    """Extract metric value from quality report data.

    BD-adjusted metrics are resolved through ``resolve_bd_adjusted_metric``
    and the ``mean`` attribute of the result is returned when it exists. Any
    other metric is the mean of the per-frame values returned by
    ``CompoundMetrics.get_list_frames``.

    :param bpp: Bitrate in bits per pixel
    :param report_data: Quality report dictionary with frame metrics
    :param metric: Quality metric name
    :param configuration: Configuration reader with metric definitions
    :return: Extracted metric value or None if not available
    """
    frames = report_data.get("frames")
    if not frames:
        return None

    if CompoundMetrics.is_bd_adjusted_metric(metric, configuration):
        resolved = resolve_bd_adjusted_metric(
            bpp=bpp,
            frame_metrics=frames,
            distortion_name=metric,
            configuration=configuration,
        )
        return getattr(resolved, "mean", None)

    frame_values = CompoundMetrics.get_list_frames(
        all_frame_metrics=frames,
        distortion_name=metric,
        configuration=configuration,
    )
    if not frame_values:
        return None

    # np.mean returns a NumPy scalar; convert it to the builtin float promised
    # by the annotation so the value is always JSON-serialisable downstream.
    return float(np.mean(frame_values))


def _get_metrics_from_config(configuration: ConfigurationReader) -> List[str]:
    """Collect all metrics declared across all rd_plots configurations.

    Entries without a ``metrics`` key contribute nothing.

    :param configuration: Configuration reader
    :return: Sorted list of unique metric names
    """
    declared = {
        metric
        for plot_config in configuration["rd_plots"]
        for metric in plot_config.get("metrics", [])
    }
    return sorted(declared)
def compute_metrics_from_encoded_and_decoded(
    codecs: List[str],
    configuration: ConfigurationReader,
    encoded_files_dir: Path,
    results_path: Path,
) -> Dict[str, Dict[str, List[Tuple[float, Dict[str, Dict]]]]]:
    """Decode every encoded file of the bundle and compute its quality metrics.

    For each codec folder and each light field folder inside it, every file
    matching the codec's encoded extension is decoded, optionally converted,
    and measured with the quality wrappers. As soon as one light field is
    fully processed, its RD plots and JSON report are generated.

    Decoded files are removed after each point, including when decoding,
    conversion or metric computation raises.

    :param codecs: Codec names to process; empty list means all found
    :param configuration: Configuration reader
    :param encoded_files_dir: Root directory containing per-codec encoded files
    :param results_path: Root directory where plots and reports will be saved
    :return: Data organised as {codec: {lf_name: [(bpp, reports_by_wrapper)]}}
    """
    all_data: Dict[str, Dict[str, List[Tuple[float, Dict[str, Dict]]]]] = {}

    for codec_name, codec_folder in _iter_codec_folders(encoded_files_dir, codecs):
        print(f"Codec: {codec_name}")
        codec_cfg = get_codec_configuration(configuration, codec_name)

        # Per-codec settings are read once instead of once per encoded file.
        raw_type = codec_cfg.get("raw_type", "pgx")
        encoded_extension = codec_cfg.get("encoded_extension", "jpl")
        encoder_title = codec_cfg.get("rd_preferences", {}).get("title", "Unknown")
        decoded_conversions = codec_cfg.get("decoded_conversions", [])

        codec_wrapper = get_codec_wrapper(
            codec_name=codec_name,
            configuration=configuration,
            codec_configuration=codec_cfg,
        )
        wrapper_instances = get_wrapper_instances(
            configuration, codec_cfg.get("results")
        )

        for lf_folder in sorted(codec_folder.iterdir()):
            if not lf_folder.is_dir():
                continue

            lf_name = lf_folder.name
            print(f"Lightfield: {lf_name}")

            # Register light fields that appear in the bundle but are not yet
            # known to the configuration.
            if lf_name not in configuration.lightfield_names:
                configuration.lightfield_configurations[lf_name] = (
                    configuration.get_lightfield_configuration(lf_name)
                )

            raw_lf = LightFieldFactory.get_raw_lightfield(
                lightfield_name=lf_name,
                configuration=configuration,
                raw_type=raw_type,
            )

            # Best effort: a failure here is reported but does not stop the run.
            try:
                convert_lightfield_to_yuv_and_pgx(lf_name, configuration)
            except Exception as e:
                print(f"Warning: failed to ensure raw conversions for {lf_name}: {e}")

            for encoded_file in sorted(lf_folder.glob(f"*.{encoded_extension}")):
                encoded_bits = encoded_file.stat().st_size * 8
                actual_bpp = encoded_bits / raw_lf.get_number_of_pixels()
                print(f"Decoding {encoded_file.name} @ {actual_bpp:.4f} bpp")

                encoded_lf = EncodedLightField(
                    lightfield=raw_lf,
                    encoded_path=encoded_file,
                    actual_bitrate=actual_bpp,
                    target_bitrate=None,
                    encoder_name=encoder_title,
                )

                # Decoding and conversion live inside the try block so that the
                # cleanup below also runs when a conversion step fails.
                decoded_lf = None
                try:
                    decoded_lf = codec_wrapper.decode(encoded_lf)
                    for target in decoded_conversions:
                        if target != raw_type:
                            decoded_lf = LightfieldConverter.convert(
                                source=decoded_lf,
                                destination_type=target,
                                remove_after_using=True,
                            )

                    reports_by_wrapper = compute_metrics(
                        raw_lf, decoded_lf, wrapper_instances
                    )
                    all_data.setdefault(codec_name, {}).setdefault(lf_name, []).append(
                        (actual_bpp, reports_by_wrapper)
                    )
                finally:
                    if decoded_lf and decoded_lf.raw_path.exists():
                        print(f"Removing decoded files at {decoded_lf.raw_path}")
                        try:
                            shutil.rmtree(decoded_lf.raw_path)
                        except Exception as e:
                            print(
                                f"Warning: Failed to remove {decoded_lf.raw_path}: {e}"
                            )

            # Generate RD plots and JSON report for this LF as soon as it finishes
            if all_data.get(codec_name, {}).get(lf_name):
                single_lf_data = {codec_name: {lf_name: all_data[codec_name][lf_name]}}
                print(f"Generating RD plots for {lf_name}...")
                generate_rd_plots_from_encoded(
                    single_lf_data, results_path, configuration, from_rd_reports=False
                )
                print(f"Generating JSON report for {lf_name}...")
                create_json_from_bundle(
                    single_lf_data, results_path, configuration, from_rd_reports=False
                )

    return all_data
def _parse_rd_report_filename(
    filename: str, known_codecs: Optional[List[str]] = None
) -> Optional[Tuple[str, str]]:
    """Parse lf_name and codec_name from a report filename.

    Expected format: {lf_name}_{codec_name}_rd_reports.json (the singular
    ``_rd_report.json`` suffix is accepted as well).

    The format is ambiguous when either name contains an underscore. When
    ``known_codecs`` is given, the longest known codec name found at the end
    of the stem decides the split. Otherwise the stem is split at its first
    underscore.

    :param filename: Report filename (without directory)
    :param known_codecs: Optional codec names used to disambiguate the split
    :return: (lf_name, codec_name) or None if format does not match
    """
    for suffix in ("_rd_reports.json", "_rd_report.json"):
        if filename.endswith(suffix):
            stem = filename[: -len(suffix)]
            break
    else:
        return None

    # Longest codec first so that e.g. "jpl_4dtm" wins over "4dtm".
    for codec_name in sorted(known_codecs or (), key=len, reverse=True):
        tail = f"_{codec_name}"
        if codec_name and stem.endswith(tail) and len(stem) > len(tail):
            return stem[: -len(tail)], codec_name

    lf_name, separator, codec_name = stem.partition("_")
    # Reject stems without a separator or with an empty component.
    if not separator or not lf_name or not codec_name:
        return None
    return lf_name, codec_name
def load_bundle_data_from_rd_reports(
    rd_reports_dir: Path,
    allowed_codecs: List[str],
    configuration: ConfigurationReader,
) -> Dict[str, Dict[str, List[Tuple[float, Dict[str, Dict]]]]]:
    """Build bundle data from pre-computed RD report JSON files.

    Expects files named ``{lf_name}_{codec_name}_rd_reports.json`` containing a
    ``results`` dict keyed by target bitrate label, each entry having a
    ``rate`` field and per-metric dicts with a ``mean`` value.

    The filename alone is ambiguous when a name contains an underscore. When
    the report carries ``lightfield_name`` and ``codec`` fields (as written by
    :func:`create_json_from_bundle`) and they are consistent with the
    filename, those names are used instead of the filename split.

    The returned structure mirrors :func:`compute_metrics_from_encoded_and_decoded`
    so that :func:`generate_rd_plots_from_encoded` can consume both paths.

    :param rd_reports_dir: Directory containing RD report JSON files
    :param allowed_codecs: Codec names to include; empty list means all
    :param configuration: Configuration reader (currently unused by this loader)
    :return: Data organised as {codec: {lf_name: [(bpp, reports_by_wrapper)]}}
    :raises ValueError: If rd_reports_dir does not exist
    """
    if not rd_reports_dir.exists():
        raise ValueError(f"RD reports directory does not exist: {rd_reports_dir}")

    all_data: Dict[str, Dict[str, List[Tuple[float, Dict[str, Dict]]]]] = {}

    for report_file in sorted(rd_reports_dir.glob("*_rd_report*.json")):
        parsed = _parse_rd_report_filename(report_file.name)
        if parsed is None:
            print(
                f"Warning: Could not parse report filename '{report_file.name}', skipping."
            )
            continue
        lf_name, codec_name = parsed

        report = _read_json_report(str(report_file))
        if report is None:
            continue
        if not isinstance(report, dict):
            print(f"Warning: No results found in {report_file.name}, skipping.")
            continue

        # Trust the names stored in the report only when they rebuild the same
        # "{lf_name}_{codec_name}" stem as the filename; this resolves
        # underscores inside either name without accepting unrelated values.
        embedded_lf = report.get("lightfield_name")
        embedded_codec = report.get("codec")
        if (
            isinstance(embedded_lf, str)
            and isinstance(embedded_codec, str)
            and embedded_lf
            and embedded_codec
            and f"{embedded_lf}_{embedded_codec}" == f"{lf_name}_{codec_name}"
        ):
            lf_name, codec_name = embedded_lf, embedded_codec

        if allowed_codecs and codec_name not in allowed_codecs:
            continue

        results = report.get("results", {})
        if not isinstance(results, dict) or not results:
            print(f"Warning: No results found in {report_file.name}, skipping.")
            continue

        points: List[Tuple[float, Dict[str, Dict]]] = []
        for entry in results.values():
            # Malformed entries are ignored rather than aborting the whole load.
            if not isinstance(entry, dict):
                continue
            actual_bpp = entry.get("rate")
            if actual_bpp is None:
                continue
            means = {
                key: value["mean"]
                for key, value in entry.items()
                if isinstance(value, dict) and "mean" in value
            }
            if means:
                points.append((actual_bpp, {"means": means}))

        if points:
            all_data.setdefault(codec_name, {}).setdefault(lf_name, []).extend(points)
            print(
                f"Loaded {len(points)} points for {lf_name} / {codec_name} from {report_file.name}"
            )

    return all_data
def _create_rd_curve_from_bundle_points(
    codec_name: str,
    points: List[Tuple[float, Dict[str, Dict]]],
    metric: str,
    configuration: ConfigurationReader,
) -> Optional[RDCurveMatplotlibView]:
    """Create an R-D curve view from bundle data points.

    Points are processed in ascending bitrate order. A point contributes its
    pre-computed ``means`` value when present, otherwise the value extracted
    from the report of the quality tool configured for ``metric``. Points
    with neither, or with no value, are dropped. The caller's ``points`` list
    is left untouched.

    :param codec_name: Codec name
    :param points: List of (bpp, reports_by_wrapper) tuples
    :param metric: Quality metric name
    :param configuration: Configuration reader
    :return: R-D curve view or None if fewer than 2 valid points
    """
    # Sort a copy: the list belongs to the caller's bundle data.
    ordered_points = sorted(points, key=lambda point: point[0])

    unit, used_tool = configuration.get_used_quality_unit_and_tool(metric=metric)
    used_tool_short_name = configuration["quality"]["wrappers"][used_tool]["short_name"]

    bpps, values = [], []
    for bpp, reports_by_wrapper in ordered_points:
        if "means" in reports_by_wrapper:
            value = reports_by_wrapper["means"].get(metric)
        elif used_tool_short_name in reports_by_wrapper:
            value = _extract_metric_from_report_data(
                bpp, reports_by_wrapper[used_tool_short_name], metric, configuration
            )
        else:
            continue

        if value is not None:
            bpps.append(bpp)
            values.append(value)

    if len(values) < 2:
        return None

    codec_cfg = get_codec_configuration(configuration, codec_name)
    rd_preferences = codec_cfg.get("rd_preferences", {})

    return RDCurveMatplotlibView(
        configuration=configuration,
        rd_curve=RDCurve(
            rates=bpps,
            rate_unit="bpp",
            distortions=values,
            distortion_name=metric,
            distortion_unit=unit,
            codec_name=codec_name,
            title=rd_preferences.get("title", codec_name),
        ),
        color=rd_preferences.get("color", "blue"),
        marker=rd_preferences.get("marker", "o"),
    )


def _build_rd_curve_views(
    lf_name: str,
    metric: str,
    all_data: Dict[str, Dict[str, List]],
    configuration: ConfigurationReader,
) -> List[RDCurveMatplotlibView]:
    """Collect RD curve views for every codec that has data for lf_name.

    Codecs for which no curve can be built are skipped.

    :param lf_name: Light field name
    :param metric: Quality metric name
    :param all_data: Bundle data organised as {codec: {lf_name: [(bpp, reports)]}}
    :param configuration: Configuration reader
    :return: List of RD curve views
    """
    rd_curve_views = []
    for codec_name, codec_data in all_data.items():
        if lf_name not in codec_data:
            continue

        rd_curve_view = _create_rd_curve_from_bundle_points(
            codec_name, codec_data[lf_name], metric, configuration
        )
        if rd_curve_view is None:
            continue

        rd_curve_views.append(rd_curve_view)
        # NOTE(review): RDCurve is built with ``rates=`` but read through
        # ``.rate`` here - confirm the attribute name against RDCurve.
        print(f"{codec_name}: {len(rd_curve_view.rd_curve.rate)} points")

    return rd_curve_views


def _metrics_from_rd_reports_data(
    all_data: Dict[str, Dict[str, List[Tuple[float, Dict]]]],
) -> List[str]:
    """Collect all metric names present across all points in rd_reports data.

    :param all_data: Bundle data from load_bundle_data_from_rd_reports
    :return: Sorted list of metric names found in the "means" dicts
    """
    found = {
        metric
        for codec_data in all_data.values()
        for points in codec_data.values()
        for _bpp, reports in points
        if "means" in reports
        for metric in reports["means"]
    }
    return sorted(found)
def generate_rd_plots_from_encoded(
    all_data: Dict[str, Dict[str, List[Tuple[float, Dict[str, Dict]]]]],
    results_path: Path,
    configuration: ConfigurationReader,
    from_rd_reports: bool = False,
) -> None:
    """Generate R-D plots from bundle data for all light fields and metrics.

    One plot file named ``{lf_name}_{metric}.{format}`` is written under
    ``results_path / "rd_plots"`` for every light field / metric pair that
    has at least one curve.

    When from_rd_reports is True, metrics are taken from the report data
    itself and only the first rd_plots entry is used, for plot styling (a
    single pass). Otherwise every rd_plots entry is processed with the
    metrics it declares; an entry without a ``metrics`` key produces no plot.

    :param all_data: Bundle data organised as {codec: {lf_name: [(bpp, reports)]}}
    :param results_path: Root directory where plots will be saved
    :param configuration: Configuration reader
    :param from_rd_reports: If True, derive metrics from report data instead of rd_plots config
    """
    rd_plot_configs = configuration["rd_plots"]
    if not rd_plot_configs:
        print("No rd_plots configuration found.")
        return

    all_lightfields = {lf for codec_data in all_data.values() for lf in codec_data}

    rd_plots_path = results_path / "rd_plots"
    rd_plots_path.mkdir(parents=True, exist_ok=True)

    print("Generating RD Plots")

    if from_rd_reports:
        plot_iterations = [
            (_metrics_from_rd_reports_data(all_data), rd_plot_configs[0])
        ]
    else:
        # .get() keeps this consistent with _get_metrics_from_config, which
        # also tolerates entries that declare no metrics.
        plot_iterations = [(rc.get("metrics", []), rc) for rc in rd_plot_configs]

    for metrics, rd_plot_config in plot_iterations:
        # Styling options depend only on the plot configuration.
        file_extension = rd_plot_config.get("format", "pdf")
        bpp_logscale = rd_plot_config.get("bpp_logscale", True)
        interpolation = rd_plot_config.get("interpolation", None)

        for lf_name in sorted(all_lightfields):
            if lf_name not in configuration.lightfield_names:
                configuration.lightfield_configurations[lf_name] = (
                    configuration.get_lightfield_configuration(lf_name)
                )
            lightfield = LightFieldFactory.get_lightfield(
                configuration=configuration.lightfield_configurations[lf_name]
            )

            for metric in metrics:
                # The two names were previously printed with no separator.
                print(f"\n{lf_name} / {metric}")

                rd_curve_views = _build_rd_curve_views(
                    lf_name, metric, all_data, configuration
                )
                if not rd_curve_views:
                    print("No data available — skipping.")
                    continue

                filename = rd_plots_path / f"{lf_name}_{metric}.{file_extension}"
                target_bpps = sorted(
                    {
                        round(bpp, 3)
                        for codec_data in all_data.values()
                        for bpp, _ in codec_data.get(lf_name, [])
                    }
                )

                plot = RDPlotMatplotlib(configuration, rd_plot_config, lightfield)
                plot.plot(
                    rd_curves=rd_curve_views,
                    anchor=None,
                    title=lf_name,
                    bpp_logscale=bpp_logscale,
                    target_bpps=target_bpps,
                    interpolation=interpolation,
                    filename=str(filename),
                    generate_bd_report=False,
                )
                print(f"Saved: {filename}")
def _describe_metric(metric: str, configuration: ConfigurationReader) -> Dict:
    """Build the metadata entry stored under ``metrics`` for one metric."""
    quality_config = configuration["quality"]

    if CompoundMetrics.is_weighted_metric(metric, configuration):
        metric_cfg = quality_config.get("weighted-metrics", {}).get(metric, {})
        if "metrics" in metric_cfg:
            quality_wrapper = {"metrics": metric_cfg["metrics"]}
        elif "weights" in metric_cfg:
            quality_wrapper = {"weighted": metric_cfg["weights"]}
        else:
            quality_wrapper = None
    elif CompoundMetrics.is_bd_adjusted_metric(metric, configuration):
        metric_cfg = quality_config.get("bd-adjusted-metrics", {}).get(metric, {})
        quality_wrapper = {"origin": metric_cfg.get("origin")}
    else:
        metric_cfg = quality_config.get("metrics", {}).get(metric, {})
        quality_wrapper = metric_cfg.get("quality-wrapper")

    label = metric_cfg.get("label", metric)
    unit, _ = configuration.get_used_quality_unit_and_tool(metric=metric)
    return {
        "name": metric_cfg.get("name", metric),
        "label": label,
        "tex_label": metric_cfg.get("tex_label", label),
        "quality_wrapper": quality_wrapper,
        "unit": unit,
    }


def create_json_from_bundle(
    all_data: Dict[str, Dict[str, List[Tuple[float, Dict[str, Dict]]]]],
    results_path: Path,
    configuration: ConfigurationReader,
    from_rd_reports: bool = False,
) -> None:
    """Generate JSON reports from bundle data containing rate-distortion information.

    One JSON file per (codec, light field) pair is written under
    ``results_path / "rd_reports"``. Each file holds the light field and codec
    names, metadata for every metric declared in the rd_plots configuration,
    and one ``results`` entry per bitrate point keyed by the rate formatted
    with three decimals. The input lists in ``all_data`` are not modified.

    :param all_data: Bundle data organised as {codec: {lf_name: [(bpp, reports)]}}
    :param results_path: Root directory where JSON reports will be saved
    :param configuration: Configuration reader
    :param from_rd_reports: If True, data comes from pre-computed RD reports
    """
    rd_reports_path = results_path / "rd_reports"
    rd_reports_path.mkdir(parents=True, exist_ok=True)

    quality_config = configuration["quality"]
    # Use metrics declared in rd_plots config
    metrics = _get_metrics_from_config(configuration)

    for codec_name, codec_data in all_data.items():
        print(f"Generating RD JSON for {codec_name}")
        try:
            codec_cfg = get_codec_configuration(configuration, codec_name)
            codec_title = codec_cfg.get("rd_preferences", {}).get("title", codec_name)
        except ValueError:
            codec_title = codec_name
            print(f"Warning: Codec {codec_name} not found in configuration, using defaults")

        for lf_name, points in codec_data.items():
            # Sort a copy so the caller's bundle data keeps its own order.
            ordered_points = sorted(points, key=lambda point: point[0])

            metrics_metadata = {
                metric: _describe_metric(metric, configuration) for metric in metrics
            }
            results: Dict[str, Dict] = {}

            # Build results per bitrate point
            for bpp, reports in ordered_points:
                rate_data = {"rate": bpp, "md5_of_encoded": None}

                if from_rd_reports and "means" in reports:
                    for metric, value in reports["means"].items():
                        if metric in metrics_metadata:
                            rate_data[metric] = {"mean": value}
                else:
                    for metric in metrics:
                        _, used_tool = configuration.get_used_quality_unit_and_tool(
                            metric=metric
                        )
                        used_tool_short_name = quality_config["wrappers"][used_tool][
                            "short_name"
                        ]
                        if used_tool_short_name not in reports:
                            continue
                        value = _extract_metric_from_report_data(
                            bpp, reports[used_tool_short_name], metric, configuration
                        )
                        if value is not None:
                            rate_data[metric] = {"mean": value}

                rate_key = f"{bpp:.3f}"
                if rate_key in results:
                    # Two points that round to the same key cannot both be
                    # stored; the later one wins, as before, but say so.
                    print(
                        f"Warning: multiple points map to rate key {rate_key} "
                        f"for {lf_name} / {codec_name}; keeping the last one"
                    )
                results[rate_key] = rate_data

            codec_json_data = {
                "lightfield_name": lf_name,
                "codec": codec_name,
                "codec_title": codec_title,
                "raw_md5": None,
                "metrics": metrics_metadata,
                "results": results,
            }

            json_filename = rd_reports_path / get_formatted_filename_for_rd_report(
                lf_name, codec_name
            )
            # Explicit encoding: do not depend on the platform default.
            with open(json_filename, "w", encoding="utf-8") as json_file:
                json.dump(codec_json_data, json_file, indent=2)
            print(f"Saved JSON report to {json_filename}")
def _require_bundle_directory(bundle: Dict, key: str) -> Path:
    """Return the directory configured under ``key`` or exit with an error."""
    configured = bundle.get(key)
    if not configured:
        # An empty value would become Path("") - the current directory - and
        # silently pass an existence check.
        print(f"Error: {key} is not set in the rd_plots_from_bundle configuration")
        sys.exit(1)

    directory = Path(configured)
    if not directory.is_dir():
        print(f"Error: {key} does not exist or is not a directory: {directory}")
        sys.exit(1)
    return directory


def main() -> None:
    """Decode and generate R-D plots from an encoded light field bundle.

    Reads the configuration from the command-line arguments and uses its
    ``rd_plots_from_bundle`` section. Depending on
    ``compute_metrics_from_encoded`` (default True), it either decodes the
    encoded files and computes quality metrics - plots and JSON reports are
    then produced per light field during that step - or loads pre-computed RD
    report JSON files and generates plots and JSON reports afterwards.

    Usage:
        python rd_plots_from_bundle.py <configuration.json>

    :raises SystemExit: If no configuration is given, a required directory is
        missing, or no data could be collected
    """
    if len(sys.argv) < 2:
        print("Usage: rd_plots_from_bundle.py <configuration.json>")
        sys.exit(1)

    configuration = read_config_from_argv()
    bundle = configuration["rd_plots_from_bundle"]

    codecs: List[str] = bundle.get("codecs", [])
    print(f"Codecs filter: {codecs if codecs else '(all)'}")
    if not codecs:
        print("No codecs specified — processing all codecs found in the bundle.")

    results_path = Path(bundle.get("results_path", ""))
    from_encoded = bundle.get("compute_metrics_from_encoded", True)

    if from_encoded:
        print("Generating RD plots from encoded light field files...")
        encoded_files_dir = _require_bundle_directory(bundle, "encoded_files_dir")
        # RD plots and JSON reports are generated per-LF inside this call
        all_data = compute_metrics_from_encoded_and_decoded(
            codecs, configuration, encoded_files_dir, results_path
        )
    else:
        print("Generating RD plots from RD report files...")
        rd_reports_dir = _require_bundle_directory(bundle, "rd_reports_dir")
        all_data = load_bundle_data_from_rd_reports(
            rd_reports_dir, codecs, configuration
        )

    if not all_data:
        print("No data collected.")
        sys.exit(1)

    if not from_encoded:
        # For the rd_reports path, generate everything at once after loading
        generate_rd_plots_from_encoded(
            all_data, results_path, configuration, from_rd_reports=True
        )
        create_json_from_bundle(
            all_data, results_path, configuration, from_rd_reports=True
        )

    print("Processing complete!")
# Run the command-line entry point only when executed as a script.
if __name__ == "__main__":
    main()