"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Description:
This module aggregates utility functions for saving speedup data.
"""
import json
from pathlib import Path
from typing import Any, Dict, List
import numpy as np
from prettytable import PrettyTable
[docs]
def save_consolidated_speedup_json(
    baseline_codec: str,
    output_dir: Path,
    speedup_data: Dict,
) -> None:
    """Saves a consolidated JSON file with speedup values for all codecs and lightfields.

    The file is written to ``output_dir / "speedup_data.json"`` (parent
    directories are created when missing) with the layout::

        {"baseline": <name>, "codecs": {<codec>: {<op_type>: {<lightfield>: {<bpp>: {"speedup": <value>}}}}}}

    Every codec always carries ``"encoder"`` and ``"decoder"`` entries, even when
    empty. Any other operation type found in the data gets its own entry instead
    of aborting the export with a ``KeyError``.

    :param baseline_codec: Name of the baseline codec; a falsy value is stored as the string ``"None"``
    :type baseline_codec: str
    :param output_dir: Output directory for the JSON file
    :type output_dir: Path
    :param speedup_data: Mapping of ``"<lightfield>_<op_type>"`` to a mapping of
        ``(codec_name, title)`` to ``{bpp: speedup}``
    :type speedup_data: Dict
    :return: None
    :rtype: None
    """
    codecs: Dict[str, Dict] = {}
    for lf_op_type, codec_speedups in speedup_data.items():
        # The operation type is the text after the LAST underscore, so the
        # lightfield name itself may contain underscores.
        lf_name, op_type = lf_op_type.rsplit('_', 1)
        for (codec_name, _), bpp_data in codec_speedups.items():
            per_codec = codecs.setdefault(codec_name, {"encoder": {}, "decoder": {}})
            # setdefault keeps the pre-seeded encoder/decoder buckets and
            # creates a bucket on demand for any other operation type.
            per_codec.setdefault(op_type, {})[lf_name] = {
                bpp: {"speedup": speedup}
                for bpp, speedup in bpp_data.items()
            }

    consolidated_data = {
        "baseline": baseline_codec or "None",
        "codecs": codecs,
    }

    json_file = output_dir / "speedup_data.json"
    json_file.parent.mkdir(parents=True, exist_ok=True)
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(consolidated_data, f, indent=4)
    print(f"Saved consolidated speedup JSON: {json_file}")
[docs]
def create_codec_comparison_matrices(speedup_data: Dict, plot_config: Dict) -> None:
    """Creates comparison matrices between all codecs (per lightfield/BPP and averaged).

    For every ``"<lightfield>_<op_type>"`` entry of ``speedup_data`` one table is
    written per BPP plus one averaged table, under
    ``<results_path>/comparison_matrices/<op_type>/<lightfield>/``. Cell
    ``[i][j]`` holds ``speedup_j / speedup_i``, i.e. the column codec's speedup
    divided by the row codec's speedup at that BPP. Cells that cannot be
    computed (a codec has no value at that BPP, or the row codec's speedup is
    not above ``1e-9``) keep the neutral value 1.

    When ``plot_config["baseline_codec"]`` is set, the baseline is listed first.
    If it has no entry of its own in the data, it is added with speedup 1.0 at
    every BPP reported by any codec.

    :param speedup_data: Mapping of ``"<lightfield>_<op_type>"`` to a mapping of
        ``(codec_name, title)`` to ``{bpp: speedup}``, where a speedup is either
        a single number or a collection of samples that gets averaged
    :type speedup_data: Dict
    :param plot_config: Plot configuration; reads ``results_path`` (required) and
        ``baseline_codec`` (optional), and is forwarded to ``save_comparison_table``
    :type plot_config: Dict
    :return: None
    :rtype: None
    """
    print("\nCreating codec comparison matrices...")

    # The output directory is created even when there is nothing to write.
    output_dir = Path(plot_config["results_path"]) / "comparison_matrices"
    output_dir.mkdir(parents=True, exist_ok=True)

    if not speedup_data:
        print(" No speedup data available for matrices")
        return

    baseline_codec = plot_config.get("baseline_codec", None)

    for lf_op_type, codec_speedups in speedup_data.items():
        # The operation type is the text after the LAST underscore.
        lf_name, op_type = lf_op_type.rsplit('_', 1)

        codec_list, speedup_lookup, sorted_bpps = _collect_codecs_and_bpps(
            codec_speedups, baseline_codec
        )

        lf_op_dir = output_dir / op_type / lf_name
        lf_op_dir.mkdir(parents=True, exist_ok=True)

        n_codecs = len(codec_list)
        ratio_sums = np.zeros((n_codecs, n_codecs))
        ratio_counts = np.zeros((n_codecs, n_codecs))

        for bpp in sorted_bpps:
            # Resolve each codec's scalar speedup once per BPP instead of once
            # per matrix cell.
            speedups = [_speedup_at_bpp(speedup_lookup[key], bpp) for key in codec_list]

            matrix = np.ones((n_codecs, n_codecs))
            for i, speedup_i in enumerate(speedups):
                # The row codec is the divisor: skip the whole row when its
                # value is missing or not above the 1e-9 threshold (this also
                # rejects NaN), leaving those cells at 1.
                if speedup_i is None or not speedup_i > 1e-9:
                    continue
                for j, speedup_j in enumerate(speedups):
                    if i == j or speedup_j is None:
                        continue  # Diagonal remains 1
                    ratio = speedup_j / speedup_i
                    matrix[i][j] = ratio
                    # Only computed cells contribute to the average.
                    ratio_sums[i][j] += ratio
                    ratio_counts[i][j] += 1

            save_comparison_table(
                table=_build_matrix_table(
                    f"Speedup Comparison Matrix - {lf_name} {op_type} at {bpp:.3f} BPP",
                    codec_list,
                    matrix,
                ),
                output_dir=lf_op_dir,
                base_name=f"comparison_matrix_{bpp:.3f}bpp",
                plot_config=plot_config
            )

        # Cells never computed at any BPP fall back to 1; errstate silences the
        # 0/0 warnings produced while evaluating the discarded branch.
        with np.errstate(divide='ignore', invalid='ignore'):
            avg_matrix = np.where(ratio_counts > 0, ratio_sums / ratio_counts, 1)

        save_comparison_table(
            table=_build_matrix_table(
                f"Average Speedup Comparison Matrix - {lf_name} {op_type}",
                codec_list,
                avg_matrix,
            ),
            output_dir=lf_op_dir,
            base_name="average_comparison_matrix",
            plot_config=plot_config
        )


def _collect_codecs_and_bpps(codec_speedups: Dict, baseline_codec: Any) -> tuple:
    """Orders the codecs of one lightfield/operation and gathers every BPP they report.

    :param codec_speedups: Mapping of ``(codec_name, title)`` to ``{bpp: speedup}``
    :type codec_speedups: Dict
    :param baseline_codec: Baseline codec name, or a falsy value when there is none
    :type baseline_codec: Any
    :return: ``(codec_list, speedup_lookup, sorted_bpps)``: the ordered
        ``(codec_name, title)`` keys with the baseline first, the per-key BPP data
        (including a synthetic baseline when needed), and the sorted BPP values
    :rtype: tuple
    """
    baseline_entry = None
    other_entries: List = []
    all_bpps = set()

    for (codec_name, title), bpp_data in codec_speedups.items():
        if baseline_codec and codec_name == baseline_codec:
            # Only the first entry carrying the baseline name is used; any
            # further entries with that name are left out of the matrices.
            if baseline_entry is None:
                baseline_entry = (codec_name, title)
                all_bpps.update(bpp_data)
            continue
        other_entries.append((codec_name, title))
        all_bpps.update(bpp_data)

    speedup_lookup = dict(codec_speedups)
    if baseline_codec and baseline_entry is None and all_bpps:
        # The baseline has no entry of its own: add it with speedup 1.0 at
        # EVERY collected BPP, not only those of the first codec, so that all
        # baseline rows/columns can be computed.
        baseline_entry = (baseline_codec, "Baseline")
        speedup_lookup[baseline_entry] = {bpp: 1.0 for bpp in all_bpps}

    if baseline_entry is None:
        codec_list = other_entries
    else:
        codec_list = [baseline_entry] + other_entries
    return codec_list, speedup_lookup, sorted(all_bpps)


def _speedup_at_bpp(bpp_data: Dict, bpp: Any) -> Any:
    """Returns one codec's speedup at ``bpp`` as a scalar, or ``None`` when it has none.

    :param bpp_data: Mapping of BPP to a single speedup or a collection of samples
    :type bpp_data: Dict
    :param bpp: BPP value to look up
    :type bpp: Any
    :return: The stored scalar, the mean of the stored samples, or ``None``
    :rtype: Any
    """
    raw = bpp_data.get(bpp)
    if raw is None:
        return None
    if isinstance(raw, (list, tuple, np.ndarray)):
        return np.mean(raw)
    return raw


def _build_matrix_table(title: str, codec_list: List, matrix: Any) -> "PrettyTable":
    """Renders a square codec matrix as a PrettyTable with values at two decimals.

    :param title: Table title
    :type title: str
    :param codec_list: Ordered ``(codec_name, title)`` keys; names label rows and columns
    :type codec_list: List
    :param matrix: 2-D array whose rows/columns follow ``codec_list``
    :type matrix: Any
    :return: The populated table
    :rtype: PrettyTable
    """
    table = PrettyTable()
    table.title = title
    table.field_names = ["Codec"] + [codec_name for codec_name, _ in codec_list]
    for (codec_name, _), row_values in zip(codec_list, matrix):
        table.add_row([codec_name] + [f"{value:.2f}" for value in row_values])
    return table
[docs]
def save_comparison_table(
    table: PrettyTable,
    output_dir: Path,
    base_name: str,
    plot_config: Dict,
) -> None:
    """Writes one comparison table to disk, once per configured format.

    The formats come from ``plot_config["table_formats"]`` and default to
    ``["latex", "txt"]``. Each format name is also used as the file extension.
    ``latex``, ``html`` and ``csv`` use the matching PrettyTable exporter; any
    other name is written as the plain-text rendering.

    :param table: PrettyTable instance with table data
    :type table: PrettyTable
    :param output_dir: Directory to save the output files
    :type output_dir: Path
    :param base_name: Base filename (without extension)
    :type base_name: str
    :param plot_config: Configuration dict containing table_formats
    :type plot_config: Dict
    :return: None
    :rtype: None
    """
    # Exporter method name per known format; unknown formats use get_string.
    exporter_names = {
        "latex": "get_latex_string",
        "html": "get_html_string",
        "csv": "get_csv_string",
    }

    for fmt in plot_config.get("table_formats", ["latex", "txt"]):
        target_path = output_dir / f"{base_name}.{fmt}"
        # Resolve the exporter lazily so only the requested one is touched.
        render = getattr(table, exporter_names.get(fmt, "get_string"))
        content = render()
        with open(target_path, 'w') as out_file:
            out_file.write(content)
        print(f" Saved {fmt.upper()} matrix: {target_path}")