Source code for src.performance.measure_time_and_memory

"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)

Description:
    Performance visualization tool to generate execution time and memory usage plots and tables.
"""

import os
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from prettytable import PrettyTable

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from lfc_toolkit.src.performance.auxiliary.execution_time_utils import (calculate_memory_overhead,
                                                                        get_metric_title,
                                                                        get_metric_ylabel, load_json_file,
                                                                        save_json_file, save_plot)

from lfc_toolkit.src.configuration.configuration_reader import (ConfigurationReader,
                                                                read_config_from_argv)

[docs] class PerformanceVisualizer: def __init__(self, configuration: ConfigurationReader, performance_config: Dict) -> None: self.configuration = configuration self.performance_config = performance_config # Store averaging configuration self.averaging_config = performance_config.get("averaging", {}) self.averaging_metric = self.averaging_config.get("metric", "median") self.averaging_groups = self.averaging_config.get("grouping", []) self.metrics_data = defaultdict(lambda: { 'encoder': defaultdict(lambda: defaultdict(list)), 'decoder': defaultdict(lambda: defaultdict(list)) }) # Store full processed data with individual values for speedup calculations self.full_metrics_data = defaultdict(lambda: { 'encoder': defaultdict(lambda: defaultdict(dict)), 'decoder': defaultdict(lambda: defaultdict(dict)) }) self.baseline_codec = performance_config["codecs"].get("baseline") self.baseline_memory = {}
[docs] def process_all_logs(self) -> None: """Processes all log files from configured results paths for time and memory metrics. :return: None :rtype: None """ print(f"\nProcessing log files for performance configuration...") codecs_to_process = self.performance_config["codecs"].get("analyse", []) # baseline memory will be identified just before processing memory logs # (to ensure paths from configuration are available and to avoid duplicate calls) # Process Time metrics if "time" in self.performance_config: time_config = self.performance_config["time"] performance_graphs_path = Path(os.path.expandvars(time_config.get("results_path"))) output_dir = performance_graphs_path output_dir.mkdir(parents=True, exist_ok=True) self._process_codecs_for_metrics(codecs_to_process, output_dir, time_config, "time_ns") # Process Memory metrics if "memory" in self.performance_config: memory_config = self.performance_config["memory"] performance_graphs_path = Path(os.path.expandvars(memory_config.get("results_path"))) output_dir = performance_graphs_path output_dir.mkdir(parents=True, exist_ok=True) # Baseline memory identification might need all BPPs from baseline codec. # Call it before processing memory for other codecs. if self.baseline_codec: try: self._identify_baseline_memory() except Exception as e: print(f"Warning identifying baseline memory: {e}") self._process_codecs_for_metrics(codecs_to_process, output_dir, memory_config, "max_memory_usage") self._save_consolidated_overhead_json(output_dir)
def _process_codecs_for_metrics(self, codecs_to_process: List[str], output_dir: Path, metric_plot_config: Dict, metric_key: str) -> None: """Processes log files for a given set of metrics (time or memory). :param codecs_to_process: List of codec names to process :type codecs_to_process: List[str] :param output_dir: Output directory for results :type output_dir: Path :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :param metric_key: Key identifying the metric (e.g., time_ns, max_memory_usage) :type metric_key: str :return: None :rtype: None """ for codec_name in codecs_to_process: codec_config = self.configuration["codecs"]["configuration"].get(codec_name, {}) if not codec_config: print(f" Warning: No configuration found for codec {codec_name}") continue results_path = Path(os.path.expandvars(codec_config.get("results"))) json_files = list(results_path.glob("*.json")) if not json_files: print(f" Warning: No JSON log file found in {results_path}") continue json_file = json_files[0] print(f" Processing log file: {json_file} for {metric_key}") self._process_codec_log_file(json_file, codec_name, output_dir, metric_plot_config, metric_key) def _process_codec_log_file(self, json_file: Path, codec_name: str, output_dir: Path, metric_plot_config: Dict, current_metric_key: str) -> None: """Processes a single JSON log file for a codec. :param json_file: Path to the JSON log file :type json_file: Path :param codec_name: Name of the codec :type codec_name: str :param output_dir: Output directory for results :type output_dir: Path :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :param current_metric_key: Key identifying the metric being processed :type current_metric_key: str :return: None :rtype: None """ experiments = load_json_file(json_file) for experiment in experiments: if isinstance(experiment, dict): self._process_experiment_data(experiment, codec_name, output_dir, metric_plot_config, current_metric_key) def _identify_baseline_memory(self) -> None: """Identifies baseline memory using baseline codec from configuration, storing per BPP. :return: None :rtype: None :raises ValueError: If no baseline codec is configured or no log files are found """ if not self.baseline_codec: raise ValueError("Cannot identify baseline memory - no baseline codec") codec_config = self.configuration["codecs"]["configuration"].get(self.baseline_codec.lower(), {}) if not codec_config: raise ValueError(f"Warning: No configuration found for baseline codec {self.baseline_codec}") results_path = Path(os.path.expandvars(codec_config.get("results"))) json_files = list(results_path.glob("*.json")) if not json_files: raise ValueError(f"Warning: No JSON log file found in {results_path}") json_file = json_files[0] experiments = load_json_file(json_file) for experiment in experiments: results = experiment.get('results', {}) for lf_name, lf_data in results.items(): for op_type in ['encoder', 'decoder']: op_data = lf_data.get(op_type, {}) if isinstance(op_data, dict): for bpp_str, bpp_data in op_data.items(): try: bpp = float(bpp_str) log_data = bpp_data.get('log_data', []) memory_values = self._extract_memory_from_repetitions(log_data) if memory_values: key = (lf_name, op_type, bpp) self.baseline_memory[key] = np.median(memory_values) except (ValueError, TypeError): continue def _calculate_memory_overhead(self, lf_name: str, op_type: str, bpp: float, memory_values: List[float]) -> Dict[str, float]: """Calculates memory overhead percentage compared to baseline at the same BPP. :param lf_name: Light field name :type lf_name: str :param op_type: Operation type (encoder or decoder) :type op_type: str :param bpp: Bits per pixel value :type bpp: float :param memory_values: List of memory values from current codec :type memory_values: List[float] :return: Dictionary with overhead_percent, baseline_bytes, current_bytes, or empty dict :rtype: Dict[str, float] """ baseline_key = (lf_name, op_type, bpp) if baseline_key not in self.baseline_memory or not memory_values: return {} return calculate_memory_overhead(self.baseline_memory[baseline_key], memory_values) def _process_experiment_data(self, experiment_data: Dict, codec_name: str, output_dir: Path, metric_plot_config: Dict, current_metric_key: str) -> None: """Processes all data from a single experiment for a specific metric. :param experiment_data: Experiment data dictionary :type experiment_data: Dict :param codec_name: Name of the codec :type codec_name: str :param output_dir: Output directory for results :type output_dir: Path :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :param current_metric_key: Key identifying the metric being processed :type current_metric_key: str :return: None :rtype: None """ results = experiment_data.get('results', {}) if not results: return codec_config = self.configuration["codecs"]["configuration"].get(codec_name, {}) rd_prefs = codec_config.get("rd_preferences", {}) title = rd_prefs.get("title", codec_name) # The metric to process is passed as current_metric_key metrics_to_process = [current_metric_key] for lf_name, lf_data in results.items(): if not isinstance(lf_data, dict): continue encoder_metrics = self._process_operation_data( lf_data.get('encoder', {}), True, metrics_to_process, # Pass only the current metric lf_name, 'encoder', codec_name ) decoder_metrics = self._process_operation_data( lf_data.get('decoder', {}), False, metrics_to_process, # Pass only the current metric lf_name, 'decoder', codec_name ) self._generate_plots( lf_name, encoder_metrics, decoder_metrics, codec_name, title, output_dir, metric_plot_config, # Pass the config for the current metric type current_metric_key # Pass the specific metric key ) def _process_operation_data(self, op_data: Dict, is_encoder: bool, metrics: List[str], lf_name: str, op_type: str, codec_name: str) -> Dict[str, Dict]: """Processes operation data and returns extracted metrics. :param op_data: Operation data dictionary :type op_data: Dict :param is_encoder: Whether this is encoder data :type is_encoder: bool :param metrics: List of metric keys to extract :type metrics: List[str] :param lf_name: Light field name :type lf_name: str :param op_type: Operation type (encoder or decoder) :type op_type: str :param codec_name: Name of the codec :type codec_name: str :return: Dictionary of metric keys to bpp-to-metrics mapping :rtype: Dict[str, Dict] """ if not isinstance(op_data, dict): return {} result = {} for bpp_str, data in op_data.items(): try: bpp = float(bpp_str) log_data = data.get('log_data', []) for metric in metrics: if metric in ['time_ns', 'max_memory_usage']: # repetitions > 1 pooled_metrics = data.get('pooled_metrics', {}) if pooled_metrics and metric in pooled_metrics: divisor = 1e9 if metric == 'time_ns' else 1e6 if metric == 'max_memory_usage' else 1 metric_dict = { k: v / divisor for k, v in pooled_metrics[metric].items() if k in ['mean', 'median', 'min', 'max', 'stddev'] } # Also store individual values from log_data when available for speedup calculations individual_values = [] for log_entry in log_data: if metric == 'time_ns' and 'time_ns' in log_entry: individual_values.append(float(log_entry['time_ns']) / divisor) elif metric == 'max_memory_usage': memory = log_entry.get('max_memory_usage', {}) if memory and memory.get('value'): value = float(memory['value']) if memory.get('unit', '').lower() == 'kbytes': value *= 1024 individual_values.append(value / divisor) if individual_values: metric_dict['individual_values'] = individual_values result.setdefault(metric, {})[bpp] = metric_dict # repetitions == 1, there is no "pooled_metrics" else: metric_values = [] for log_entry in log_data: if metric == 'time_ns' and 'time_ns' in log_entry: metric_values.append(float(log_entry['time_ns'])) elif metric == 'max_memory_usage': memory = log_entry.get('max_memory_usage', {}) if memory and memory.get('value'): value = float(memory['value']) if memory.get('unit', '').lower() == 'kbytes': value *= 1024 metric_values.append(value) if metric_values: divisor = 1e9 if metric == 'time_ns' else 1e6 if metric == 'max_memory_usage' else 1 metric_dict = { 'mean': np.mean(metric_values) / divisor, 'median': np.median(metric_values) / divisor, 'min': np.min(metric_values) / divisor, 'max': np.max(metric_values) / divisor } # Store individual values for speedup calculations metric_dict['individual_values'] = [v / divisor for v in metric_values] result.setdefault(metric, {})[bpp] = metric_dict # CALCULATE MEMORY OVERHEAD if this is not baseline and we have memory data if (metric == 'max_memory_usage' and not self._is_baseline(codec_name) and bpp in result.get('max_memory_usage', {}) and self.performance_config.get("memory", {}).get("calculate_overhead", False)): # Extract memory values from log_data memory_values = [] for log_entry in log_data: memory = log_entry.get('max_memory_usage', {}) if memory and memory.get('value'): value = float(memory['value']) if memory.get('unit', '').lower() == 'kbytes': value *= 1024 memory_values.append(value) if memory_values: # Calculate overhead compared to baseline baseline_key = (lf_name, op_type, bpp) if baseline_key in self.baseline_memory: baseline_value = self.baseline_memory[baseline_key] if baseline_value > 0: # Calculate overhead for each individual memory value overhead_values = [((val - baseline_value) / baseline_value) * 100 for val in memory_values] # Calculate statistics from individual overhead values overhead_mean = np.mean(overhead_values) overhead_median = np.median(overhead_values) overhead_stddev = np.std(overhead_values) current_median = np.median(memory_values) result.setdefault('memory_overhead', {})[bpp] = { 'median': overhead_median, 'mean': overhead_mean, 'stddev': overhead_stddev, 'baseline_bytes': baseline_value, 'current_bytes': current_median, 'individual_overheads': overhead_values # Store for aggregation } except (ValueError, AttributeError, TypeError) as e: print(f"Error processing metrics for {op_type} at bpp {bpp_str}: {e}") continue return result def _extract_memory_from_repetitions(self, repetitions: List[Dict], multiplier: int = 1024) -> List[float]: """Extracts memory values from log repetition entries. :param repetitions: List of log data dictionaries from repetitions :type repetitions: List[Dict] :param multiplier: Multiplier for unit conversion (e.g., kbytes to bytes), defaults to 1024 :type multiplier: int, optional :return: List of memory values in bytes :rtype: List[float] """ memory_values = [] for rep in repetitions: if not isinstance(rep, dict): continue memory = rep.get('max_memory_usage', {}) if memory and isinstance(memory, dict) and memory.get('value'): try: value = float(memory['value']) unit = memory.get('unit', '').lower() if unit == 'kbytes': value *= multiplier memory_values.append(value) except (ValueError, TypeError): continue return memory_values def _is_baseline(self, codec_name: str) -> bool: """Checks if the given codec is the baseline codec. :param codec_name: Name of the codec to check :type codec_name: str :return: True if codec is baseline, False otherwise :rtype: bool """ return codec_name == self.baseline_codec.lower() if self.baseline_codec else False def _generate_plots(self, lf_name: str, encoder_data: Dict, decoder_data: Dict, codec_name: str, title: str, output_dir: Path, metric_plot_config: Dict, current_metric_key: str) -> None: """Generates all plots for a lightfield. :param lf_name: Light field name :type lf_name: str :param encoder_data: Encoder metrics data :type encoder_data: Dict :param decoder_data: Decoder metrics data :type decoder_data: Dict :param codec_name: Name of the codec :type codec_name: str :param title: Plot title :type title: str :param output_dir: Output directory for plots :type output_dir: Path :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :param current_metric_key: Key identifying the metric :type current_metric_key: str :return: None :rtype: None """ if current_metric_key == "max_memory_usage" and self.performance_config.get("memory", {}).get("calculate_overhead", False) and not self._is_baseline(codec_name): self._generate_memory_overhead_plot( lf_name, encoder_data.get('memory_overhead', {}), # Use memory_overhead data here decoder_data.get('memory_overhead', {}), # Use memory_overhead data here codec_name, title, output_dir, metric_plot_config ) # STORE memory_overhead into consolidated metrics_data so average plots / json export work mem_config = self.performance_config.get("memory", {}) if encoder_data.get('memory_overhead'): self._store_operation_metrics( self.metrics_data['memory_overhead']['encoder'], lf_name, codec_name, title, encoder_data['memory_overhead'], mem_config ) if decoder_data.get('memory_overhead'): self._store_operation_metrics( self.metrics_data['memory_overhead']['decoder'], lf_name, codec_name, title, decoder_data['memory_overhead'], mem_config ) # Store full metrics data with individual values for speedup calculations if encoder_data.get(current_metric_key): self.full_metrics_data[current_metric_key]['encoder'][lf_name][(codec_name, title)] = encoder_data[current_metric_key] if decoder_data.get(current_metric_key): self.full_metrics_data[current_metric_key]['decoder'][lf_name][(codec_name, title)] = decoder_data[current_metric_key] # Generate single plots for the current metric self._store_and_plot_metrics( lf_name, encoder_data.get(current_metric_key, {}), decoder_data.get(current_metric_key, {}), codec_name, title, output_dir, current_metric_key, metric_plot_config ) def _store_and_plot_metrics(self, lf_name: str, encoder_data: Dict, decoder_data: Dict, codec_name: str, title: str, output_dir: Path, metric: str, metric_plot_config: Dict) -> None: """Stores metrics and generates plots. :param lf_name: Light field name :type lf_name: str :param encoder_data: Encoder metrics data :type encoder_data: Dict :param decoder_data: Decoder metrics data :type decoder_data: Dict :param codec_name: Name of the codec :type codec_name: str :param title: Plot title :type title: str :param output_dir: Output directory for plots :type output_dir: Path :param metric: Metric key :type metric: str :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :return: None :rtype: None """ if metric in ['time_ns', 'max_memory_usage', 'memory_overhead']: # Store data for combined plots if encoder_data: self._store_operation_metrics( self.metrics_data[metric]['encoder'], lf_name, codec_name, title, encoder_data, metric_plot_config ) if decoder_data: self._store_operation_metrics( self.metrics_data[metric]['decoder'], lf_name, codec_name, title, decoder_data, metric_plot_config ) # Generate individual plots if enabled for this specific metric type if metric == "time_ns": generate_individual_plot = self.performance_config.get("time", {}).get("generate_individual_plot", False) elif metric == "max_memory_usage": generate_individual_plot = self.performance_config.get("memory", {}).get("generate_individual_plot", False) if (encoder_data or decoder_data) and generate_individual_plot: self._generate_individual_plot( lf_name, encoder_data, decoder_data, codec_name, title, output_dir, metric, metric_plot_config ) def _generate_memory_overhead_plot(self, lf_name: str, encoder_overhead_data: Dict, decoder_overhead_data: Dict, codec_name: str, title: str, output_dir: Path, metric_plot_config: Dict) -> None: """Generates side-by-side memory overhead plots comparing baseline with other codecs. :param lf_name: Light field name :type lf_name: str :param encoder_overhead_data: Encoder memory overhead data :type encoder_overhead_data: Dict :param decoder_overhead_data: Decoder memory overhead data :type decoder_overhead_data: Dict :param codec_name: Name of the codec :type codec_name: str :param title: Plot title :type title: str :param output_dir: Output directory for plots :type output_dir: Path :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :return: None :rtype: None """ if not encoder_overhead_data and not decoder_overhead_data: return # Set figure size to 10x8 (width x height) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 8)) fig.suptitle(f"{lf_name} - {title} (Memory Overhead %)".strip(), fontsize=metric_plot_config.get("plots", {}).get("font_size", 8)) if encoder_overhead_data: self._plot_memory_overhead_data(ax1, encoder_overhead_data, "Encoder Memory Overhead", metric_plot_config.get("plots")) if decoder_overhead_data: self._plot_memory_overhead_data(ax2, decoder_overhead_data, "Decoder Memory Overhead", metric_plot_config.get("plots")) plt.tight_layout(rect=[0, 0, 1, 0.96]) save_plot(output_dir / "memory_overhead" / "lightfields" / lf_name, codec_name, metric_plot_config.get("plots", {}).get("format", "pdf"), "memory_overhead") plt.close() def _plot_memory_overhead_data(self, ax: Any, data: Dict, title: str, metric_plot_config: Dict) -> None: """Plots memory overhead data on given axis with a single legend inside the plot. :param ax: Matplotlib axis to plot on :type ax: Any :param data: Memory overhead data by BPP :type data: Dict :param title: Axis title :type title: str :param metric_plot_config: Configuration for plot styling :type metric_plot_config: Dict :return: None :rtype: None """ bpps = sorted(data.keys()) overheads = [data[bpp]['median'] for bpp in bpps if 'median' in data[bpp]] baselines = [data[bpp].get('baseline_bytes', 0) / 1e6 for bpp in bpps] # Convert to MB currents = [data[bpp].get('current_bytes', 0) / 1e6 for bpp in bpps] # Convert to MB # Plot all lines and collect handles/labels for a single legend lines = [] labels = [] l1, = ax.plot(bpps, overheads, 'o-', color='blue', label='Overhead %', markersize=metric_plot_config['point_size'], linewidth=metric_plot_config['line_width']) lines.append(l1) labels.append('Overhead %') ax.set_title(title, fontsize=metric_plot_config['font_size']) ax.set_xlabel("Target bpp", fontsize=metric_plot_config['font_size']) ax.set_ylabel("Memory Overhead (%)", fontsize=metric_plot_config['font_size']) ax.grid(True) ax_twin = ax.twinx() l2, = ax_twin.plot(bpps, baselines, 'x--', color='green', label='Baseline (MB)', markersize=metric_plot_config['point_size'], linewidth=metric_plot_config['line_width']) l3, = ax_twin.plot(bpps, currents, 'x--', color='red', label='Current (MB)', markersize=metric_plot_config['point_size'], linewidth=metric_plot_config['line_width']) lines.extend([l2, l3]) labels.extend(['Baseline (MB)', 'Current (MB)']) ax_twin.set_ylabel('Memory (MB)', fontsize=metric_plot_config['font_size'], labelpad=15) ax_twin.yaxis.set_label_position("right") ax_twin.yaxis.set_ticks_position("right") ax_twin.figure.subplots_adjust(right=0.85) # Single legend inside the plot (bottom right by default) if metric_plot_config.get("legend", True): ax.legend(lines, labels, loc='best', fontsize='small', frameon=True) if metric_plot_config.get("xscale", None): ax.set_xscale(metric_plot_config["xscale"]) ax_twin.set_xscale(metric_plot_config["xscale"]) # Add tick formatter if log scale if metric_plot_config["xscale"] == "log": ax.xaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter()) ax_twin.xaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter()) if metric_plot_config.get("yscale", None): ax.set_yscale(metric_plot_config["yscale"]) # ax_twin.set_yscale(metric_plot_config["yscale"]) # Memory overhead % does not usually need log scale for twin axis def _store_operation_metrics(self, storage: Dict, lf_name: str, codec_name: str, title: str, metrics: Dict[float, Dict[str, float]], metric_plot_config: Dict) -> None: """Stores metrics for combined plots. :param storage: Storage dictionary to update :type storage: Dict :param lf_name: Light field name :type lf_name: str :param codec_name: Name of the codec :type codec_name: str :param title: Codec title for legend :type title: str :param metrics: Dictionary of BPP to metric data :type metrics: Dict[float, Dict[str, float]] :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :return: None :rtype: None """ # Use averaging metric from performance config, fallback to median metric_key_for_combining = self.averaging_metric for bpp, metric_data in metrics.items(): # For memory_overhead, store individual overhead values if available # to allow proper stddev calculation across repetitions if 'individual_overheads' in metric_data: # Store each individual overhead value for overhead_val in metric_data['individual_overheads']: storage[lf_name][(codec_name, title)].append((bpp, overhead_val)) else: # Fallback to single aggregated value for other metrics storage[lf_name][(codec_name, title)].append((bpp, metric_data[metric_key_for_combining])) def _generate_individual_plot(self, lf_name: str, encoder_data: Dict, decoder_data: Dict, codec_name: str, title: str, output_dir: Path, metric: str, metric_plot_config: Dict) -> None: """Generates and saves individual performance graph. :param lf_name: Light field name :type lf_name: str :param encoder_data: Encoder metrics data :type encoder_data: Dict :param decoder_data: Decoder metrics data :type decoder_data: Dict :param codec_name: Name of the codec :type codec_name: str :param title: Plot title :type title: str :param output_dir: Output directory for plots :type output_dir: Path :param metric: Metric key :type metric: str :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :return: None :rtype: None """ if metric == "memory_overhead" or (not encoder_data and not decoder_data): return plot_styles = self._get_plot_style(codec_name, metric_plot_config) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=tuple(plot_styles['figure_size'])) fig.suptitle(f"{lf_name} - {title} ({get_metric_title(metric)})".strip(), fontsize=plot_styles['font_size']) self._plot_operation_metrics(ax1, encoder_data, f"Encoder ({get_metric_title(metric)})", plot_styles, metric_plot_config, metric) self._plot_operation_metrics(ax2, decoder_data, f"Decoder ({get_metric_title(metric)})", plot_styles, metric_plot_config, metric) plt.tight_layout() save_plot(output_dir / metric / "lightfields" / lf_name, codec_name, metric_plot_config.get("plots", {}).get("format", "pdf"), metric) plt.close() def _plot_operation_metrics( self, ax: Any, data: Dict, title: str, plot_styles: Dict, metric_plot_config: Dict, metric_key: str, ) -> None: """Plots metrics for a single operation with configurable styling. :param ax: Matplotlib axis to plot on :type ax: Any :param data: Metric data by BPP :type data: Dict :param title: Axis title :type title: str :param plot_styles: Dictionary of plot style settings :type plot_styles: Dict :param metric_plot_config: Configuration for metric plotting :type metric_plot_config: Dict :param metric_key: Key identifying the metric :type metric_key: str :return: None :rtype: None """ if not data: ax.set_title(f"{title.split('(')[0]}(No Data) ({title.split('(')[1]}") return bpps = sorted(data.keys()) median = [data[bpp]['median'] for bpp in bpps] ax.plot( bpps, median, marker=plot_styles['marker'], linestyle='-', color=plot_styles['color'], label='Median', markersize=plot_styles['point_size'], linewidth=plot_styles['line_width'] ) mean = [data[bpp]['mean'] for bpp in bpps] ax.plot( bpps, mean, marker=plot_styles['marker'], linestyle='--', color='green', label='Mean', markersize=plot_styles['point_size'], linewidth=plot_styles['line_width'] ) min_vals = [data[bpp]['min'] for bpp in bpps] ax.plot( bpps, min_vals, marker='x', linestyle='', color='red', label='Min', markersize=plot_styles['point_size'] ) max_vals = [data[bpp]['max'] for bpp in bpps] ax.plot( bpps, max_vals, marker='x', linestyle='', color='blue', label='Max', markersize=plot_styles['point_size'] ) ax.set_title(title, fontsize=plot_styles['font_size']) ax.set_xlabel("Target bpp", fontsize=plot_styles['font_size']) ax.set_ylabel(get_metric_ylabel(metric_key), fontsize=plot_styles['font_size']) # Use specific metric_key here if metric_plot_config.get("legend", True): ax.legend( loc='center left', bbox_to_anchor=(1.02, 0.5), title=plot_styles['legend_title'], fontsize='small', title_fontsize='small', frameon=False, borderaxespad=0.0 ) ax.grid(True) if metric_plot_config.get("xscale", None): ax.set_xscale(metric_plot_config["xscale"]) if metric_plot_config["xscale"] == "log": ax.xaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter()) if metric_plot_config.get("yscale", None): ax.set_yscale(metric_plot_config["yscale"]) if metric_plot_config["yscale"] == "log": ax.yaxis.set_major_locator(matplotlib.ticker.LogLocator(base=10.0, subs="auto")) ax.yaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter()) ax.ticklabel_format(style='plain', axis='y')
[docs] def create_combined_plots(self) -> None: """Creates all combined plots automatically grouped by codec type. :return: None :rtype: None """ print("Creating combined plots...\n") # Iterate over time and memory configurations for metric_type_key in ["time", "memory"]: if metric_type_key in self.performance_config: metric_config = self.performance_config[metric_type_key] metric_plot_config = metric_config.get("plots", {}) current_metric_key = "time_ns" if metric_type_key == "time" else "max_memory_usage" # The actual metric stored if current_metric_key not in self.metrics_data: continue output_dir = Path(os.path.expandvars(metric_config["results_path"])) codec_groups = defaultdict(list) for codec_name in self.performance_config["codecs"].get("analyse", []): if any(c in codec_name for c in ["jplm", "parallel-jplm"]): # Group all JPLM related codecs group = "jplm" else: group = codec_name codec_groups[group].append(codec_name) for op_type in ['encoder', 'decoder']: if op_type not in self.metrics_data[current_metric_key]: continue for group_name, codecs_in_group in codec_groups.items(): group_output_path = output_dir / "combined_codecs_graphs" / f"{group_name}_{op_type.lower()}_graphs" group_output_path.mkdir(parents=True, exist_ok=True) for lf_name, codec_data in self.metrics_data[current_metric_key][op_type].items(): filtered_data = { k: v for k, v in codec_data.items() if k[0] in codecs_in_group } if not filtered_data: continue sorted_configs = sorted( filtered_data.items(), key=lambda x: ( x[0][0].lower().startswith("jplm") and "sequential" not in x[0][0].lower(), # Put jplm first, then sequential "sequential" in x[0][0].lower(), int(re.search(r"(\d+)\s*threads", x[0][0]).group(1)) if re.search(r"(\d+)\s*threads", x[0][0]) else 0, # Sort by threads x[0][0] # Lexicographical for tie-breaking ), reverse=False # Changed to False for ascending thread count ) plt.figure(figsize=tuple(metric_plot_config.get('figure_size', [12, 8]))) for (codec_name, title), data_points in sorted_configs: if data_points: data_points.sort() bpps, values = zip(*data_points) styles = self._get_plot_style(codec_name, metric_config) plt.plot( bpps, values, marker=styles['marker'], color=styles['color'], label=title, markersize=styles['point_size'], linewidth=styles['line_width'] ) if metric_plot_config.get("xscale", None): plt.xscale(metric_plot_config["xscale"]) ax = plt.gca() # Example specific tick settings (can be made configurable) ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter()) if metric_plot_config.get("yscale", None): plt.yscale(metric_plot_config["yscale"]) ax = plt.gca() if metric_plot_config["yscale"] == "log": ax.yaxis.set_major_locator(matplotlib.ticker.LogLocator(base=10.0, subs="auto")) ax.yaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter()) ax.ticklabel_format(style='plain', axis='y') plt.title(f"{op_type.capitalize()} - {get_metric_title(current_metric_key)} - {lf_name}", fontsize=metric_plot_config['font_size']) plt.xlabel("Target bpp", fontsize=metric_plot_config['font_size']) plt.ylabel(get_metric_ylabel(current_metric_key), fontsize=metric_plot_config['font_size']) if metric_plot_config.get("legend", True): plt.legend( loc='center left', bbox_to_anchor=(1.02, 0.5), title="Codecs", # More generic title, can be 'Threads' for parallel-jplm specific fontsize='small', title_fontsize='small', frameon=False, borderaxespad=0.0 ) plt.grid(True, axis='both', linestyle=':', alpha=0.7) plt.tight_layout() plt.axhline(0, color='black', linewidth=0.5) output_file = group_output_path / f"{lf_name}_combined_{group_name}_{op_type}.{metric_plot_config['format']}" plt.savefig(output_file, format=metric_plot_config["format"], bbox_inches='tight') plt.close() # After all time and memory combined plots, create average plots for time and memory if "time" in self.performance_config: self.create_average_time_plots() if "memory" in self.performance_config: self.create_average_memory_plots() # Create average memory overhead plot if enabled if self.performance_config.get("memory", {}).get("calculate_overhead", True): self.create_average_memory_overhead_plot()
def _get_plot_style(self, codec_name: Optional[str] = None, metric_plot_config: Optional[Dict] = None) -> Dict: """Gets complete plot style configuration for a codec. :param codec_name: Name of the codec, defaults to None :type codec_name: Optional[str], optional :param metric_plot_config: Configuration for metric plotting, defaults to None :type metric_plot_config: Optional[Dict], optional :return: Dictionary of plot style settings :rtype: Dict """ codec_config = self.configuration["codecs"]["configuration"].get(codec_name, {}) rd_prefs = codec_config.get("rd_preferences", {}) return { 'marker': rd_prefs.get("marker", "o"), 'label': rd_prefs.get("title", codec_name), 'font_size': metric_plot_config["plots"].get("font_size", 14), 'color': rd_prefs.get("color", "black"), 'point_size': metric_plot_config["plots"].get("point_size", 8), 'line_width': metric_plot_config["plots"].get("line_width", 2), 'legend': metric_plot_config["plots"].get("legend", True), 'legend_title': metric_plot_config["plots"].get("legend_title", "Codecs"), 'figure_size': metric_plot_config["plots"].get("figure_size", [12, 8]), 'baseline_style': { 'color': 'gray', 'linestyle': '--', 'linewidth': metric_plot_config["plots"].get('line_width', 2) if metric_plot_config["plots"] else 2 } }
[docs] def create_average_time_plots(self) -> None: """Creates separate plots of average time with standard deviation for encoder and decoder. :return: None :rtype: None """ print("\nCreating average time plots with standard deviation...") time_config = self.performance_config.get("time", {}) output_dir = Path(os.path.expandvars(time_config.get("results_path"))) / "average_plots" output_dir.mkdir(parents=True, exist_ok=True) metric = "time_ns" if metric not in self.metrics_data: print(" No time data available. Skipping average plots.") return # Get available lightfields from processed data available_lfs = set() for op_type in ["encoder", "decoder"]: if op_type in self.metrics_data[metric]: available_lfs.update(self.metrics_data[metric][op_type].keys()) # Process each averaging group for group_config in self.averaging_groups: group_name = group_config.get("name", "unknown") group_lfs = group_config.get("lightfields", []) # Filter to only include lightfields that exist in the data valid_lfs = [lf for lf in group_lfs if lf in available_lfs] if not valid_lfs: print(f" Skipping group '{group_name}': no valid lightfields found in data") continue # Aggregate time data for this group by codec and BPP encoder_time_data = defaultdict(lambda: defaultdict(list)) decoder_time_data = defaultdict(lambda: defaultdict(list)) if "encoder" in self.metrics_data[metric]: for lf_name in valid_lfs: if lf_name in self.metrics_data[metric]["encoder"]: codec_data = self.metrics_data[metric]["encoder"][lf_name] for (codec_name, title), data_points in codec_data.items(): for bpp, value in data_points: encoder_time_data[codec_name][bpp].append(value) if "decoder" in self.metrics_data[metric]: for lf_name in valid_lfs: if lf_name in self.metrics_data[metric]["decoder"]: codec_data = self.metrics_data[metric]["decoder"][lf_name] for (codec_name, title), data_points in codec_data.items(): for bpp, value in data_points: decoder_time_data[codec_name][bpp].append(value) # Generate plots for this group if encoder_time_data: self._generate_average_metric_plot( encoder_time_data, "Encoder", output_dir, time_config, group_name, metric ) if decoder_time_data: self._generate_average_metric_plot( decoder_time_data, "Decoder", output_dir, time_config, group_name, metric )
[docs] def create_average_memory_plots(self) -> None: """Creates separate plots of average memory usage with standard deviation for encoder and decoder. :return: None :rtype: None """ print("\nCreating average memory plots with standard deviation...") memory_config = self.performance_config.get("memory", {}) output_dir = Path(os.path.expandvars(memory_config.get("results_path"))) / "average_plots" output_dir.mkdir(parents=True, exist_ok=True) metric = "max_memory_usage" if metric not in self.metrics_data: print(" No memory data available. Skipping average plots.") return # Get available lightfields from processed data available_lfs = set() for op_type in ["encoder", "decoder"]: if op_type in self.metrics_data[metric]: available_lfs.update(self.metrics_data[metric][op_type].keys()) # Process each averaging group for group_config in self.averaging_groups: group_name = group_config.get("name", "unknown") group_lfs = group_config.get("lightfields", []) # Filter to only include lightfields that exist in the data valid_lfs = [lf for lf in group_lfs if lf in available_lfs] if not valid_lfs: print(f" Skipping group '{group_name}': no valid lightfields found in data") continue # Aggregate memory data for this group by codec and BPP encoder_memory_data = defaultdict(lambda: defaultdict(list)) decoder_memory_data = defaultdict(lambda: defaultdict(list)) if "encoder" in self.metrics_data[metric]: for lf_name in valid_lfs: if lf_name in self.metrics_data[metric]["encoder"]: codec_data = self.metrics_data[metric]["encoder"][lf_name] for (codec_name, title), data_points in codec_data.items(): for bpp, value in data_points: encoder_memory_data[codec_name][bpp].append(value) if "decoder" in self.metrics_data[metric]: for lf_name in valid_lfs: if lf_name in self.metrics_data[metric]["decoder"]: codec_data = self.metrics_data[metric]["decoder"][lf_name] for (codec_name, title), data_points in codec_data.items(): for bpp, value in data_points: decoder_memory_data[codec_name][bpp].append(value) # Generate plots for this group if encoder_memory_data: self._generate_average_metric_plot( encoder_memory_data, "Encoder", output_dir, memory_config, group_name, metric ) if decoder_memory_data: self._generate_average_metric_plot( decoder_memory_data, "Decoder", output_dir, memory_config, group_name, metric )
def _generate_average_metric_plot(self, metric_data: Dict, op_type: str, output_dir: Path, metric_config: Dict, group_name: str, metric_key: str) -> None: """Generates a single average plot for either encoder or decoder for a given group and metric. :param metric_data: Aggregated metric data by codec and BPP :type metric_data: Dict :param op_type: Operation type (Encoder or Decoder) :type op_type: str :param output_dir: Output directory for plots :type output_dir: Path :param metric_config: Configuration for the metric :type metric_config: Dict :param group_name: Name of the lightfield grouping :type group_name: str :param metric_key: Key identifying the metric :type metric_key: str :return: None :rtype: None """ if not metric_data: print(f" No {op_type.lower()} {metric_key} data available for plotting (group: {group_name})") return codec_stats = {} for codec_name, bpp_data in metric_data.items(): codec_stats[codec_name] = {} for bpp, values in bpp_data.items(): codec_stats[codec_name][bpp] = { 'mean': np.mean(values), 'stddev': np.std(values) } metric_plot_config = metric_config.get("plots", {}) plt.figure(figsize=tuple(metric_plot_config.get("figure_size", [12, 8]))) for codec_name, stats in codec_stats.items(): plot_styles = self._get_plot_style(codec_name, metric_config) sorted_bpps = sorted(stats.keys()) means = [stats[bpp]['mean'] for bpp in sorted_bpps] stddevs = [stats[bpp]['stddev'] for bpp in sorted_bpps] plt.plot( sorted_bpps, means, marker=plot_styles['marker'], color=plot_styles['color'], label=plot_styles['label'], markersize=plot_styles['point_size'], linewidth=plot_styles['line_width'] ) plt.fill_between( sorted_bpps, np.array(means) - np.array(stddevs), np.array(means) + np.array(stddevs), color=plot_styles['color'], alpha=0.3 ) plt.title(f"Average {get_metric_title(metric_key)} with std deviation - {op_type} ({group_name})", fontsize=metric_plot_config["font_size"]) plt.xlabel("Target bpp", fontsize=metric_plot_config["font_size"]) plt.ylabel(get_metric_ylabel(metric_key), fontsize=metric_plot_config["font_size"]) plt.grid(True, axis='both', linestyle=':', alpha=0.7) if metric_plot_config.get("legend", True): handles, labels = plt.gca().get_legend_handles_labels() plt.legend( handles[::-1], labels[::-1], loc='center left', bbox_to_anchor=(1.02, 0.5), title="Codecs", fontsize='small', title_fontsize='small', frameon=False, borderaxespad=0.0 ) plt.tight_layout() if metric_plot_config.get("xscale", None): plt.xscale(metric_plot_config["xscale"]) ax = plt.gca() if metric_plot_config["xscale"] == "log": ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter()) if metric_plot_config.get("yscale", None): plt.yscale(metric_plot_config["yscale"]) ax = plt.gca() if metric_plot_config["yscale"] == "log": ax.yaxis.set_major_locator(matplotlib.ticker.LogLocator(base=10.0, subs="auto")) ax.yaxis.set_major_formatter(matplotlib.ticker.ScalarFormatter()) ax.ticklabel_format(style='plain', axis='y') output_file = output_dir / f"average_{metric_key}_with_stddev_{op_type.lower()}_{group_name}.{metric_plot_config['format']}" plt.savefig(output_file, format=metric_plot_config["format"], bbox_inches='tight') plt.close() print(f" Saved {op_type.lower()} average {metric_key} plot: {output_file}") def _save_consolidated_overhead_json(self, output_dir: Path) -> None: """Saves a consolidated JSON file with overhead values for all codecs and lightfields. :param output_dir: Output directory for the JSON file :type output_dir: Path :return: None :rtype: None """ # This will be generated under the memory output directory memory_output_dir = Path(os.path.expandvars(self.performance_config.get("memory", {}).get("results_path", "./temp_memory_results"))) memory_output_dir.mkdir(parents=True, exist_ok=True) consolidated_data = { "baseline": self.baseline_codec or "None", "codecs": {} } metric = "memory_overhead" if metric not in self.metrics_data: return # No memory overhead data to save for op_type in ["encoder", "decoder"]: if op_type not in self.metrics_data[metric]: continue for lf_name, codec_data in self.metrics_data[metric][op_type].items(): for (codec_name, title), data_points in codec_data.items(): if codec_name not in consolidated_data["codecs"]: consolidated_data["codecs"][codec_name] = {"encoder": {}, "decoder": {}} overhead_values = {} for bpp, overhead_value in data_points: overhead_values[str(bpp)] = { # Ensure BPP is stored as string key in JSON "memory": { "overhead (%)": float(overhead_value), } } consolidated_data["codecs"][codec_name][op_type][lf_name] = overhead_values json_file = memory_output_dir / "memory_overhead" / "memory_overhead_data.json" save_json_file(json_file, consolidated_data, "Saved consolidated overhead JSON")
[docs] def create_average_memory_overhead_plot(self) -> None: """Creates separate plots of average memory overhead with standard deviation for encoder and decoder. :return: None :rtype: None """ print("\nCreating average memory overhead plots with standard deviation...") memory_config = self.performance_config.get("memory", {}) output_dir = Path(os.path.expandvars(memory_config.get("results_path"))) / "average_plots" output_dir.mkdir(parents=True, exist_ok=True) metric = "memory_overhead" if metric not in self.metrics_data: print(" No memory overhead data available. Skipping average plots.") return # Get available lightfields from processed data available_lfs = set() for op_type in ["encoder", "decoder"]: if op_type in self.metrics_data[metric]: available_lfs.update(self.metrics_data[metric][op_type].keys()) # Process each averaging group for group_config in self.averaging_groups: group_name = group_config.get("name", "unknown") group_lfs = group_config.get("lightfields", []) # Filter to only include lightfields that exist in the data valid_lfs = [lf for lf in group_lfs if lf in available_lfs] if not valid_lfs: print(f" Skipping group '{group_name}': no valid lightfields found in data") continue # Aggregate overhead data for this group by codec and BPP encoder_overhead_data = defaultdict(lambda: defaultdict(list)) decoder_overhead_data = defaultdict(lambda: defaultdict(list)) if "encoder" in self.metrics_data[metric]: for lf_name in valid_lfs: if lf_name in self.metrics_data[metric]["encoder"]: codec_data = self.metrics_data[metric]["encoder"][lf_name] for (codec_name, title), data_points in codec_data.items(): for bpp, overhead in data_points: encoder_overhead_data[codec_name][bpp].append(overhead) if "decoder" in self.metrics_data[metric]: for lf_name in valid_lfs: if lf_name in self.metrics_data[metric]["decoder"]: codec_data = self.metrics_data[metric]["decoder"][lf_name] for (codec_name, title), data_points in codec_data.items(): for bpp, overhead in data_points: decoder_overhead_data[codec_name][bpp].append(overhead) # Generate plots for this group if encoder_overhead_data: self._generate_overhead_average_plot( encoder_overhead_data, "Encoder", output_dir, memory_config, group_name ) if decoder_overhead_data: self._generate_overhead_average_plot( decoder_overhead_data, "Decoder", output_dir, memory_config, group_name )
def _generate_overhead_average_plot(self, overhead_data: Dict, op_type: str, output_dir: Path, metric_config: Dict, category: str) -> None: """Generates a single average overhead plot for either encoder or decoder for a given category. :param overhead_data: Aggregated overhead data by codec and BPP :type overhead_data: Dict :param op_type: Operation type (Encoder or Decoder) :type op_type: str :param output_dir: Output directory for plots :type output_dir: Path :param metric_config: Configuration for the metric :type metric_config: Dict :param category: Name of the lightfield category :type category: str :return: None :rtype: None """ if not overhead_data: print(f" No {op_type.lower()} overhead data available for plotting (category: {category})") return codec_stats = {} for codec_name, bpp_data in overhead_data.items(): codec_stats[codec_name] = {} for bpp, values in bpp_data.items(): codec_stats[codec_name][bpp] = { 'mean': np.mean(values), 'stddev': np.std(values) } metric_plot_config = metric_config.get("plots", {}) plt.figure(figsize=tuple(metric_plot_config.get("figure_size", [12, 8]))) for codec_name, stats in codec_stats.items(): plot_styles = self._get_plot_style(codec_name, metric_config) sorted_bpps = sorted(stats.keys()) means = [stats[bpp]['mean'] for bpp in sorted_bpps] stddevs = [stats[bpp]['stddev'] for bpp in sorted_bpps] plt.plot( sorted_bpps, means, marker=plot_styles['marker'], color=plot_styles['color'], label=self.configuration["codecs"]["configuration"][codec_name]["rd_preferences"]["title"], markersize=plot_styles['point_size'], linewidth=plot_styles['line_width'] ) plt.fill_between( sorted_bpps, np.array(means) - np.array(stddevs), np.array(means) + np.array(stddevs), color=plot_styles['color'], alpha=0.3 ) plt.title(f"Average memory overhead (%) with std deviation - {op_type} ({category})", fontsize=metric_plot_config["font_size"]) plt.xlabel("Target bpp", fontsize=metric_plot_config["font_size"]) plt.ylabel("Memory Overhead (%)", fontsize=metric_plot_config["font_size"]) plt.grid(True, axis='both', linestyle=':', alpha=0.7) if metric_plot_config.get("legend", True): handles, labels = plt.gca().get_legend_handles_labels() # Reverse order of handles and labels to match the plot order if desired plt.legend( handles[::-1], labels[::-1], loc='center left', bbox_to_anchor=(1.02, 0.5), title="Codecs", fontsize='small', title_fontsize='small', frameon=False, borderaxespad=0.0 ) plt.tight_layout() if metric_plot_config.get("xscale", None): plt.xscale(metric_plot_config["xscale"]) ax = plt.gca() ax.set_xticks([0.005, 0.02, 0.1, 0.75]) ax.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter()) safe_category = re.sub(r'[^A-Za-z0-9_-]+', '_', str(category)) output_file = output_dir / f"average_memory_overhead_with_stddev_{op_type.lower()}_{safe_category}.{metric_plot_config['format']}" plt.savefig(output_file, format=metric_plot_config["format"], bbox_inches='tight') plt.close() print(f" Saved {op_type.lower()} average memory overhead plot: {output_file}")
[docs] def generate_time_difference_table(self) -> None: """Generates a table comparing the median execution time difference (encoder - decoder) for all codecs and lightfields. :return: None :rtype: None """ print("Generating execution time difference table...") # Get the time configuration for output paths and formats time_config = self.performance_config.get("time", {}) output_dir = Path(os.path.expandvars(time_config.get("results_path", "./temp_time_results"))) / "tables" output_dir.mkdir(parents=True, exist_ok=True) table = PrettyTable() table.field_names = [ "Lightfield", "BPP", "Codec", "Encoder Median (s)", "Decoder Median (s)", "Encoder/Decoder Ratio", "Time Difference (Encoder - Decoder) (s)", "Relative Time Difference ([Encoder-Decoder]/Encoder) (%)" ] encoder_data = self.metrics_data.get("time_ns", {}).get("encoder", {}) decoder_data = self.metrics_data.get("time_ns", {}).get("decoder", {}) for lf_name in encoder_data: for (codec_name, title), enc_points in encoder_data[lf_name].items(): enc_dict = dict(enc_points) dec_points = decoder_data.get(lf_name, {}).get((codec_name, title), []) dec_dict = dict(dec_points) for bpp in enc_dict: encoder_median = enc_dict.get(bpp, 0) decoder_median = dec_dict.get(bpp, 0) time_difference = encoder_median - decoder_median relative_time_difference = (time_difference / encoder_median * 100) if encoder_median > 0 else 0 decoder_encoder_ratio = (encoder_median/decoder_median) if decoder_median > 0 else 0 table.add_row([ lf_name, f"{bpp:.3f}", codec_name, # Format BPP to 3 decimal places f"{encoder_median:.6f}", f"{decoder_median:.6f}", # Format time for readability f"{decoder_encoder_ratio:.3f}", f"{time_difference:.6f}", f"{relative_time_difference:.2f}" ]) tables_config = time_config.get("tables", {}) formats = tables_config.get("formats", ["text", "html"]) if tables_config.get("verbose", True): print('\n' + table.get_string()) for fmt in formats: filename = output_dir / f"execution_time_difference.{fmt}" if fmt == "text": with open(filename, "w") as f: f.write(table.get_string()) elif fmt == "html": with open(filename, "w") as f: f.write(table.get_html_string()) elif fmt == "csv": with open(filename, "w") as f: f.write(table.get_csv_string()) elif fmt == "latex": with open(filename, "w") as f: content = table.get_latex_string().replace( r"\begin{tabular}", f"\\begin{{tabular}}\n\\caption{{Execution Time Difference - {table.title}}}", 1 ) f.write(content) elif fmt == "mediawiki": with open(filename, "w") as f: f.write(table.get_mediawiki_string()) print(f"Saved execution time difference table ({fmt.upper()}): {filename}")
[docs] def main() -> None: base_path = Path(os.path.abspath(os.path.dirname(sys.argv[0]))) configuration = read_config_from_argv(overriden_base_path=base_path / "..") # The 'performance' key holds a list of configurations performance_configurations = configuration["performance"] for perf_config in performance_configurations: # Each item in the list is a complete performance configuration print(f"Processing performance configuration block.") visualizer = PerformanceVisualizer(configuration=configuration, performance_config=perf_config) # Process logs for both time and memory metrics within this visualizer instance visualizer.process_all_logs() # Create combined plots for time and memory (if enabled in respective configs) visualizer.create_combined_plots() # Generate time difference table if perf_config.get("time", {}).get("tables", {}).get("generate_time_difference_table", False): visualizer.generate_time_difference_table()
if __name__ == "__main__": main()