# Source code for src.performance.measure_time_and_memory
"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Description:
Performance visualization tool to generate execution time and memory usage plots and tables.
"""
import os
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from prettytable import PrettyTable
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from lfc_toolkit.src.performance.auxiliary.execution_time_utils import (calculate_memory_overhead,
get_metric_title,
get_metric_ylabel, load_json_file,
save_json_file, save_plot)
from lfc_toolkit.src.configuration.configuration_reader import (ConfigurationReader,
read_config_from_argv)
[docs]
class PerformanceVisualizer:
def __init__(self, configuration: ConfigurationReader, performance_config: Dict) -> None:
    """Initialises the visualizer state from the toolkit and performance configurations.

    :param configuration: Toolkit configuration, stored unchanged for later codec lookups
    :type configuration: ConfigurationReader
    :param performance_config: Performance settings; must contain a ``"codecs"`` mapping
    :type performance_config: Dict
    :return: None
    :rtype: None
    :raises KeyError: If ``performance_config`` has no ``"codecs"`` entry
    """
    def _per_operation(leaf_factory):
        # One container per operation: light field -> (codec, title) -> leaf value.
        return {
            'encoder': defaultdict(lambda: defaultdict(leaf_factory)),
            'decoder': defaultdict(lambda: defaultdict(leaf_factory)),
        }

    self.configuration = configuration
    self.performance_config = performance_config

    # Averaging settings ("metric" defaults to the median, "grouping" to no groups).
    averaging = performance_config.get("averaging", {})
    self.averaging_config = averaging
    self.averaging_metric = averaging.get("metric", "median")
    self.averaging_groups = averaging.get("grouping", [])

    # metric -> operation -> light field -> (codec, title) -> list of (bpp, value) points.
    self.metrics_data = defaultdict(lambda: _per_operation(list))
    # Same hierarchy, but the leaf holds the full per-bpp statistics dictionary
    # (including individual values) for speedup calculations.
    self.full_metrics_data = defaultdict(lambda: _per_operation(dict))

    self.baseline_codec = performance_config["codecs"].get("baseline")
    # (light field, operation, bpp) -> median baseline memory, filled in later.
    self.baseline_memory = {}
[docs]
def process_all_logs(self) -> None:
    """Processes the codec logs for every metric family present in the performance configuration.

    The ``"time"`` section is handled first, then ``"memory"``. For each section the
    ``results_path`` has environment variables expanded and the directory is created
    before the codec logs are processed. For memory, baseline identification is
    attempted first when a baseline codec is configured (a failure only prints a
    warning), and the consolidated overhead JSON is written afterwards.

    :return: None
    :rtype: None
    """
    print(f"\nProcessing log files for performance configuration...")
    codecs_to_process = self.performance_config["codecs"].get("analyse", [])

    for section, metric_key in (("time", "time_ns"), ("memory", "max_memory_usage")):
        if section not in self.performance_config:
            continue

        section_config = self.performance_config[section]
        output_dir = Path(os.path.expandvars(section_config.get("results_path")))
        output_dir.mkdir(parents=True, exist_ok=True)

        is_memory = section == "memory"
        if is_memory and self.baseline_codec:
            # The baseline must be known before the other codecs' memory is
            # processed; this step is best-effort by design.
            try:
                self._identify_baseline_memory()
            except Exception as e:
                print(f"Warning identifying baseline memory: {e}")

        self._process_codecs_for_metrics(codecs_to_process, output_dir, section_config, metric_key)

        if is_memory:
            self._save_consolidated_overhead_json(output_dir)
def _process_codecs_for_metrics(self, codecs_to_process: List[str], output_dir: Path,
metric_plot_config: Dict, metric_key: str) -> None:
"""Processes log files for a given set of metrics (time or memory).
:param codecs_to_process: List of codec names to process
:type codecs_to_process: List[str]
:param output_dir: Output directory for results
:type output_dir: Path
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:param metric_key: Key identifying the metric (e.g., time_ns, max_memory_usage)
:type metric_key: str
:return: None
:rtype: None
"""
for codec_name in codecs_to_process:
codec_config = self.configuration["codecs"]["configuration"].get(codec_name, {})
if not codec_config:
print(f" Warning: No configuration found for codec {codec_name}")
continue
results_path = Path(os.path.expandvars(codec_config.get("results")))
json_files = list(results_path.glob("*.json"))
if not json_files:
print(f" Warning: No JSON log file found in {results_path}")
continue
json_file = json_files[0]
print(f" Processing log file: {json_file} for {metric_key}")
self._process_codec_log_file(json_file, codec_name, output_dir, metric_plot_config, metric_key)
def _process_codec_log_file(self, json_file: Path, codec_name: str, output_dir: Path,
                            metric_plot_config: Dict, current_metric_key: str) -> None:
    """Loads one codec JSON log and processes every experiment entry it contains.

    Entries that are not dictionaries are ignored.

    :param json_file: Path to the JSON log file
    :type json_file: Path
    :param codec_name: Name of the codec
    :type codec_name: str
    :param output_dir: Output directory for results
    :type output_dir: Path
    :param metric_plot_config: Configuration for metric plotting
    :type metric_plot_config: Dict
    :param current_metric_key: Key identifying the metric being processed
    :type current_metric_key: str
    :return: None
    :rtype: None
    """
    for entry in load_json_file(json_file):
        if not isinstance(entry, dict):
            continue
        self._process_experiment_data(entry, codec_name, output_dir,
                                      metric_plot_config, current_metric_key)
def _identify_baseline_memory(self) -> None:
"""Identifies baseline memory using baseline codec from configuration, storing per BPP.
:return: None
:rtype: None
:raises ValueError: If no baseline codec is configured or no log files are found
"""
if not self.baseline_codec:
raise ValueError("Cannot identify baseline memory - no baseline codec")
codec_config = self.configuration["codecs"]["configuration"].get(self.baseline_codec.lower(), {})
if not codec_config:
raise ValueError(f"Warning: No configuration found for baseline codec {self.baseline_codec}")
results_path = Path(os.path.expandvars(codec_config.get("results")))
json_files = list(results_path.glob("*.json"))
if not json_files:
raise ValueError(f"Warning: No JSON log file found in {results_path}")
json_file = json_files[0]
experiments = load_json_file(json_file)
for experiment in experiments:
results = experiment.get('results', {})
for lf_name, lf_data in results.items():
for op_type in ['encoder', 'decoder']:
op_data = lf_data.get(op_type, {})
if isinstance(op_data, dict):
for bpp_str, bpp_data in op_data.items():
try:
bpp = float(bpp_str)
log_data = bpp_data.get('log_data', [])
memory_values = self._extract_memory_from_repetitions(log_data)
if memory_values:
key = (lf_name, op_type, bpp)
self.baseline_memory[key] = np.median(memory_values)
except (ValueError, TypeError):
continue
def _calculate_memory_overhead(self, lf_name: str, op_type: str, bpp: float, memory_values: List[float]) -> Dict[str, float]:
"""Calculates memory overhead percentage compared to baseline at the same BPP.
:param lf_name: Light field name
:type lf_name: str
:param op_type: Operation type (encoder or decoder)
:type op_type: str
:param bpp: Bits per pixel value
:type bpp: float
:param memory_values: List of memory values from current codec
:type memory_values: List[float]
:return: Dictionary with overhead_percent, baseline_bytes, current_bytes, or empty dict
:rtype: Dict[str, float]
"""
baseline_key = (lf_name, op_type, bpp)
if baseline_key not in self.baseline_memory or not memory_values:
return {}
return calculate_memory_overhead(self.baseline_memory[baseline_key], memory_values)
def _process_experiment_data(self, experiment_data: Dict, codec_name: str, output_dir: Path,
metric_plot_config: Dict, current_metric_key: str) -> None:
"""Processes all data from a single experiment for a specific metric.
:param experiment_data: Experiment data dictionary
:type experiment_data: Dict
:param codec_name: Name of the codec
:type codec_name: str
:param output_dir: Output directory for results
:type output_dir: Path
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:param current_metric_key: Key identifying the metric being processed
:type current_metric_key: str
:return: None
:rtype: None
"""
results = experiment_data.get('results', {})
if not results:
return
codec_config = self.configuration["codecs"]["configuration"].get(codec_name, {})
rd_prefs = codec_config.get("rd_preferences", {})
title = rd_prefs.get("title", codec_name)
# The metric to process is passed as current_metric_key
metrics_to_process = [current_metric_key]
for lf_name, lf_data in results.items():
if not isinstance(lf_data, dict):
continue
encoder_metrics = self._process_operation_data(
lf_data.get('encoder', {}),
True,
metrics_to_process, # Pass only the current metric
lf_name,
'encoder',
codec_name
)
decoder_metrics = self._process_operation_data(
lf_data.get('decoder', {}),
False,
metrics_to_process, # Pass only the current metric
lf_name,
'decoder',
codec_name
)
self._generate_plots(
lf_name,
encoder_metrics,
decoder_metrics,
codec_name,
title,
output_dir,
metric_plot_config, # Pass the config for the current metric type
current_metric_key # Pass the specific metric key
)
def _process_operation_data(self, op_data: Dict, is_encoder: bool, metrics: List[str],
lf_name: str, op_type: str, codec_name: str) -> Dict[str, Dict]:
"""Processes operation data and returns extracted metrics.
:param op_data: Operation data dictionary
:type op_data: Dict
:param is_encoder: Whether this is encoder data
:type is_encoder: bool
:param metrics: List of metric keys to extract
:type metrics: List[str]
:param lf_name: Light field name
:type lf_name: str
:param op_type: Operation type (encoder or decoder)
:type op_type: str
:param codec_name: Name of the codec
:type codec_name: str
:return: Dictionary of metric keys to bpp-to-metrics mapping
:rtype: Dict[str, Dict]
"""
if not isinstance(op_data, dict):
return {}
result = {}
for bpp_str, data in op_data.items():
try:
bpp = float(bpp_str)
log_data = data.get('log_data', [])
for metric in metrics:
if metric in ['time_ns', 'max_memory_usage']:
# repetitions > 1
pooled_metrics = data.get('pooled_metrics', {})
if pooled_metrics and metric in pooled_metrics:
divisor = 1e9 if metric == 'time_ns' else 1e6 if metric == 'max_memory_usage' else 1
metric_dict = {
k: v / divisor
for k, v in pooled_metrics[metric].items()
if k in ['mean', 'median', 'min', 'max', 'stddev']
}
# Also store individual values from log_data when available for speedup calculations
individual_values = []
for log_entry in log_data:
if metric == 'time_ns' and 'time_ns' in log_entry:
individual_values.append(float(log_entry['time_ns']) / divisor)
elif metric == 'max_memory_usage':
memory = log_entry.get('max_memory_usage', {})
if memory and memory.get('value'):
value = float(memory['value'])
if memory.get('unit', '').lower() == 'kbytes':
value *= 1024
individual_values.append(value / divisor)
if individual_values:
metric_dict['individual_values'] = individual_values
result.setdefault(metric, {})[bpp] = metric_dict
# repetitions == 1, there is no "pooled_metrics"
else:
metric_values = []
for log_entry in log_data:
if metric == 'time_ns' and 'time_ns' in log_entry:
metric_values.append(float(log_entry['time_ns']))
elif metric == 'max_memory_usage':
memory = log_entry.get('max_memory_usage', {})
if memory and memory.get('value'):
value = float(memory['value'])
if memory.get('unit', '').lower() == 'kbytes':
value *= 1024
metric_values.append(value)
if metric_values:
divisor = 1e9 if metric == 'time_ns' else 1e6 if metric == 'max_memory_usage' else 1
metric_dict = {
'mean': np.mean(metric_values) / divisor,
'median': np.median(metric_values) / divisor,
'min': np.min(metric_values) / divisor,
'max': np.max(metric_values) / divisor
}
# Store individual values for speedup calculations
metric_dict['individual_values'] = [v / divisor for v in metric_values]
result.setdefault(metric, {})[bpp] = metric_dict
# CALCULATE MEMORY OVERHEAD if this is not baseline and we have memory data
if (metric == 'max_memory_usage' and
not self._is_baseline(codec_name) and
bpp in result.get('max_memory_usage', {}) and
self.performance_config.get("memory", {}).get("calculate_overhead", False)):
# Extract memory values from log_data
memory_values = []
for log_entry in log_data:
memory = log_entry.get('max_memory_usage', {})
if memory and memory.get('value'):
value = float(memory['value'])
if memory.get('unit', '').lower() == 'kbytes':
value *= 1024
memory_values.append(value)
if memory_values:
# Calculate overhead compared to baseline
baseline_key = (lf_name, op_type, bpp)
if baseline_key in self.baseline_memory:
baseline_value = self.baseline_memory[baseline_key]
if baseline_value > 0:
# Calculate overhead for each individual memory value
overhead_values = [((val - baseline_value) / baseline_value) * 100 for val in memory_values]
# Calculate statistics from individual overhead values
overhead_mean = np.mean(overhead_values)
overhead_median = np.median(overhead_values)
overhead_stddev = np.std(overhead_values)
current_median = np.median(memory_values)
result.setdefault('memory_overhead', {})[bpp] = {
'median': overhead_median,
'mean': overhead_mean,
'stddev': overhead_stddev,
'baseline_bytes': baseline_value,
'current_bytes': current_median,
'individual_overheads': overhead_values # Store for aggregation
}
except (ValueError, AttributeError, TypeError) as e:
print(f"Error processing metrics for {op_type} at bpp {bpp_str}: {e}")
continue
return result
def _extract_memory_from_repetitions(self, repetitions: List[Dict], multiplier: int = 1024) -> List[float]:
"""Extracts memory values from log repetition entries.
:param repetitions: List of log data dictionaries from repetitions
:type repetitions: List[Dict]
:param multiplier: Multiplier for unit conversion (e.g., kbytes to bytes), defaults to 1024
:type multiplier: int, optional
:return: List of memory values in bytes
:rtype: List[float]
"""
memory_values = []
for rep in repetitions:
if not isinstance(rep, dict):
continue
memory = rep.get('max_memory_usage', {})
if memory and isinstance(memory, dict) and memory.get('value'):
try:
value = float(memory['value'])
unit = memory.get('unit', '').lower()
if unit == 'kbytes':
value *= multiplier
memory_values.append(value)
except (ValueError, TypeError):
continue
return memory_values
def _is_baseline(self, codec_name: str) -> bool:
"""Checks if the given codec is the baseline codec.
:param codec_name: Name of the codec to check
:type codec_name: str
:return: True if codec is baseline, False otherwise
:rtype: bool
"""
return codec_name == self.baseline_codec.lower() if self.baseline_codec else False
def _generate_plots(self, lf_name: str, encoder_data: Dict, decoder_data: Dict,
codec_name: str, title: str, output_dir: Path,
metric_plot_config: Dict, current_metric_key: str) -> None:
"""Generates all plots for a lightfield.
:param lf_name: Light field name
:type lf_name: str
:param encoder_data: Encoder metrics data
:type encoder_data: Dict
:param decoder_data: Decoder metrics data
:type decoder_data: Dict
:param codec_name: Name of the codec
:type codec_name: str
:param title: Plot title
:type title: str
:param output_dir: Output directory for plots
:type output_dir: Path
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:param current_metric_key: Key identifying the metric
:type current_metric_key: str
:return: None
:rtype: None
"""
if current_metric_key == "max_memory_usage" and self.performance_config.get("memory", {}).get("calculate_overhead", False) and not self._is_baseline(codec_name):
self._generate_memory_overhead_plot(
lf_name,
encoder_data.get('memory_overhead', {}), # Use memory_overhead data here
decoder_data.get('memory_overhead', {}), # Use memory_overhead data here
codec_name,
title,
output_dir,
metric_plot_config
)
# STORE memory_overhead into consolidated metrics_data so average plots / json export work
mem_config = self.performance_config.get("memory", {})
if encoder_data.get('memory_overhead'):
self._store_operation_metrics(
self.metrics_data['memory_overhead']['encoder'],
lf_name, codec_name, title,
encoder_data['memory_overhead'],
mem_config
)
if decoder_data.get('memory_overhead'):
self._store_operation_metrics(
self.metrics_data['memory_overhead']['decoder'],
lf_name, codec_name, title,
decoder_data['memory_overhead'],
mem_config
)
# Store full metrics data with individual values for speedup calculations
if encoder_data.get(current_metric_key):
self.full_metrics_data[current_metric_key]['encoder'][lf_name][(codec_name, title)] = encoder_data[current_metric_key]
if decoder_data.get(current_metric_key):
self.full_metrics_data[current_metric_key]['decoder'][lf_name][(codec_name, title)] = decoder_data[current_metric_key]
# Generate single plots for the current metric
self._store_and_plot_metrics(
lf_name,
encoder_data.get(current_metric_key, {}),
decoder_data.get(current_metric_key, {}),
codec_name,
title,
output_dir,
current_metric_key,
metric_plot_config
)
def _store_and_plot_metrics(self, lf_name: str, encoder_data: Dict, decoder_data: Dict,
codec_name: str, title: str, output_dir: Path, metric: str,
metric_plot_config: Dict) -> None:
"""Stores metrics and generates plots.
:param lf_name: Light field name
:type lf_name: str
:param encoder_data: Encoder metrics data
:type encoder_data: Dict
:param decoder_data: Decoder metrics data
:type decoder_data: Dict
:param codec_name: Name of the codec
:type codec_name: str
:param title: Plot title
:type title: str
:param output_dir: Output directory for plots
:type output_dir: Path
:param metric: Metric key
:type metric: str
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:return: None
:rtype: None
"""
if metric in ['time_ns', 'max_memory_usage', 'memory_overhead']:
# Store data for combined plots
if encoder_data:
self._store_operation_metrics(
self.metrics_data[metric]['encoder'],
lf_name, codec_name, title,
encoder_data,
metric_plot_config
)
if decoder_data:
self._store_operation_metrics(
self.metrics_data[metric]['decoder'],
lf_name, codec_name, title,
decoder_data,
metric_plot_config
)
# Generate individual plots if enabled for this specific metric type
if metric == "time_ns":
generate_individual_plot = self.performance_config.get("time", {}).get("generate_individual_plot", False)
elif metric == "max_memory_usage":
generate_individual_plot = self.performance_config.get("memory", {}).get("generate_individual_plot", False)
if (encoder_data or decoder_data) and generate_individual_plot:
self._generate_individual_plot(
lf_name, encoder_data, decoder_data,
codec_name, title,
output_dir, metric,
metric_plot_config
)
def _generate_memory_overhead_plot(self, lf_name: str, encoder_overhead_data: Dict, decoder_overhead_data: Dict,
codec_name: str, title: str, output_dir: Path, metric_plot_config: Dict) -> None:
"""Generates side-by-side memory overhead plots comparing baseline with other codecs.
:param lf_name: Light field name
:type lf_name: str
:param encoder_overhead_data: Encoder memory overhead data
:type encoder_overhead_data: Dict
:param decoder_overhead_data: Decoder memory overhead data
:type decoder_overhead_data: Dict
:param codec_name: Name of the codec
:type codec_name: str
:param title: Plot title
:type title: str
:param output_dir: Output directory for plots
:type output_dir: Path
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:return: None
:rtype: None
"""
if not encoder_overhead_data and not decoder_overhead_data:
return
# Set figure size to 10x8 (width x height)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 8))
fig.suptitle(f"{lf_name} - {title} (Memory Overhead %)".strip(), fontsize=metric_plot_config.get("plots", {}).get("font_size", 8))
if encoder_overhead_data:
self._plot_memory_overhead_data(ax1, encoder_overhead_data, "Encoder Memory Overhead", metric_plot_config.get("plots"))
if decoder_overhead_data:
self._plot_memory_overhead_data(ax2, decoder_overhead_data, "Decoder Memory Overhead", metric_plot_config.get("plots"))
plt.tight_layout(rect=[0, 0, 1, 0.96])
save_plot(output_dir / "memory_overhead" / "lightfields" / lf_name,
codec_name, metric_plot_config.get("plots", {}).get("format", "pdf"), "memory_overhead")
plt.close()
def _plot_memory_overhead_data(self, ax: Any, data: Dict, title: str, metric_plot_config: Dict) -> None:
"""Plots memory overhead data on given axis with a single legend inside the plot.
:param ax: Matplotlib axis to plot on
:type ax: Any
:param data: Memory overhead data by BPP
:type data: Dict
:param title: Axis title
:type title: str
:param metric_plot_config: Configuration for plot styling
:type metric_plot_config: Dict
:return: None
:rtype: None
"""
bpps = sorted(data.keys())
overheads = [data[bpp]['median'] for bpp in bpps if 'median' in data[bpp]]
baselines = [data[bpp].get('baseline_bytes', 0) / 1e6 for bpp in bpps] # Convert to MB
currents = [data[bpp].get('current_bytes', 0) / 1e6 for bpp in bpps] # Convert to MB
# Plot all lines and collect handles/labels for a single legend
lines = []
labels = []
l1, = ax.plot(bpps, overheads, 'o-', color='blue', label='Overhead %',
markersize=metric_plot_config['point_size'], linewidth=metric_plot_config['line_width'])
lines.append(l1)
labels.append('Overhead %')
ax.set_title(title, fontsize=metric_plot_config['font_size'])
ax.set_xlabel("Target bpp", fontsize=metric_plot_config['font_size'])
ax.set_ylabel("Memory Overhead (%)", fontsize=metric_plot_config['font_size'])
ax.grid(True)
ax_twin = ax.twinx()
l2, = ax_twin.plot(bpps, baselines, 'x--', color='green', label='Baseline (MB)',
markersize=metric_plot_config['point_size'], linewidth=metric_plot_config['line_width'])
l3, = ax_twin.plot(bpps, currents, 'x--', color='red', label='Current (MB)',
markersize=metric_plot_config['point_size'], linewidth=metric_plot_config['line_width'])
lines.extend([l2, l3])
labels.extend(['Baseline (MB)', 'Current (MB)'])
ax_twin.set_ylabel('Memory (MB)', fontsize=metric_plot_config['font_size'], labelpad=15)
ax_twin.yaxis.set_label_position("right")
ax_twin.yaxis.set_ticks_position("right")
ax_twin.figure.subplots_adjust(right=0.85)
# Single legend inside the plot (bottom right by default)
if metric_plot_config.get("legend", True):
ax.legend(lines, labels, loc='best', fontsize='small', frameon=True)
if metric_plot_config.get("xscale", None):
ax.set_xscale(metric_plot_config["xscale"])
ax_twin.set_xscale(metric_plot_config["xscale"])
# Add tick formatter if log scale
if metric_plot_config["xscale"] == "log":
ax.xaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter())
ax_twin.xaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter())
if metric_plot_config.get("yscale", None):
ax.set_yscale(metric_plot_config["yscale"])
# ax_twin.set_yscale(metric_plot_config["yscale"]) # Memory overhead % does not usually need log scale for twin axis
def _store_operation_metrics(self, storage: Dict, lf_name: str, codec_name: str,
title: str, metrics: Dict[float, Dict[str, float]],
metric_plot_config: Dict) -> None:
"""Stores metrics for combined plots.
:param storage: Storage dictionary to update
:type storage: Dict
:param lf_name: Light field name
:type lf_name: str
:param codec_name: Name of the codec
:type codec_name: str
:param title: Codec title for legend
:type title: str
:param metrics: Dictionary of BPP to metric data
:type metrics: Dict[float, Dict[str, float]]
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:return: None
:rtype: None
"""
# Use averaging metric from performance config, fallback to median
metric_key_for_combining = self.averaging_metric
for bpp, metric_data in metrics.items():
# For memory_overhead, store individual overhead values if available
# to allow proper stddev calculation across repetitions
if 'individual_overheads' in metric_data:
# Store each individual overhead value
for overhead_val in metric_data['individual_overheads']:
storage[lf_name][(codec_name, title)].append((bpp, overhead_val))
else:
# Fallback to single aggregated value for other metrics
storage[lf_name][(codec_name, title)].append((bpp, metric_data[metric_key_for_combining]))
def _generate_individual_plot(self, lf_name: str, encoder_data: Dict, decoder_data: Dict,
codec_name: str, title: str, output_dir: Path, metric: str,
metric_plot_config: Dict) -> None:
"""Generates and saves individual performance graph.
:param lf_name: Light field name
:type lf_name: str
:param encoder_data: Encoder metrics data
:type encoder_data: Dict
:param decoder_data: Decoder metrics data
:type decoder_data: Dict
:param codec_name: Name of the codec
:type codec_name: str
:param title: Plot title
:type title: str
:param output_dir: Output directory for plots
:type output_dir: Path
:param metric: Metric key
:type metric: str
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:return: None
:rtype: None
"""
if metric == "memory_overhead" or (not encoder_data and not decoder_data):
return
plot_styles = self._get_plot_style(codec_name, metric_plot_config)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=tuple(plot_styles['figure_size']))
fig.suptitle(f"{lf_name} - {title} ({get_metric_title(metric)})".strip(), fontsize=plot_styles['font_size'])
self._plot_operation_metrics(ax1, encoder_data, f"Encoder ({get_metric_title(metric)})", plot_styles, metric_plot_config, metric)
self._plot_operation_metrics(ax2, decoder_data, f"Decoder ({get_metric_title(metric)})", plot_styles, metric_plot_config, metric)
plt.tight_layout()
save_plot(output_dir / metric / "lightfields" / lf_name,
codec_name, metric_plot_config.get("plots", {}).get("format", "pdf"), metric)
plt.close()
def _plot_operation_metrics(
self,
ax: Any,
data: Dict,
title: str,
plot_styles: Dict,
metric_plot_config: Dict,
metric_key: str,
) -> None:
"""Plots metrics for a single operation with configurable styling.
:param ax: Matplotlib axis to plot on
:type ax: Any
:param data: Metric data by BPP
:type data: Dict
:param title: Axis title
:type title: str
:param plot_styles: Dictionary of plot style settings
:type plot_styles: Dict
:param metric_plot_config: Configuration for metric plotting
:type metric_plot_config: Dict
:param metric_key: Key identifying the metric
:type metric_key: str
:return: None
:rtype: None
"""
if not data:
ax.set_title(f"{title.split('(')[0]}(No Data) ({title.split('(')[1]}")
return
bpps = sorted(data.keys())
median = [data[bpp]['median'] for bpp in bpps]
ax.plot(
bpps, median,
marker=plot_styles['marker'],
linestyle='-',
color=plot_styles['color'],
label='Median',
markersize=plot_styles['point_size'],
linewidth=plot_styles['line_width']
)
mean = [data[bpp]['mean'] for bpp in bpps]
ax.plot(
bpps, mean,
marker=plot_styles['marker'],
linestyle='--',
color='green',
label='Mean',
markersize=plot_styles['point_size'],
linewidth=plot_styles['line_width']
)
min_vals = [data[bpp]['min'] for bpp in bpps]
ax.plot(
bpps, min_vals,
marker='x',
linestyle='',
color='red',
label='Min',
markersize=plot_styles['point_size']
)
max_vals = [data[bpp]['max'] for bpp in bpps]
ax.plot(
bpps, max_vals,
marker='x',
linestyle='',
color='blue',
label='Max',
markersize=plot_styles['point_size']
)
ax.set_title(title, fontsize=plot_styles['font_size'])
ax.set_xlabel("Target bpp", fontsize=plot_styles['font_size'])
ax.set_ylabel(get_metric_ylabel(metric_key), fontsize=plot_styles['font_size']) # Use specific metric_key here
if metric_plot_config.get("legend", True):
ax.legend(
loc='center left',
bbox_to_anchor=(1.02, 0.5),
title=plot_styles['legend_title'],
fontsize='small',
title_fontsize='small',
frameon=False,
borderaxespad=0.0
)
ax.grid(True)
if metric_plot_config.get("xscale", None):
ax.set_xscale(metric_plot_config["xscale"])
if metric_plot_config["xscale"] == "log":
ax.xaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter())
if metric_plot_config.get("yscale", None):
ax.set_yscale(metric_plot_config["yscale"])
if metric_plot_config["yscale"] == "log":
ax.yaxis.set_major_locator(matplotlib.ticker.LogLocator(base=10.0, subs="auto"))
ax.yaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter())
ax.ticklabel_format(style='plain', axis='y')
[docs]
def create_combined_plots(self) -> None:
    """Creates all combined plots automatically grouped by codec type.

    For every configured metric section (``"time"``, ``"memory"``) with stored
    data, one figure per operation, codec group and light field is written to
    ``<results_path>/combined_codecs_graphs/<group>_<operation>_graphs``. All
    codecs whose name contains ``"jplm"`` share one group; every other codec
    forms its own group. Afterwards the average plots are created for each
    configured section, and the average memory-overhead plot only when
    ``memory.calculate_overhead`` is enabled. That option now defaults to
    ``False`` here, as it does where the overhead data is produced.

    :return: None
    :rtype: None
    """
    print("Creating combined plots...\n")
    thread_pattern = re.compile(r"(\d+)\s*threads")

    def _config_sort_key(item):
        # Ascending order on: (name starts with "jplm" and is not sequential,
        # name contains "sequential", thread count parsed from the name or 0,
        # the name itself as tie-breaker).
        name = item[0][0]
        lowered = name.lower()
        threads = thread_pattern.search(name)
        return (
            lowered.startswith("jplm") and "sequential" not in lowered,
            "sequential" in lowered,
            int(threads.group(1)) if threads else 0,
            name,
        )

    # Iterate over time and memory configurations
    for metric_type_key in ("time", "memory"):
        if metric_type_key not in self.performance_config:
            continue

        metric_config = self.performance_config[metric_type_key]
        metric_plot_config = metric_config.get("plots", {})
        # The actual metric key under which the data was stored
        current_metric_key = "time_ns" if metric_type_key == "time" else "max_memory_usage"
        if current_metric_key not in self.metrics_data:
            continue

        output_dir = Path(os.path.expandvars(metric_config["results_path"]))
        # Same fallbacks as the individual plots (_get_plot_style / save_plot callers).
        font_size = metric_plot_config.get('font_size', 14)
        plot_format = metric_plot_config.get('format', 'pdf')

        codec_groups = defaultdict(list)
        for codec_name in self.performance_config["codecs"].get("analyse", []):
            # "parallel-jplm" also contains "jplm", so one substring test covers both.
            group = "jplm" if "jplm" in codec_name else codec_name
            codec_groups[group].append(codec_name)

        for op_type in ('encoder', 'decoder'):
            if op_type not in self.metrics_data[current_metric_key]:
                continue

            for group_name, codecs_in_group in codec_groups.items():
                group_output_path = output_dir / "combined_codecs_graphs" / f"{group_name}_{op_type.lower()}_graphs"
                group_output_path.mkdir(parents=True, exist_ok=True)

                for lf_name, codec_data in self.metrics_data[current_metric_key][op_type].items():
                    filtered_data = {
                        k: v for k, v in codec_data.items()
                        if k[0] in codecs_in_group
                    }
                    if not filtered_data:
                        continue

                    sorted_configs = sorted(filtered_data.items(), key=_config_sort_key)

                    plt.figure(figsize=tuple(metric_plot_config.get('figure_size', [12, 8])))
                    for (codec_name, title), data_points in sorted_configs:
                        if data_points:
                            # In-place sort: the stored points stay ordered by bpp.
                            data_points.sort()
                            bpps, values = zip(*data_points)
                            styles = self._get_plot_style(codec_name, metric_config)
                            plt.plot(
                                bpps, values,
                                marker=styles['marker'],
                                color=styles['color'],
                                label=title,
                                markersize=styles['point_size'],
                                linewidth=styles['line_width']
                            )

                    if metric_plot_config.get("xscale", None):
                        plt.xscale(metric_plot_config["xscale"])
                        ax = plt.gca()
                        ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

                    if metric_plot_config.get("yscale", None):
                        plt.yscale(metric_plot_config["yscale"])
                        ax = plt.gca()
                        if metric_plot_config["yscale"] == "log":
                            ax.yaxis.set_major_locator(matplotlib.ticker.LogLocator(base=10.0, subs="auto"))
                            ax.yaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter())
                            ax.ticklabel_format(style='plain', axis='y')

                    plt.title(f"{op_type.capitalize()} - {get_metric_title(current_metric_key)} - {lf_name}", fontsize=font_size)
                    plt.xlabel("Target bpp", fontsize=font_size)
                    plt.ylabel(get_metric_ylabel(current_metric_key), fontsize=font_size)

                    if metric_plot_config.get("legend", True):
                        plt.legend(
                            loc='center left',
                            bbox_to_anchor=(1.02, 0.5),
                            title="Codecs",
                            fontsize='small',
                            title_fontsize='small',
                            frameon=False,
                            borderaxespad=0.0
                        )

                    plt.grid(True, axis='both', linestyle=':', alpha=0.7)
                    plt.tight_layout()
                    plt.axhline(0, color='black', linewidth=0.5)

                    output_file = group_output_path / f"{lf_name}_combined_{group_name}_{op_type}.{plot_format}"
                    plt.savefig(output_file, format=plot_format, bbox_inches='tight')
                    plt.close()

    # After all time and memory combined plots, create average plots for time and memory
    if "time" in self.performance_config:
        self.create_average_time_plots()
    if "memory" in self.performance_config:
        self.create_average_memory_plots()
    # Overhead data only exists when calculate_overhead was explicitly enabled.
    if self.performance_config.get("memory", {}).get("calculate_overhead", False):
        self.create_average_memory_overhead_plot()
def _get_plot_style(self, codec_name: Optional[str] = None, metric_plot_config: Optional[Dict] = None) -> Dict:
    """Gets complete plot style configuration for a codec.

    Marker, label and color are read from the codec's ``rd_preferences`` entry in the
    global configuration. All remaining settings are read from the ``"plots"`` mapping
    of ``metric_plot_config``. Every setting has a fallback value, so an unknown codec,
    a missing or empty ``"plots"`` mapping, or ``metric_plot_config=None`` yields defaults.

    :param codec_name: Name of the codec, defaults to None
    :type codec_name: Optional[str], optional
    :param metric_plot_config: Metric configuration containing a ``"plots"`` mapping, defaults to None
    :type metric_plot_config: Optional[Dict], optional
    :return: Dictionary of plot style settings
    :rtype: Dict
    """
    codec_config = self.configuration["codecs"]["configuration"].get(codec_name, {})
    rd_prefs = codec_config.get("rd_preferences", {})
    # The signature advertises None as a valid argument, so tolerate it (and a missing
    # or empty "plots" mapping) instead of failing on a subscript.
    plots_config = (metric_plot_config or {}).get("plots") or {}
    line_width = plots_config.get("line_width", 2)
    return {
        'marker': rd_prefs.get("marker", "o"),
        'label': rd_prefs.get("title", codec_name),
        'font_size': plots_config.get("font_size", 14),
        'color': rd_prefs.get("color", "black"),
        'point_size': plots_config.get("point_size", 8),
        'line_width': line_width,
        'legend': plots_config.get("legend", True),
        'legend_title': plots_config.get("legend_title", "Codecs"),
        'figure_size': plots_config.get("figure_size", [12, 8]),
        # The baseline curve shares the configured line width but is always gray and dashed.
        'baseline_style': {
            'color': 'gray',
            'linestyle': '--',
            'linewidth': line_width,
        },
    }
[docs]
def create_average_time_plots(self) -> None:
    """Creates separate plots of average time with standard deviation for encoder and decoder.

    For every configured averaging group, the ``time_ns`` samples of the group's
    lightfields are pooled per codec and target bpp. Each non-empty pool is passed to
    ``_generate_average_metric_plot``, first for the encoder and then for the decoder.
    Plots are written to ``<time results_path>/average_plots``.

    :return: None
    :rtype: None
    """
    print("\nCreating average time plots with standard deviation...")
    metric = "time_ns"
    # Check for data before touching the filesystem, so a run without time data
    # neither creates an empty directory nor depends on a configured results path.
    if metric not in self.metrics_data:
        print(" No time data available. Skipping average plots.")
        return

    time_config = self.performance_config.get("time", {})
    # os.path.expandvars() rejects None; fall back to the default directory that
    # generate_time_difference_table also uses when no results path is configured.
    results_path = time_config.get("results_path") or "./temp_time_results"
    output_dir = Path(os.path.expandvars(results_path)) / "average_plots"
    output_dir.mkdir(parents=True, exist_ok=True)

    per_operation = self.metrics_data[metric]
    # (key in metrics_data, label handed to the plotting helper)
    operations = (("encoder", "Encoder"), ("decoder", "Decoder"))

    # Lightfields for which at least one operation has processed data
    available_lfs = set()
    for op_key, _op_label in operations:
        available_lfs.update(per_operation.get(op_key, {}))

    for group_config in self.averaging_groups:
        group_name = group_config.get("name", "unknown")
        # Keep only the group's lightfields that exist in the data
        valid_lfs = [lf for lf in group_config.get("lightfields", []) if lf in available_lfs]
        if not valid_lfs:
            print(f" Skipping group '{group_name}': no valid lightfields found in data")
            continue

        for op_key, op_label in operations:
            lightfield_data = per_operation.get(op_key, {})
            # codec name -> bpp -> values pooled over the group's lightfields
            pooled = defaultdict(lambda: defaultdict(list))
            for lf_name in valid_lfs:
                if lf_name not in lightfield_data:
                    continue
                for (codec_name, _title), data_points in lightfield_data[lf_name].items():
                    for bpp, value in data_points:
                        pooled[codec_name][bpp].append(value)
            if pooled:
                self._generate_average_metric_plot(
                    pooled, op_label, output_dir, time_config, group_name, metric
                )
[docs]
def create_average_memory_plots(self) -> None:
    """Creates separate plots of average memory usage with standard deviation for encoder and decoder.

    For every configured averaging group, the ``max_memory_usage`` samples of the group's
    lightfields are pooled per codec and target bpp. Each non-empty pool is passed to
    ``_generate_average_metric_plot``, first for the encoder and then for the decoder.
    Plots are written to ``<memory results_path>/average_plots``.

    :return: None
    :rtype: None
    """
    print("\nCreating average memory plots with standard deviation...")
    metric = "max_memory_usage"
    # Check for data before touching the filesystem, so a run without memory data
    # neither creates an empty directory nor depends on a configured results path.
    if metric not in self.metrics_data:
        print(" No memory data available. Skipping average plots.")
        return

    memory_config = self.performance_config.get("memory", {})
    # os.path.expandvars() rejects None; fall back to the default directory that
    # _save_consolidated_overhead_json also uses when no results path is configured.
    results_path = memory_config.get("results_path") or "./temp_memory_results"
    output_dir = Path(os.path.expandvars(results_path)) / "average_plots"
    output_dir.mkdir(parents=True, exist_ok=True)

    per_operation = self.metrics_data[metric]
    # (key in metrics_data, label handed to the plotting helper)
    operations = (("encoder", "Encoder"), ("decoder", "Decoder"))

    # Lightfields for which at least one operation has processed data
    available_lfs = set()
    for op_key, _op_label in operations:
        available_lfs.update(per_operation.get(op_key, {}))

    for group_config in self.averaging_groups:
        group_name = group_config.get("name", "unknown")
        # Keep only the group's lightfields that exist in the data
        valid_lfs = [lf for lf in group_config.get("lightfields", []) if lf in available_lfs]
        if not valid_lfs:
            print(f" Skipping group '{group_name}': no valid lightfields found in data")
            continue

        for op_key, op_label in operations:
            lightfield_data = per_operation.get(op_key, {})
            # codec name -> bpp -> values pooled over the group's lightfields
            pooled = defaultdict(lambda: defaultdict(list))
            for lf_name in valid_lfs:
                if lf_name not in lightfield_data:
                    continue
                for (codec_name, _title), data_points in lightfield_data[lf_name].items():
                    for bpp, value in data_points:
                        pooled[codec_name][bpp].append(value)
            if pooled:
                self._generate_average_metric_plot(
                    pooled, op_label, output_dir, memory_config, group_name, metric
                )
def _generate_average_metric_plot(self, metric_data: Dict, op_type: str, output_dir: Path,
                                  metric_config: Dict, group_name: str, metric_key: str) -> None:
    """Generates a single average plot for either encoder or decoder for a given group and metric.

    One curve is drawn per codec: the mean of the pooled values at each bpp, surrounded
    by a shaded band of one standard deviation. The figure is saved as
    ``average_<metric_key>_with_stddev_<op_type>_<group>.<format>`` inside ``output_dir``.
    Characters of the group name outside ``[A-Za-z0-9_-]`` are replaced by ``_`` in the
    file name only.

    :param metric_data: Aggregated metric data by codec and BPP
    :type metric_data: Dict
    :param op_type: Operation type (Encoder or Decoder)
    :type op_type: str
    :param output_dir: Output directory for plots
    :type output_dir: Path
    :param metric_config: Configuration for the metric; its ``"plots"`` mapping must provide ``"format"``
    :type metric_config: Dict
    :param group_name: Name of the lightfield grouping
    :type group_name: str
    :param metric_key: Key identifying the metric
    :type metric_key: str
    :raises KeyError: If the ``"plots"`` mapping has no ``"format"`` entry
    :return: None
    :rtype: None
    """
    if not metric_data:
        print(f" No {op_type.lower()} {metric_key} data available for plotting (group: {group_name})")
        return

    metric_plot_config = metric_config.get("plots", {})
    # Same fallback as _get_plot_style, so a config without "font_size" does not raise.
    font_size = metric_plot_config.get("font_size", 14)
    # Resolve the output target before drawing: a missing "format" then fails before a figure exists.
    plot_format = metric_plot_config["format"]
    # Keep the file name free of path separators and spaces, as the overhead plots do.
    safe_group = re.sub(r'[^A-Za-z0-9_-]+', '_', str(group_name))
    output_file = output_dir / f"average_{metric_key}_with_stddev_{op_type.lower()}_{safe_group}.{plot_format}"

    figure = plt.figure(figsize=tuple(metric_plot_config.get("figure_size", [12, 8])))
    try:
        for codec_name, bpp_data in metric_data.items():
            plot_styles = self._get_plot_style(codec_name, metric_config)
            sorted_bpps = sorted(bpp_data)
            means = np.array([np.mean(bpp_data[bpp]) for bpp in sorted_bpps])
            stddevs = np.array([np.std(bpp_data[bpp]) for bpp in sorted_bpps])
            plt.plot(
                sorted_bpps, means,
                marker=plot_styles['marker'],
                color=plot_styles['color'],
                label=plot_styles['label'],
                markersize=plot_styles['point_size'],
                linewidth=plot_styles['line_width']
            )
            # Shaded band: mean +/- one standard deviation
            plt.fill_between(
                sorted_bpps,
                means - stddevs,
                means + stddevs,
                color=plot_styles['color'],
                alpha=0.3
            )

        plt.title(f"Average {get_metric_title(metric_key)} with std deviation - {op_type} ({group_name})",
                  fontsize=font_size)
        plt.xlabel("Target bpp", fontsize=font_size)
        plt.ylabel(get_metric_ylabel(metric_key), fontsize=font_size)
        plt.grid(True, axis='both', linestyle=':', alpha=0.7)

        if metric_plot_config.get("legend", True):
            handles, labels = plt.gca().get_legend_handles_labels()
            # Legend entries are listed in reverse plotting order, outside the axes on the right.
            plt.legend(
                handles[::-1], labels[::-1],
                loc='center left',
                bbox_to_anchor=(1.02, 0.5),
                title="Codecs",
                fontsize='small',
                title_fontsize='small',
                frameon=False,
                borderaxespad=0.0
            )
        plt.tight_layout()

        xscale = metric_plot_config.get("xscale", None)
        if xscale:
            plt.xscale(xscale)
            if xscale == "log":
                # Plain numbers instead of powers of ten on the tick labels
                plt.gca().get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

        yscale = metric_plot_config.get("yscale", None)
        if yscale:
            plt.yscale(yscale)
            if yscale == "log":
                ax = plt.gca()
                ax.yaxis.set_major_locator(matplotlib.ticker.LogLocator(base=10.0, subs="auto"))
                ax.yaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter())
                ax.ticklabel_format(style='plain', axis='y')

        plt.savefig(output_file, format=plot_format, bbox_inches='tight')
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(figure)
    print(f" Saved {op_type.lower()} average {metric_key} plot: {output_file}")
def _save_consolidated_overhead_json(self, output_dir: Path) -> None:
    """Saves a consolidated JSON file with overhead values for all codecs and lightfields.

    The file is written to ``<memory results_path>/memory_overhead/memory_overhead_data.json``
    (``./temp_memory_results`` when no results path is configured). Nothing is written,
    and no directory is created, when there is no ``memory_overhead`` data.

    :param output_dir: Output directory for the JSON file. Currently not used: the location
        is derived from the memory configuration. Kept so existing callers keep working.
    :type output_dir: Path
    :return: None
    :rtype: None
    """
    metric = "memory_overhead"
    if metric not in self.metrics_data:
        return  # No memory overhead data to save

    results_path = self.performance_config.get("memory", {}).get("results_path", "./temp_memory_results")
    json_file = Path(os.path.expandvars(results_path)) / "memory_overhead" / "memory_overhead_data.json"
    # Whether save_json_file creates missing parent directories is not visible here,
    # so make sure the directory the file actually lands in exists.
    json_file.parent.mkdir(parents=True, exist_ok=True)

    codecs = {}
    for op_type in ("encoder", "decoder"):
        for lf_name, codec_data in self.metrics_data[metric].get(op_type, {}).items():
            for (codec_name, _title), data_points in codec_data.items():
                per_codec = codecs.setdefault(codec_name, {"encoder": {}, "decoder": {}})
                per_codec[op_type][lf_name] = {
                    # BPP is stored as a string because JSON object keys must be strings
                    str(bpp): {"memory": {"overhead (%)": float(overhead_value)}}
                    for bpp, overhead_value in data_points
                }

    consolidated_data = {
        "baseline": self.baseline_codec or "None",
        "codecs": codecs,
    }
    save_json_file(json_file, consolidated_data, "Saved consolidated overhead JSON")
[docs]
def create_average_memory_overhead_plot(self) -> None:
    """Creates separate plots of average memory overhead with standard deviation for encoder and decoder.

    For every configured averaging group, the ``memory_overhead`` samples of the group's
    lightfields are pooled per codec and target bpp. Each non-empty pool is passed to
    ``_generate_overhead_average_plot``, first for the encoder and then for the decoder.
    Plots are written to ``<memory results_path>/average_plots``.

    :return: None
    :rtype: None
    """
    print("\nCreating average memory overhead plots with standard deviation...")
    metric = "memory_overhead"
    # Check for data before touching the filesystem, so a configuration without memory
    # measurements neither creates an empty directory nor needs a results path.
    if metric not in self.metrics_data:
        print(" No memory overhead data available. Skipping average plots.")
        return

    memory_config = self.performance_config.get("memory", {})
    # os.path.expandvars() rejects None; fall back to the default directory that
    # _save_consolidated_overhead_json also uses when no results path is configured.
    results_path = memory_config.get("results_path") or "./temp_memory_results"
    output_dir = Path(os.path.expandvars(results_path)) / "average_plots"
    output_dir.mkdir(parents=True, exist_ok=True)

    per_operation = self.metrics_data[metric]
    # (key in metrics_data, label handed to the plotting helper)
    operations = (("encoder", "Encoder"), ("decoder", "Decoder"))

    # Lightfields for which at least one operation has processed data
    available_lfs = set()
    for op_key, _op_label in operations:
        available_lfs.update(per_operation.get(op_key, {}))

    for group_config in self.averaging_groups:
        group_name = group_config.get("name", "unknown")
        # Keep only the group's lightfields that exist in the data
        valid_lfs = [lf for lf in group_config.get("lightfields", []) if lf in available_lfs]
        if not valid_lfs:
            print(f" Skipping group '{group_name}': no valid lightfields found in data")
            continue

        for op_key, op_label in operations:
            lightfield_data = per_operation.get(op_key, {})
            # codec name -> bpp -> overhead values pooled over the group's lightfields
            pooled = defaultdict(lambda: defaultdict(list))
            for lf_name in valid_lfs:
                if lf_name not in lightfield_data:
                    continue
                for (codec_name, _title), data_points in lightfield_data[lf_name].items():
                    for bpp, overhead in data_points:
                        pooled[codec_name][bpp].append(overhead)
            if pooled:
                self._generate_overhead_average_plot(
                    pooled, op_label, output_dir, memory_config, group_name
                )
def _generate_overhead_average_plot(self, overhead_data: Dict, op_type: str, output_dir: Path, metric_config: Dict, category: str) -> None:
    """Generates a single average overhead plot for either encoder or decoder for a given category.

    One curve is drawn per codec: the mean of the pooled overhead values at each bpp,
    surrounded by a shaded band of one standard deviation. The figure is saved as
    ``average_memory_overhead_with_stddev_<op_type>_<category>.<format>`` inside
    ``output_dir``; characters of the category outside ``[A-Za-z0-9_-]`` are replaced
    by ``_`` in the file name.

    :param overhead_data: Aggregated overhead data by codec and BPP
    :type overhead_data: Dict
    :param op_type: Operation type (Encoder or Decoder)
    :type op_type: str
    :param output_dir: Output directory for plots
    :type output_dir: Path
    :param metric_config: Configuration for the metric; its ``"plots"`` mapping must provide ``"format"``
    :type metric_config: Dict
    :param category: Name of the lightfield category
    :type category: str
    :raises KeyError: If the ``"plots"`` mapping has no ``"format"`` entry
    :return: None
    :rtype: None
    """
    if not overhead_data:
        print(f" No {op_type.lower()} overhead data available for plotting (category: {category})")
        return

    metric_plot_config = metric_config.get("plots", {})
    # Same fallback as _get_plot_style, so a config without "font_size" does not raise.
    font_size = metric_plot_config.get("font_size", 14)
    # Resolve the output target before drawing: a missing "format" then fails before a figure exists.
    plot_format = metric_plot_config["format"]
    safe_category = re.sub(r'[^A-Za-z0-9_-]+', '_', str(category))
    output_file = output_dir / f"average_memory_overhead_with_stddev_{op_type.lower()}_{safe_category}.{plot_format}"

    figure = plt.figure(figsize=tuple(metric_plot_config.get("figure_size", [12, 8])))
    try:
        for codec_name, bpp_data in overhead_data.items():
            plot_styles = self._get_plot_style(codec_name, metric_config)
            sorted_bpps = sorted(bpp_data)
            means = np.array([np.mean(bpp_data[bpp]) for bpp in sorted_bpps])
            stddevs = np.array([np.std(bpp_data[bpp]) for bpp in sorted_bpps])
            plt.plot(
                sorted_bpps, means,
                marker=plot_styles['marker'],
                color=plot_styles['color'],
                # The style's label is the codec's configured title and falls back to the
                # codec name, so codecs without rd_preferences can still be plotted.
                label=plot_styles['label'],
                markersize=plot_styles['point_size'],
                linewidth=plot_styles['line_width']
            )
            # Shaded band: mean +/- one standard deviation
            plt.fill_between(
                sorted_bpps,
                means - stddevs,
                means + stddevs,
                color=plot_styles['color'],
                alpha=0.3
            )

        plt.title(f"Average memory overhead (%) with std deviation - {op_type} ({category})", fontsize=font_size)
        plt.xlabel("Target bpp", fontsize=font_size)
        plt.ylabel("Memory Overhead (%)", fontsize=font_size)
        plt.grid(True, axis='both', linestyle=':', alpha=0.7)

        if metric_plot_config.get("legend", True):
            handles, labels = plt.gca().get_legend_handles_labels()
            # Legend entries are listed in reverse plotting order, outside the axes on the right.
            plt.legend(
                handles[::-1], labels[::-1],
                loc='center left',
                bbox_to_anchor=(1.02, 0.5),
                title="Codecs",
                fontsize='small',
                title_fontsize='small',
                frameon=False,
                borderaxespad=0.0
            )
        plt.tight_layout()

        xscale = metric_plot_config.get("xscale", None)
        if xscale:
            plt.xscale(xscale)
            ax = plt.gca()
            # NOTE(review): fixed tick positions, presumably the target bpps of the
            # evaluated rate points; confirm before reusing with other bpp sets.
            ax.set_xticks([0.005, 0.02, 0.1, 0.75])
            ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

        plt.savefig(output_file, format=plot_format, bbox_inches='tight')
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(figure)
    print(f" Saved {op_type.lower()} average memory overhead plot: {output_file}")
[docs]
def generate_time_difference_table(self) -> None:
    """Generates a table comparing encoder and decoder execution time for all codecs and lightfields.

    One row is produced per lightfield, codec and bpp present in the encoder data. A
    decoder value that is missing for a row is treated as 0. The table is optionally
    printed and then written to ``<time results_path>/tables`` once per configured
    format (``text``, ``html``, ``csv``, ``latex``, ``mediawiki``); unsupported formats
    are reported and skipped.

    :return: None
    :rtype: None
    """
    print("Generating execution time difference table...")
    # Get the time configuration for output paths and formats
    time_config = self.performance_config.get("time", {})
    output_dir = Path(os.path.expandvars(time_config.get("results_path", "./temp_time_results"))) / "tables"
    output_dir.mkdir(parents=True, exist_ok=True)

    table = PrettyTable()
    # NOTE(review): headers state seconds while the values come from the "time_ns"
    # metric; confirm that the stored values are already converted.
    table.field_names = [
        "Lightfield", "BPP", "Codec",
        "Encoder Median (s)", "Decoder Median (s)",
        "Encoder/Decoder Ratio",
        "Time Difference (Encoder - Decoder) (s)",
        "Relative Time Difference ([Encoder-Decoder]/Encoder) (%)"
    ]

    encoder_data = self.metrics_data.get("time_ns", {}).get("encoder", {})
    decoder_data = self.metrics_data.get("time_ns", {}).get("decoder", {})
    for lf_name, encoder_codecs in encoder_data.items():
        decoder_codecs = decoder_data.get(lf_name, {})
        for (codec_name, title), enc_points in encoder_codecs.items():
            dec_by_bpp = dict(decoder_codecs.get((codec_name, title), []))
            for bpp, encoder_median in dict(enc_points).items():
                decoder_median = dec_by_bpp.get(bpp, 0)
                time_difference = encoder_median - decoder_median
                # Guard both divisions; a non-positive denominator yields 0
                relative_time_difference = (time_difference / encoder_median * 100) if encoder_median > 0 else 0
                encoder_decoder_ratio = (encoder_median / decoder_median) if decoder_median > 0 else 0
                table.add_row([
                    lf_name, f"{bpp:.3f}", codec_name,  # Format BPP to 3 decimal places
                    f"{encoder_median:.6f}", f"{decoder_median:.6f}",  # Format time for readability
                    f"{encoder_decoder_ratio:.3f}",
                    f"{time_difference:.6f}", f"{relative_time_difference:.2f}"
                ])

    tables_config = time_config.get("tables", {})
    formats = tables_config.get("formats", ["text", "html"])
    if tables_config.get("verbose", True):
        print('\n' + table.get_string())

    def render_latex() -> str:
        # \caption is only valid inside a float, so wrap the generated tabular in a
        # table environment instead of injecting the caption into \begin{tabular}{...}.
        return (
            "\\begin{table}\n"
            "\\caption{Execution Time Difference}\n"
            f"{table.get_latex_string()}\n"
            "\\end{table}"
        )

    # Renderers are looked up lazily so an exporter missing from the installed
    # PrettyTable version only matters when that format is actually requested.
    renderers = {
        "text": lambda: table.get_string(),
        "html": lambda: table.get_html_string(),
        "csv": lambda: table.get_csv_string(),
        "latex": render_latex,
        "mediawiki": lambda: table.get_mediawiki_string(),
    }
    for fmt in formats:
        render = renderers.get(fmt)
        if render is None:
            print(f"Skipping unsupported table format '{fmt}' (supported: {', '.join(renderers)})")
            continue
        filename = output_dir / f"execution_time_difference.{fmt}"
        content = render()
        with open(filename, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"Saved execution time difference table ({fmt.upper()}): {filename}")
[docs]
def main() -> None:
    """Runs the performance visualizer for every performance configuration block.

    The configuration is read from the command line with the parent of the script's
    directory as base path. For each entry of its ``"performance"`` list the logs are
    processed, the combined plots are created and, when enabled under
    ``time.tables.generate_time_difference_table``, the time difference table is generated.

    :return: None
    :rtype: None
    """
    script_dir = Path(os.path.abspath(os.path.dirname(sys.argv[0])))
    configuration = read_config_from_argv(overriden_base_path=script_dir / "..")

    # Each item of the 'performance' list is a complete performance configuration
    for perf_config in configuration["performance"]:
        print("Processing performance configuration block.")
        visualizer = PerformanceVisualizer(configuration=configuration, performance_config=perf_config)

        # Logs first: they feed both the time and the memory plots of this instance
        visualizer.process_all_logs()
        visualizer.create_combined_plots()

        table_settings = perf_config.get("time", {}).get("tables", {})
        if table_settings.get("generate_time_difference_table", False):
            visualizer.generate_time_difference_table()


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()