Source code for src.performance.speedup_performance

"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)

Description:
    This module defines the SpeedupVisualizer class, which computes codec speedups relative to a baseline codec, saves the results and plots speedup graphs.
"""

import os
import re
import sys
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from lfc_toolkit.src.performance.auxiliary.speedup_utils import (create_codec_comparison_matrices,
                                                                save_consolidated_speedup_json)
from lfc_toolkit.src.performance.measure_time_and_memory import PerformanceVisualizer

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from lfc_toolkit.src.configuration.configuration_reader import (ConfigurationReader,
                                                                read_config_from_argv)


class SpeedupVisualizer(PerformanceVisualizer):
    """Compute and plot codec speedups relative to a baseline codec.

    Speedups are derived from the ``time_ns`` metric as
    ``baseline_time / codec_time`` for every lightfield, operation type
    (encoder/decoder), codec and bpp.

    ``speedup_data`` maps ``"<lightfield>_<op_type>"`` to
    ``{(codec_name, title): {bpp: speedup}}`` where ``speedup`` is either a
    single float (ratio of medians) or a list of per-run speedups.
    ``encoder_speedup_data`` and ``decoder_speedup_data`` map
    ``codec_name -> bpp -> list`` of all speedups pooled over lightfields.
    """

    # Operation types for which speedups are computed.
    _OPERATION_TYPES = ('encoder', 'decoder')
    # Times at or below this threshold are treated as invalid (avoids division by ~0).
    _MIN_VALID_TIME = 1e-9
    # Extracts the thread count from a codec title containing "<N> threads".
    _THREAD_COUNT_PATTERN = re.compile(r'(\d+)\s*threads')

    def __init__(self, configuration: ConfigurationReader, performance_config: Dict) -> None:
        """Initialise the visualizer from one performance configuration block.

        :param configuration: Global toolkit configuration.
        :type configuration: ConfigurationReader
        :param performance_config: One performance configuration block; its
            optional ``"speedup"`` entry holds the speedup-specific settings.
        :type performance_config: Dict
        :return: None
        :rtype: None
        """
        # PerformanceVisualizer expects the performance_config directly.
        super().__init__(configuration, performance_config)

        # A missing or empty (None) "speedup" section falls back to an empty
        # dict so that the later ``.get`` lookups keep working.
        self.speedup_config = performance_config.get("speedup") or {}

        # Speedup is calculated from 'time_ns' data.
        self.pooled_metrics_for_speedup = ["time_ns"]

        self.speedup_data = defaultdict(lambda: defaultdict(dict))
        self.encoder_speedup_data = defaultdict(lambda: defaultdict(list))
        self.decoder_speedup_data = defaultdict(lambda: defaultdict(list))

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    def _results_dir(self) -> Path:
        """Return the base output directory configured under ``results_path``.

        Environment variables in the configured path are expanded. The
        directory is not created here.

        :return: Base directory for all speedup outputs.
        :rtype: Path
        """
        return Path(os.path.expandvars(self.speedup_config.get("results_path", "./speedup_results")))

    @classmethod
    def _extract_thread_count(cls, title: str) -> int:
        """Return the thread count found in ``title`` or 0 when there is none.

        :param title: Codec title, searched for ``"<N> threads"``.
        :type title: str
        :return: Parsed thread count, 0 for titles without one.
        :rtype: int
        """
        match = cls._THREAD_COUNT_PATTERN.search(title)
        return int(match.group(1)) if match else 0

    @staticmethod
    def _normalize_bpp_data(data_dict: Any, has_individual_values: bool) -> Dict:
        """Return per-bpp data as ``{bpp: {'median': ..., ...}}``.

        :param data_dict: Either the full per-bpp dict (returned unchanged) or
            an iterable of ``(bpp, median)`` pairs from the aggregated data.
        :type data_dict: Any
        :param has_individual_values: True when ``data_dict`` is already the
            full per-bpp dict.
        :type has_individual_values: bool
        :return: Mapping from bpp to a dict holding at least ``'median'``.
        :rtype: Dict
        """
        if has_individual_values:
            return data_dict
        return {bpp: {'median': median} for bpp, median in data_dict}

    @classmethod
    def _speedup_for_bpp(cls, baseline_bpp_data: Dict, current_bpp_data: Dict) -> Any:
        """Compute the speedup of one codec against the baseline at one bpp.

        :param baseline_bpp_data: Baseline entry with ``'median'`` and
            optionally ``'individual_values'``.
        :type baseline_bpp_data: Dict
        :param current_bpp_data: Entry of the codec being compared, same layout.
        :type current_bpp_data: Dict
        :return: ``None`` when either median is not a valid time; a list of
            per-run speedups when both sides have equally many individual
            values; otherwise the ratio of the medians.
        :rtype: Any
        """
        baseline_median = baseline_bpp_data.get('median', 0)
        current_median = current_bpp_data.get('median', 0)
        if not (baseline_median > cls._MIN_VALID_TIME and current_median > cls._MIN_VALID_TIME):
            return None

        baseline_individual = baseline_bpp_data.get('individual_values', [])
        current_individual = current_bpp_data.get('individual_values', [])
        if (baseline_individual and current_individual
                and len(baseline_individual) == len(current_individual)):
            # Pair the runs one-to-one so the spread (stddev) can be computed later.
            individual_speedups = [
                bl / cl
                for bl, cl in zip(baseline_individual, current_individual)
                if cl > cls._MIN_VALID_TIME
            ]
            if individual_speedups:
                return individual_speedups

        # No usable per-run values: fall back to a single speedup from the medians.
        return baseline_median / current_median

    @staticmethod
    def _representative_speedup(speedup_value: Any) -> Any:
        """Collapse a stored speedup to the single value drawn on line plots.

        :param speedup_value: A single speedup or a list of per-run speedups.
        :type speedup_value: Any
        :return: The mean of a list (0 for an empty list) or the value itself.
        :rtype: Any
        """
        if isinstance(speedup_value, list):
            return np.mean(speedup_value) if speedup_value else 0
        return speedup_value

    # ------------------------------------------------------------------ #
    # Speedup calculation
    # ------------------------------------------------------------------ #
    def calculate_speedups(self) -> None:
        """Calculates speedups for all codecs relative to baseline, BPP by BPP.

        Reads ``self.metrics_data['time_ns']`` and, when present,
        ``self.full_metrics_data['time_ns']`` (which carries individual
        values, allowing per-run speedups for a proper stddev). Results are
        written to ``self.speedup_data``; ``self.encoder_speedup_data`` and
        ``self.decoder_speedup_data`` are rebuilt from it on every call, so
        calling this method repeatedly does not duplicate samples.

        :return: None
        :rtype: None
        """
        print("\nCalculating speedups...")

        if not self.baseline_codec:
            print("Warning: No baseline codec defined for speedup calculation. Cannot calculate speedups.")
            return

        metric = "time_ns"
        if metric not in self.metrics_data:
            print(f"Warning: No '{metric}' data found in metrics_data. Cannot calculate speedups.")
            return

        # Prefer full_metrics_data (contains individual values); otherwise
        # fall back to the aggregated metrics_data.
        use_full_data = hasattr(self, 'full_metrics_data') and metric in self.full_metrics_data
        metric_source = self.full_metrics_data[metric] if use_full_data else self.metrics_data[metric]

        for op_type in self._OPERATION_TYPES:
            for lf_name, codec_data_for_lf in metric_source[op_type].items():
                # Find baseline data for the current lightfield and operation type.
                baseline_data = None
                for (codec_name_in_data, _), data_dict in codec_data_for_lf.items():
                    if codec_name_in_data == self.baseline_codec:
                        baseline_data = self._normalize_bpp_data(data_dict, use_full_data)
                        break

                if baseline_data is None:
                    print(f"No baseline data found for {lf_name} {op_type} with codec '{self.baseline_codec}'. Skipping.")
                    continue

                key = f"{lf_name}_{op_type}"

                # Calculate speedups for all other codecs against the found baseline.
                for (codec_name, title), data_dict in codec_data_for_lf.items():
                    if codec_name == self.baseline_codec:
                        continue

                    current_data = self._normalize_bpp_data(data_dict, use_full_data)
                    for bpp, current_bpp_data in current_data.items():
                        if bpp not in baseline_data:
                            continue
                        speedup = self._speedup_for_bpp(baseline_data[bpp], current_bpp_data)
                        if speedup is not None:
                            self.speedup_data[key][(codec_name, title)][bpp] = speedup

        # Rebuild the per-operation pools from scratch: they are fully derived
        # from speedup_data, and appending without a reset would duplicate
        # every sample on a repeated call.
        self.encoder_speedup_data.clear()
        self.decoder_speedup_data.clear()

        for lf_op_type, codec_speedups in self.speedup_data.items():
            _, op_type = lf_op_type.rsplit('_', 1)
            pool = self.encoder_speedup_data if op_type == "encoder" else self.decoder_speedup_data

            for (codec_name, _), bpp_speedups in codec_speedups.items():
                if codec_name == self.baseline_codec:
                    continue
                for bpp, speedup_value in bpp_speedups.items():
                    # Handle both a single speedup and a list of individual speedups.
                    if isinstance(speedup_value, list):
                        pool[codec_name][bpp].extend(speedup_value)
                    else:
                        pool[codec_name][bpp].append(speedup_value)

        print("Speedup calculation complete.")

    # ------------------------------------------------------------------ #
    # Per-lightfield line plots
    # ------------------------------------------------------------------ #
    def create_speedup_plots(self) -> None:
        """Creates and saves all speedup plots for lightfields and codecs.

        Optionally creates the boxplots first (``generate_boxplots``), saves
        the consolidated speedup JSON, then writes one line plot per
        lightfield, operation type and codec group to
        ``<results_path>/<op_type>/<group>/``.

        :return: None
        :rtype: None
        """
        print("\nCreating speedup plots...")

        if not self.speedup_data:
            print("Warning: No speedup data calculated. Skipping speedup plots.")
            return

        if self.speedup_config.get("generate_boxplots", True):
            self.create_speedup_boxplots()

        output_dir_base = self._results_dir()
        output_dir_base.mkdir(parents=True, exist_ok=True)

        # Save consolidated speedup JSON.
        save_consolidated_speedup_json(baseline_codec=self.baseline_codec,
                                       output_dir=output_dir_base,
                                       speedup_data=self.speedup_data)

        # NOTE(review): font_size, legend, legend_title and xscale are read
        # from the top level of speedup_config here, whereas
        # _generate_average_speedup_plot reads font_size/legend/xscale from the
        # "plots" sub-section. Confirm which location the schema intends.
        plots_config = self.speedup_config.get("plots", {})
        font_size = self.speedup_config.get('font_size', 14)
        speedup_plot_format = plots_config.get('format', 'pdf')

        for lf_op_type, codec_speedups in self.speedup_data.items():
            lf_name, op_type = lf_op_type.rsplit('_', 1)

            # Group codecs by family (jplm, parallel-jplm, etc).
            codec_groups = defaultdict(list)
            for codec_name, title in codec_speedups.keys():
                if codec_name == self.baseline_codec:
                    continue
                if "jplm" in codec_name:
                    # Covers both 'jplm' and 'parallel-jplm'.
                    group = "jplm"
                else:
                    group = codec_name.split('-')[0]
                codec_groups[group].append((codec_name, title))

            # Create a plot for each codec group.
            for group_name, codecs in codec_groups.items():
                if not codecs:
                    continue

                plt.figure(figsize=tuple(plots_config.get('figure_size', [5, 3])))

                # Ascending thread count; titles without one sort first (0).
                sorted_codecs = sorted(codecs, key=lambda item: self._extract_thread_count(item[1]))

                for codec_name, title in sorted_codecs:
                    bpp_data = codec_speedups[(codec_name, title)]
                    if not bpp_data:
                        continue

                    bpps = sorted(bpp_data.keys())
                    # Lists of individual speedups are plotted by their mean.
                    speedups = [self._representative_speedup(bpp_data[bpp]) for bpp in bpps]

                    plot_styles = self._get_plot_style(codec_name, self.speedup_config)
                    plt.plot(
                        bpps,
                        speedups,
                        marker=plot_styles['marker'],
                        color=plot_styles['color'],
                        label=title,
                        markersize=plot_styles['point_size'],
                        linewidth=plot_styles['line_width']
                    )

                plt.title(f"Speedup Comparison - {lf_name} {op_type.capitalize()} ({group_name})",
                          fontsize=font_size)
                plt.xlabel("Target bpp", fontsize=font_size)
                plt.ylabel("Speedup (w.r.t Baseline)", fontsize=font_size)

                # Add the baseline reference line (speedup == 1) with the configured style.
                if plots_config.get("show_baseline", True):
                    baseline_line_style = self._get_plot_style(self.baseline_codec,
                                                               self.speedup_config)['baseline_style']
                    plt.axhline(
                        y=1,
                        color=baseline_line_style.get('color', 'gray'),
                        linestyle=baseline_line_style.get('linestyle', '--'),
                        linewidth=baseline_line_style.get('linewidth', 2),
                        label="Baseline"
                    )

                if self.speedup_config.get("legend", True):
                    handles, labels = plt.gca().get_legend_handles_labels()
                    plt.legend(
                        handles[::-1], labels[::-1],  # Reverse to have higher threads at top
                        loc='center left',
                        bbox_to_anchor=(1.02, 0.5),
                        title=self.speedup_config.get('legend_title', "Codecs"),
                        fontsize='small',
                        title_fontsize='small',
                        frameon=False,
                        borderaxespad=0.0
                    )

                plt.grid(True, axis='both', linestyle=':', alpha=0.7)

                # Apply the x scale if configured.
                xscale = self.speedup_config.get("xscale", None)
                if xscale:
                    plt.xscale(xscale)
                    if xscale == "log":
                        # Show plain numbers instead of powers of ten on the log axis.
                        plt.gca().get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

                # Save plot.
                group_dir = output_dir_base / op_type.lower() / group_name
                group_dir.mkdir(parents=True, exist_ok=True)
                plot_file = group_dir / f"speedup_{lf_name}_{op_type.lower()}.{speedup_plot_format}"
                plt.savefig(plot_file, bbox_inches='tight', format=speedup_plot_format)
                plt.close()
                print(f"Saved speedup plot: {plot_file}")

    # ------------------------------------------------------------------ #
    # Average plots
    # ------------------------------------------------------------------ #
    def create_average_speedup_plots(self) -> None:
        """Creates separate plots of average speedups with standard deviation
        for encoder and decoder across all codecs and lightfields.

        Plots are written to ``<results_path>/average_plots``.

        :return: None
        :rtype: None
        """
        print("\nCreating average speedup plots with standard deviation...")

        if not self.speedup_data:
            print("  No speedup data available. Skipping average speedup plots.")
            return

        output_dir = self._results_dir() / "average_plots"
        output_dir.mkdir(parents=True, exist_ok=True)

        for op_type in ("Encoder", "Decoder"):
            self._generate_average_speedup_plot(op_type=op_type, output_dir=output_dir)

    def _generate_average_speedup_plot(self, op_type: str, output_dir: Path) -> None:
        """Generates average speedup plot for either encoder or decoder.

        One plot is produced per entry of ``self.averaging_groups``; each
        pools the speedups of the group's lightfields per codec and bpp and
        draws the mean with a +/- one standard deviation band.

        :param op_type: Operation type (Encoder or Decoder)
        :type op_type: str
        :param output_dir: Output directory for plots
        :type output_dir: Path
        :return: None
        :rtype: None
        """
        op_key = op_type.lower()

        # Lightfields that actually have speedup data for this operation type.
        available_lfs = set()
        for lf_op_type in self.speedup_data.keys():
            lf_name, current_op_type = lf_op_type.rsplit('_', 1)
            if current_op_type == op_key:
                available_lfs.add(lf_name)

        speedup_plots = self.speedup_config.get("plots", {})
        font_size = speedup_plots.get('font_size', 14)
        # Default to 'pdf' like the other plot methods instead of raising
        # KeyError when no format is configured.
        plot_format = speedup_plots.get('format', 'pdf')

        for group_config in self.averaging_groups:
            group_name = group_config.get("name", "unknown")
            group_lfs = group_config.get("lightfields", [])

            # Keep only the group's lightfields that exist in the data.
            valid_lfs = [lf for lf in group_lfs if lf in available_lfs]
            if not valid_lfs:
                print(f"  Skipping group '{group_name}': no valid lightfields found in data")
                continue

            # Pool speedups for this group by codec and bpp.
            speedup_data_for_op = defaultdict(lambda: defaultdict(list))
            for lf_op_type, codec_speedups in self.speedup_data.items():
                lf_name, current_op_type = lf_op_type.rsplit('_', 1)
                if current_op_type != op_key or lf_name not in valid_lfs:
                    continue

                for (codec_name, _), bpp_speedups in codec_speedups.items():
                    if codec_name == self.baseline_codec:
                        continue
                    for bpp, speedup_value in bpp_speedups.items():
                        # Normalize to a list of values.
                        values = speedup_value if isinstance(speedup_value, list) else [speedup_value]
                        speedup_data_for_op[codec_name][bpp].extend(values)

            if not speedup_data_for_op:
                print(f"  No {op_key} speedup data available for plotting average speedups (group: {group_name}).")
                continue

            # Mean and standard deviation for each codec at each bpp.
            codec_stats = {
                codec_name: {
                    bpp: {'mean': np.mean(values), 'stddev': np.std(values)}
                    for bpp, values in bpp_data.items()
                }
                for codec_name, bpp_data in speedup_data_for_op.items()
            }

            plt.figure(figsize=tuple(speedup_plots.get('figure_size', [12, 8])))

            # Plot each codec with its stddev band.
            for codec_name, stats in codec_stats.items():
                plot_styles = self._get_plot_style(codec_name, self.speedup_config)

                sorted_bpps = sorted(stats.keys())
                means = np.array([stats[bpp]['mean'] for bpp in sorted_bpps])
                stddevs = np.array([stats[bpp]['stddev'] for bpp in sorted_bpps])

                plt.plot(
                    sorted_bpps,
                    means,
                    marker=plot_styles['marker'],
                    color=plot_styles['color'],
                    label=plot_styles['label'],
                    markersize=plot_styles['point_size'],
                    linewidth=plot_styles['line_width']
                )
                plt.fill_between(
                    sorted_bpps,
                    means - stddevs,
                    means + stddevs,
                    color=plot_styles['color'],
                    alpha=0.2
                )

            # Add the baseline reference line (speedup == 1).
            if speedup_plots.get("show_baseline", True):
                plt.axhline(
                    y=1,
                    color='gray',  # Fixed gray for the average baseline
                    linestyle='--',
                    linewidth=self.speedup_config.get('line_width', 2),
                    label="Baseline"
                )

            plt.title(
                f"Average speedups and std deviation - {op_type} ({group_name})",
                fontsize=font_size
            )
            plt.xlabel("Target bpp", fontsize=font_size)
            plt.ylabel("Speedup (w.r.t Baseline)", fontsize=font_size)
            plt.grid(True, axis='both', linestyle=':', alpha=0.7)

            if speedup_plots.get("legend", True):
                handles, labels = plt.gca().get_legend_handles_labels()
                plt.legend(
                    handles[::-1], labels[::-1],
                    loc='center left',
                    bbox_to_anchor=(1.02, 0.5),
                    title=self.speedup_config.get('legend_title', "Codecs"),
                    fontsize='small',
                    title_fontsize='small',
                    frameon=False,
                    borderaxespad=0.0
                )

            plt.tight_layout()

            xscale = speedup_plots.get("xscale", None)
            if xscale:
                plt.xscale(xscale)
                # Show plain numbers instead of powers of ten on the scaled axis.
                plt.gca().get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

            # Save plot.
            output_file = output_dir / f"average_speedup_with_stddev_{op_key}_{group_name}.{plot_format}"
            plt.savefig(output_file, format=plot_format, bbox_inches='tight')
            plt.close()
            print(f"  Saved {op_key} average speedup plot for group {group_name}: {output_file}")

    # ------------------------------------------------------------------ #
    # Boxplots
    # ------------------------------------------------------------------ #
    def create_speedup_boxplots(self) -> None:
        """Creates speedup distribution boxplots for encoder and decoder.

        All speedups of a codec (over every lightfield and bpp) form one box.
        Plots are written to ``<results_path>/boxplots``.

        :return: None
        :rtype: None
        """
        print("\nCreating speedup distribution boxplots...")

        if not self.speedup_data:
            print("Warning: No speedup data available for boxplots. Skipping.")
            return

        output_dir = self._results_dir() / "boxplots"
        output_dir.mkdir(parents=True, exist_ok=True)

        codec_data = {"encoder": defaultdict(list), "decoder": defaultdict(list)}
        codec_titles = {}  # Display titles used as tick labels

        # Aggregate all speedups for each codec/op_type combination.
        for lf_op_type, codec_speedups in self.speedup_data.items():
            _, op_type = lf_op_type.rsplit('_', 1)  # 'encoder' or 'decoder'

            for (codec_name, _), bpp_speedups in codec_speedups.items():
                if codec_name == self.baseline_codec:
                    continue

                for speedup_value in bpp_speedups.values():
                    if isinstance(speedup_value, list):
                        codec_data[op_type][codec_name].extend(speedup_value)
                    else:
                        codec_data[op_type][codec_name].append(speedup_value)

                if codec_name not in codec_titles:
                    # Title from the codec's rd_preferences; the codec name is the fallback.
                    codec_titles[codec_name] = (
                        self.configuration["codecs"]["configuration"]
                        .get(codec_name, {})
                        .get("rd_preferences", {})
                        .get("title", codec_name)
                    )

        plots_config = self.speedup_config.get("plots", {})
        font_size = plots_config.get('font_size', 14)
        speedup_plot_format = plots_config.get('format', 'pdf')

        for op_type, speedups_by_codec in codec_data.items():
            if not speedups_by_codec:
                continue

            # Ascending thread count taken from the title; titles without a
            # thread count sort first (0).
            sorted_codec_names = sorted(
                speedups_by_codec.keys(),
                key=lambda name: self._extract_thread_count(codec_titles[name])
            )

            labels_to_plot, data_to_plot, colors_to_plot = [], [], []
            for codec_name in sorted_codec_names:
                speedups = speedups_by_codec[codec_name]
                if not speedups:
                    continue
                labels_to_plot.append(codec_titles[codec_name])
                data_to_plot.append(speedups)
                colors_to_plot.append(self._get_plot_style(codec_name, self.speedup_config)['color'])

            if not data_to_plot:
                continue

            # Sequential positions give uniform spacing regardless of thread count.
            positions_to_plot = list(range(1, len(data_to_plot) + 1))

            plt.figure(figsize=tuple(plots_config.get('figure_size', [12, 8])))

            # Tick labels are set through plt.xticks below, so the boxplot
            # `labels` keyword (deprecated since Matplotlib 3.9 in favour of
            # `tick_labels`) is deliberately not passed.
            bp = plt.boxplot(
                data_to_plot,
                positions=positions_to_plot,
                widths=0.6,  # Fixed width for uniform spacing
                patch_artist=True,
                flierprops=dict(
                    marker='o',
                    markersize=3,
                    markeredgecolor='black',
                    alpha=0.6
                ),
                zorder=2
            )

            # Apply the codec colors to the boxes.
            for patch, color in zip(bp['boxes'], colors_to_plot):
                patch.set_facecolor(color)

            plt.title(f"Speedup Distribution - {op_type.capitalize()}", fontsize=font_size)
            plt.xlabel("Codec Configuration", fontsize=font_size)
            plt.ylabel("Speedup (w.r.t Baseline)", fontsize=font_size)

            # Rotate the labels so long titles stay readable.
            plt.xticks(positions_to_plot, labels_to_plot, rotation=45, ha='right', fontsize=font_size * 0.8)

            # Adjust xlim to fit all boxes.
            plt.xlim(positions_to_plot[0] - 0.7, positions_to_plot[-1] + 0.7)

            plt.grid(True, axis='both', linestyle=':', alpha=0.7)
            plt.tight_layout()

            # Save plot.
            plot_file = output_dir / f"speedup_distribution_{op_type.lower()}.{speedup_plot_format}"
            plt.savefig(plot_file, bbox_inches='tight', format=speedup_plot_format)
            plt.close()
            print(f"Saved speedup boxplot: {plot_file}")
def main() -> None:
    """Run the speedup pipeline for every performance configuration block.

    The configuration is read from the command line, with the base path set
    to the parent of the directory containing the executed script. For each
    entry of ``configuration["performance"]`` the logs are processed by a
    ``PerformanceVisualizer``, its collected data is handed over to a
    ``SpeedupVisualizer``, speedups are calculated and, when any exist, the
    speedup plots, average plots and codec comparison matrices are created.

    :return: None
    :rtype: None
    """
    script_dir = Path(os.path.abspath(os.path.dirname(sys.argv[0])))
    configuration = read_config_from_argv(overriden_base_path=script_dir / "..")

    # The 'performance' key holds a list of complete performance
    # configurations, each of which includes its 'speedup' section.
    for perf_config in configuration["performance"]:
        print("\nProcessing new performance configuration block for Speedup.")

        # Process the logs first; this populates the metrics data that the
        # speedup calculation is based on.
        log_processor = PerformanceVisualizer(configuration=configuration,
                                              performance_config=perf_config)
        log_processor.process_all_logs()

        speedup_visualizer = SpeedupVisualizer(configuration=configuration,
                                               performance_config=perf_config)

        # Hand the processed data over; calculate_speedups() needs metrics_data
        # (and, when present, full_metrics_data with individual values).
        speedup_visualizer.metrics_data = log_processor.metrics_data.copy()

        empty_full_metrics = defaultdict(lambda: {
            'encoder': defaultdict(lambda: defaultdict(dict)),
            'decoder': defaultdict(lambda: defaultdict(dict))
        })
        speedup_visualizer.full_metrics_data = getattr(
            log_processor, 'full_metrics_data', empty_full_metrics
        ).copy()

        speedup_visualizer.baseline_codec = log_processor.baseline_codec
        speedup_visualizer.baseline_memory = getattr(log_processor, 'baseline_memory', {})

        speedup_visualizer.calculate_speedups()

        # Guard clause: nothing to draw for this configuration block.
        if not speedup_visualizer.speedup_data:
            print("No speedup data to plot.")
            continue

        speedup_visualizer.create_speedup_plots()
        speedup_visualizer.create_average_speedup_plots()

        matrix_plot_config = {**speedup_visualizer.speedup_config,
                              'baseline_codec': speedup_visualizer.baseline_codec}
        create_codec_comparison_matrices(speedup_data=speedup_visualizer.speedup_data,
                                         plot_config=matrix_plot_config)
# Entry-point guard: run the speedup pipeline only when this file is executed as a script.
if __name__ == "__main__": main()