# Source code for src.performance.auxiliary.speedup_utils

"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)

Description:
    This module aggregates utility functions for saving speedup data.
"""

import json
from pathlib import Path
from typing import Any, Dict, List

import numpy as np
from prettytable import PrettyTable


def save_consolidated_speedup_json(
    baseline_codec: str,
    output_dir: Path,
    speedup_data: Dict,
) -> None:
    """Saves a consolidated JSON file with speedup values for all codecs and lightfields.

    Each key of ``speedup_data`` is split at its last underscore into a
    lightfield name and an operation type. The file ``speedup_data.json`` is
    written inside ``output_dir`` (created if missing) with the layout
    ``{"baseline": ..., "codecs": {codec: {op_type: {lightfield: {bpp: {"speedup": value}}}}}}``.
    Every codec always gets an ``"encoder"`` and a ``"decoder"`` entry; any
    other operation type found in the keys is added next to them.

    :param baseline_codec: Name of the baseline codec (stored as "None" when empty)
    :type baseline_codec: str
    :param output_dir: Output directory for the JSON file
    :type output_dir: Path
    :param speedup_data: Mapping ``"<lightfield>_<op_type>"`` ->
        ``{(codec_name, title): {bpp: speedup}}``
    :type speedup_data: Dict
    :raises ValueError: If a key of ``speedup_data`` contains no underscore
    :return: None
    :rtype: None
    """
    codecs: Dict[str, Any] = {}

    for lf_op_type, codec_speedups in speedup_data.items():
        lf_name, op_type = lf_op_type.rsplit("_", 1)

        for (codec_name, _), bpp_data in codec_speedups.items():
            per_codec = codecs.setdefault(codec_name, {"encoder": {}, "decoder": {}})
            # setdefault keeps the fixed encoder/decoder buckets but no longer
            # raises KeyError when a key carries a different operation suffix.
            per_codec.setdefault(op_type, {})[lf_name] = {
                bpp: {"speedup": speedup} for bpp, speedup in bpp_data.items()
            }

    consolidated_data = {
        "baseline": baseline_codec or "None",
        "codecs": codecs,
    }

    json_file = Path(output_dir) / "speedup_data.json"
    json_file.parent.mkdir(parents=True, exist_ok=True)

    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(consolidated_data, f, indent=4)

    print(f"Saved consolidated speedup JSON: {json_file}")
def _mean_speedup(value: Any) -> Any:
    """Reduces a stored speedup entry to a single number.

    :param value: Scalar speedup, list of speedups, or None when missing
    :type value: Any
    :return: The scalar itself, the mean of a non-empty list, or None when
        there is nothing usable
    :rtype: Any
    """
    if isinstance(value, list):
        # An empty list holds no measurement; report it as missing instead of
        # letting np.mean([]) put NaN into the tables.
        return np.mean(value) if value else None
    return value


def _collect_codecs(codec_speedups: Dict, baseline_codec: Any) -> tuple:
    """Builds the ordered codec list, the speedup lookup and the sorted BPPs.

    The baseline codec (when configured) is always placed first. If it has no
    entry in ``codec_speedups`` it is synthesised under the title "Baseline"
    with a speedup of 1.0 at every BPP seen for the other codecs.

    :param codec_speedups: Mapping ``(codec_name, title) -> {bpp: speedup}``
    :type codec_speedups: Dict
    :param baseline_codec: Name of the baseline codec, or a falsy value
    :type baseline_codec: Any
    :return: ``(codec_list, speedup_lookup, sorted_bpps)``
    :rtype: tuple
    """
    codec_list = []
    all_bpps = set()

    baseline_key = None
    if baseline_codec:
        # Only the first key carrying the baseline name is used.
        baseline_key = next(
            (key for key in codec_speedups if key[0] == baseline_codec), None
        )
        if baseline_key is not None:
            codec_list.append(baseline_key)
            all_bpps.update(codec_speedups[baseline_key])

    for key, bpp_data in codec_speedups.items():
        if baseline_codec and key[0] == baseline_codec:
            continue  # Baseline entries were handled above
        codec_list.append(key)
        all_bpps.update(bpp_data)

    speedup_lookup = dict(codec_speedups)

    # Synthesise the baseline only when it is truly absent. Deciding this from
    # the key (not from whether its BPP dict is empty) avoids inserting a
    # second baseline column, which PrettyTable rejects as a duplicate field.
    if baseline_codec and baseline_key is None and all_bpps:
        synthetic_key = (baseline_codec, "Baseline")
        codec_list.insert(0, synthetic_key)
        # Speedups are relative to the baseline, so it is 1.0 at every BPP.
        speedup_lookup[synthetic_key] = dict.fromkeys(all_bpps, 1.0)

    return codec_list, speedup_lookup, sorted(all_bpps)


def _matrix_to_table(title: str, codec_names: List, matrix: Any) -> PrettyTable:
    """Renders a square comparison matrix as a PrettyTable.

    :param title: Table title
    :type title: str
    :param codec_names: Codec names used for both the header and the row labels
    :type codec_names: List
    :param matrix: Square matrix whose rows follow ``codec_names``
    :type matrix: Any
    :return: Table with one row per codec and values formatted to two decimals
    :rtype: PrettyTable
    """
    table = PrettyTable()
    table.title = title
    table.field_names = ["Codec"] + list(codec_names)
    for codec_name, row in zip(codec_names, matrix):
        table.add_row([codec_name] + [f"{cell:.2f}" for cell in row])
    return table


def create_codec_comparison_matrices(speedup_data: Dict, plot_config: Dict) -> None:
    """Creates comparison matrices between all codecs (per lightfield/BPP and averaged).

    Each key of ``speedup_data`` is split at its last underscore into a
    lightfield name and an operation type. For every such pair the tables are
    saved under ``<results_path>/comparison_matrices/<op_type>/<lightfield>/``:
    one ``comparison_matrix_<bpp>bpp`` table per BPP and one
    ``average_comparison_matrix`` table.

    Cell ``[i][j]`` holds ``speedup_j / speedup_i``. A cell stays at 1 when
    either speedup is missing or ``speedup_i`` is not greater than 1e-9; such
    cells are also left out of the average.

    :param speedup_data: Mapping ``"<lightfield>_<op_type>"`` ->
        ``{(codec_name, title): {bpp: speedup or list of speedups}}``
    :type speedup_data: Dict
    :param plot_config: Plot configuration; ``"results_path"`` is required,
        ``"baseline_codec"`` is optional, and the dict is passed on to
        ``save_comparison_table``
    :type plot_config: Dict
    :raises KeyError: If ``plot_config`` has no ``"results_path"``
    :raises ValueError: If a key of ``speedup_data`` contains no underscore
    :return: None
    :rtype: None
    """
    print("\nCreating codec comparison matrices...")

    # Setup output directory
    output_dir = Path(plot_config["results_path"]) / "comparison_matrices"
    output_dir.mkdir(parents=True, exist_ok=True)

    if not speedup_data:
        print(" No speedup data available for matrices")
        return

    baseline_codec = plot_config.get("baseline_codec")

    # Process each lightfield and operation type
    for lf_op_type, codec_speedups in speedup_data.items():
        lf_name, op_type = lf_op_type.rsplit("_", 1)

        codec_list, speedup_lookup, sorted_bpps = _collect_codecs(
            codec_speedups, baseline_codec
        )
        codec_names = [key[0] for key in codec_list]
        n_codecs = len(codec_list)

        lf_op_dir = output_dir / op_type / lf_name
        lf_op_dir.mkdir(parents=True, exist_ok=True)

        # Running sum and sample count per cell for the average matrix
        avg_sum = np.zeros((n_codecs, n_codecs))
        avg_counts = np.zeros((n_codecs, n_codecs))

        for bpp in sorted_bpps:
            # Resolve every codec's speedup once per BPP rather than once per
            # (i, j) pair.
            speedups = [
                _mean_speedup(speedup_lookup[key].get(bpp)) for key in codec_list
            ]

            matrix = np.ones((n_codecs, n_codecs))
            for i, speedup_i in enumerate(speedups):
                # "not >" also rejects NaN denominators
                if speedup_i is None or not speedup_i > 1e-9:
                    continue
                for j, speedup_j in enumerate(speedups):
                    if i == j or speedup_j is None:
                        continue  # Diagonal and missing data remain 1
                    ratio = speedup_j / speedup_i
                    matrix[i, j] = ratio
                    avg_sum[i, j] += ratio
                    avg_counts[i, j] += 1

            table = _matrix_to_table(
                f"Speedup Comparison Matrix - {lf_name} {op_type} at {bpp:.3f} BPP",
                codec_names,
                matrix,
            )
            save_comparison_table(
                table=table,
                output_dir=lf_op_dir,
                base_name=f"comparison_matrix_{bpp:.3f}bpp",
                plot_config=plot_config,
            )

        # Cells without any sample (including the diagonal) default to 1
        with np.errstate(divide="ignore", invalid="ignore"):
            avg_matrix = np.where(avg_counts > 0, avg_sum / avg_counts, 1)

        avg_table = _matrix_to_table(
            f"Average Speedup Comparison Matrix - {lf_name} {op_type}",
            codec_names,
            avg_matrix,
        )
        save_comparison_table(
            table=avg_table,
            output_dir=lf_op_dir,
            base_name="average_comparison_matrix",
            plot_config=plot_config,
        )
def save_comparison_table(
    table: PrettyTable,
    output_dir: Path,
    base_name: str,
    plot_config: Dict,
) -> None:
    """Saves a comparison table in multiple formats (latex, html, csv, txt).

    One file named ``<base_name>.<format>`` is written per requested format.
    Any format other than "latex", "html" or "csv" is written as the plain-text
    rendering of the table. ``output_dir`` is not created here and must
    already exist.

    :param table: PrettyTable instance with table data
    :type table: PrettyTable
    :param output_dir: Directory to save the output files
    :type output_dir: Path
    :param base_name: Base filename (without extension)
    :type base_name: str
    :param plot_config: Configuration dict; ``"table_formats"`` lists the
        formats to write and defaults to ``["latex", "txt"]``
    :type plot_config: Dict
    :return: None
    :rtype: None
    """
    formats = plot_config.get("table_formats", ["latex", "txt"])

    for fmt in formats:
        filename = output_dir / f"{base_name}.{fmt}"

        if fmt == "latex":
            content = table.get_latex_string()
        elif fmt == "html":
            content = table.get_html_string()
        elif fmt == "csv":
            content = table.get_csv_string()
        else:
            content = table.get_string()

        # CSV text already carries the csv module's own line terminators, so
        # newline translation is disabled for it; otherwise Windows would
        # write "\r\r\n". Other formats keep the platform default.
        newline = "" if fmt == "csv" else None
        with open(filename, "w", encoding="utf-8", newline=newline) as f:
            f.write(content)

        print(f" Saved {fmt.upper()} matrix: {filename}")