"""
Author: Ismael Seidel (ismael.seidel@ufsc.br)
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Description:
This module contains the `CodecWrapper` class, which provides a base implementation for encoding and decoding
light fields using various codecs.
"""
import json
import subprocess
import platform
import time
from datetime import datetime
from hashlib import md5
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from lfc_toolkit.src.file.custom_json import CustomJsonEncoder
from lfc_toolkit.src.data_handlers.formatters import get_modified_date
from lfc_toolkit.src.data_handlers.lightfield import (EncodedLightField,
LightField,
RAWLightFieldData)
from lfc_toolkit.src.quality.profile import (ExecutionMeasurements, RunProfile,
get_metrics_from_runs)
[docs]
class CodecWrapper:
    """Base implementation for encoding and decoding light fields with an external codec."""

    def __init__(
        self,
        codec_path: Optional[Path],
        results_path: Path,
        encoded_extension: str,
        clear_log: bool = False,
        repetitions: int = 1,
        force_encoding: bool = False,
        codec: Optional[str] = None,
        num_cores: Optional[int] = None,
        repository=None,
    ):
        """Initializes the CodecWrapper instance with the given parameters.

        When a ``repository`` is supplied it is cloned/checked out and built
        as the last step of the initialization.

        :param codec_path: Path to the codec binary; a falsy value leaves any
            previously defined ``codec_path`` untouched, or sets it to ``None``
        :type codec_path: Optional[Path]
        :param results_path: Path to store results
        :type results_path: Path
        :param encoded_extension: File extension for encoded files
        :type encoded_extension: str
        :param clear_log: Whether to clear logs after execution, defaults to False
        :type clear_log: bool, optional
        :param repetitions: Number of repetitions for encoding/decoding, defaults to 1
        :type repetitions: int, optional
        :param force_encoding: Whether to force encoding even if results exist, defaults to False
        :type force_encoding: bool, optional
        :param codec: Name of the codec, defaults to None
        :type codec: str, optional
        :param num_cores: Number of cores to use for encoding/decoding, defaults to None
        :type num_cores: int, optional
        :param repository: Repository object for managing codec source, defaults to None
        :type repository: optional
        """
        self.codec = codec
        if codec_path:
            self.codec_path: Optional[Path] = Path(codec_path)
        elif not hasattr(self, "codec_path"):
            # Always define the attribute so later reads yield None instead of
            # raising AttributeError; a value already provided by a subclass
            # (class attribute or assignment before super().__init__) is kept.
            self.codec_path = None
        self.results_path = Path(results_path)
        self.clear_log = clear_log
        self.repetitions = repetitions
        self.results = dict()
        self.md5_history: Dict[str, str] = {}  # Stores previous MD5 hashes
        self.force_encoding: bool = force_encoding
        self.num_cores = num_cores
        self.repository = repository
        self.encoded_extension = encoded_extension
        if self.repository:
            # Fetch and build the codec sources before any encode/decode call.
            self.repository.clone_and_checkout_tag()
            self.repository.build(num_cores=num_cores)
[docs]
def get_encoded_path(self, raw_lightfield: RAWLightFieldData) -> Path:
    """Builds the location, below the results path, used for encoded data.

    The result is ``<results_path>/<light field name>/encoded``.

    :param raw_lightfield: The raw light field whose name selects the subfolder
    :type raw_lightfield: RAWLightFieldData
    :return: Path to the encoded data
    :rtype: Path
    """
    encoded_subpath = f"{raw_lightfield.name}/encoded"
    return self.results_path.joinpath(encoded_subpath)
[docs]
def get_logs_path(self, raw_lightfield: RAWLightFieldData) -> Path:
    """Builds the location, below the results path, used for logs.

    The result is ``<results_path>/<light field name>/logs``.

    :param raw_lightfield: The raw light field whose name selects the subfolder
    :type raw_lightfield: RAWLightFieldData
    :return: Path to the logs
    :rtype: Path
    """
    logs_subpath = f"{raw_lightfield.name}/logs"
    return self.results_path.joinpath(logs_subpath)
[docs]
def get_decoded_path(self, raw_lightfield: RAWLightFieldData) -> Path:
    """Builds the location, below the results path, used for decoded data.

    The result is ``<results_path>/<light field name>/decoded``.

    :param raw_lightfield: The raw light field whose name selects the subfolder
    :type raw_lightfield: RAWLightFieldData
    :return: Path to the decoded data
    :rtype: Path
    """
    decoded_subpath = f"{raw_lightfield.name}/decoded"
    return self.results_path.joinpath(decoded_subpath)
[docs]
def compute_bytes_and_bpp(
    self, encoded_filename: Union[str, Path], raw_lightfield: RAWLightFieldData
) -> Tuple[int, float]:
    """Measures the encoded file and derives its bits per pixel (bpp).

    The bpp is the file size in bits divided by the pixel count reported by
    ``raw_lightfield.get_number_of_pixels()``.

    :param encoded_filename: Path to the encoded file
    :type encoded_filename: Union[str, Path]
    :param raw_lightfield: The raw light field data
    :type raw_lightfield: RAWLightFieldData
    :return: Tuple containing the number of bytes and bpp
    :rtype: Tuple[int, float]
    """
    size_in_bytes = Path(encoded_filename).stat().st_size
    pixel_count = raw_lightfield.get_number_of_pixels()
    bits_per_pixel = 8 * size_in_bytes / pixel_count
    return size_in_bytes, bits_per_pixel
[docs]
def add_decoded_conversion(self, decoded_conversion: RAWLightFieldData) -> None:
    """Registers a decoded conversion in the results of its light field.

    The conversion is appended to the ``"decoded_conversions"`` list stored in
    ``self.results[name]["target_bpps"][bpp_for_naming]``; the list is created
    on first use.

    :param decoded_conversion: The decoded conversion data
    :type decoded_conversion: RAWLightFieldData
    :return: None
    :rtype: None
    """
    per_bpp_results = self.results[decoded_conversion.name]["target_bpps"]
    bpp_entry = per_bpp_results[decoded_conversion.bpp_for_naming]
    conversions = bpp_entry.setdefault("decoded_conversions", list())
    conversions.append(decoded_conversion)
[docs]
def create_execution_log(self, output_log_filename: Path) -> None:
    """Creates a new execution log entry and stores it in the JSON log file.

    The entry holds a timestamp, the machine characteristics (when
    ``get_lscpu_info`` returns data), the codec repository data (when a
    repository is configured) and, for every light field in ``self.results``,
    the encoder and decoder results per target bpp.

    With ``clear_log`` set, the file is overwritten with a list containing
    only the new entry. Otherwise the entry is appended to the entries
    already stored; a missing or empty log file is treated as an empty log.

    :param output_log_filename: Path to the output log file
    :type output_log_filename: Path
    :return: None
    :rtype: None
    """

    def runs_to_log_data(profile, role):
        """Turns every run of ``profile`` into a JSON-friendly dictionary."""
        return [
            {
                "start_time": run.start_time.strftime("%H:%M:%S.%f"),
                "end_time": run.end_time.strftime("%H:%M:%S.%f"),
                "time_ns": run.duration,
                **run.filtered_log_data(role),
            }
            for run in profile.runs
        ]

    def pooled_metrics_of(profile):
        """Collects the pooled time/memory metrics, using {} for missing ones."""
        pooled = getattr(profile, "pooled_metrics", None)
        memory = getattr(profile, "memory_metrics", None)
        return {
            "time_ns": pooled.asdict() if pooled else {},
            "max_memory_usage": memory.asdict() if memory else {},
        }

    # Create new log entry
    lscpu_info = get_lscpu_info()
    log_entry = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "results": {},
    }
    if lscpu_info is not None:
        log_entry["machine_characteristics"] = {
            "processor": lscpu_info.get("Model name", platform.processor()),
            "architecture": lscpu_info.get("Architecture"),
            "cores": lscpu_info.get("CPU(s)"),
            "caches": {
                "L1d": lscpu_info.get("L1d cache"),
                "L1i": lscpu_info.get("L1i cache"),
                "L2": lscpu_info.get("L2 cache"),
                "L3": lscpu_info.get("L3 cache"),
            },
        }

    # Add repository data if available
    if self.repository:
        log_entry["encoder_data"] = {
            "repository": self.repository.repository_address,
            "path": str(self.repository.local_path),
            "tag": self.repository.tag,
            "use_docker_for_build": self.repository.use_docker_to_build,
            "gcc_version": self.repository.gcc_version_from_build,
            "build_options": self.repository.build_options,
            "commit_hash": self.repository.commit_hash,
        }

    # Build results structure
    for current_lf_name, result in self.results.items():
        raw_lightfield = result["raw_lightfield"]
        lf_results = {
            "raw_path": str(raw_lightfield.raw_path),
            "raw_md5": raw_lightfield.get_md5(),
            "encoder": {},
            "decoder": {},
        }
        log_entry["results"][current_lf_name] = lf_results

        for target_bpp, data in result["target_bpps"].items():
            encoded_lf = data["encoded"]["encoded_lf"]
            decoded_lf = data.get("decoded", None)

            # Fields shared by every encoder entry; computed once so the
            # encoded file is read and hashed a single time.
            encoded_summary = {
                "target_bpp": target_bpp,
                "obtained_bpp": encoded_lf.actual_bitrate,
                "encoded_filename": str(encoded_lf.encoded_path),
                "md5_of_encoded": md5(encoded_lf.encoded_path.read_bytes()).hexdigest(),
                "modified_date": get_modified_date(encoded_lf.encoded_path),
            }
            # Fallback entry used when no usable encoder profile is present.
            encoding_entry = {**encoded_summary, "execution_time": {}}

            # NOTE(review): when several components qualify, the entry of the
            # last one replaces the previous ones - confirm this is intended.
            for component_data in (data.get("profile_encoder") or {}).values():
                if component_data is None:
                    continue
                if not (
                    hasattr(component_data, "runs")
                    and hasattr(component_data, "pooled_metrics")
                ):
                    continue
                encoding_entry = {
                    **encoded_summary,
                    "log_data": runs_to_log_data(component_data, "encoder"),
                }
                if self.repetitions > 1:
                    encoding_entry["pooled_metrics"] = pooled_metrics_of(component_data)
            lf_results["encoder"][target_bpp] = encoding_entry

            if not decoded_lf:
                continue
            # `or {}` also covers a "profile_decoder" key explicitly set to None.
            profile_decoder = (data.get("profile_decoder") or {}).get("decoder")
            if profile_decoder is None:
                # The decoder entry is derived from the decoder profile, so
                # nothing is recorded without one.
                continue
            decoding_entry = {
                "target_bpp": target_bpp,
                "decoded_path": str(decoded_lf.raw_path),
                "decoded_type": decoded_lf.type,
                "decoded_md5": decoded_lf.get_md5(),
                "modified_date": get_modified_date(decoded_lf.raw_path),
                "log_data": runs_to_log_data(profile_decoder, "decoder"),
                "decoded_conversions": {},
            }
            if self.repetitions > 1:
                decoding_entry["pooled_metrics"] = pooled_metrics_of(profile_decoder)
            for conv in data.get("decoded_conversions", []):
                decoding_entry["decoded_conversions"][conv.type] = {
                    "raw_path": str(conv.raw_path),
                    "md5": conv.get_md5(),
                    "modified_date": get_modified_date(conv.raw_path),
                }
            lf_results["decoder"][target_bpp] = decoding_entry

    log_path = Path(output_log_filename)
    existing_logs = []
    # Only read previous entries when they are to be kept and actually exist;
    # the very first run has no log file yet.
    if not self.clear_log and log_path.is_file() and log_path.stat().st_size > 0:
        with open(log_path, "r") as f:
            existing_content = json.load(f)
        if isinstance(existing_content, list):
            existing_logs = existing_content
        else:
            existing_logs = [existing_content]

    # Append new entry and write back. Serialize before opening the file for
    # writing so a serialization error cannot truncate the existing log.
    existing_logs.append(log_entry)
    serialized_logs = json.dumps(existing_logs, indent=4, cls=CustomJsonEncoder)
    with open(log_path, "w") as f:
        f.write(serialized_logs)
# Verify if the lightfield file has changed between repetitions by comparing MD5 hashes.
[docs]
def verify_lightfield_consistency(self, bpp: float, lightfield: LightField) -> bool:
    """Compares the light field's current MD5 with the one stored for it.

    Hashes are kept in ``self.md5_history`` under a key made of the light
    field name, the bpp and the mode (``"encoded"`` for an
    ``EncodedLightField``, ``"decoded"`` otherwise). On a mismatch a warning
    is printed and the stored hash is left as is; otherwise the current hash
    is stored.

    :param bpp: Target bits per pixel value
    :type bpp: float
    :param lightfield: The light field data
    :type lightfield: LightField
    :return: True if the light field is consistent, False otherwise
    :rtype: bool
    """
    current_md5 = lightfield.get_md5()
    if isinstance(lightfield, EncodedLightField):
        mode = "encoded"
    else:
        mode = "decoded"
    lf_key = f"{lightfield.name}_{bpp}_{mode}"

    seen_before = lf_key in self.md5_history
    if seen_before and self.md5_history[lf_key] != current_md5:
        print(
            f"WARNING: Lightfield {lightfield.name} (bpp={bpp}) (mode={mode}) has changed. Previous MD5: {self.md5_history[lf_key]}, Current MD5: {current_md5}"
        )
        return False

    # First sighting or unchanged content: remember the hash.
    self.md5_history[lf_key] = current_md5
    return True
[docs]
def execute_perf_command(self, command: List[str], cwd: Optional[str] = None) -> RunProfile:
    """Runs a command once and records how it went.

    The duration is taken from ``time.perf_counter_ns`` readings made
    immediately around the subprocess call; the wall-clock start and end
    timestamps are taken just outside those readings.

    :param command: Command to execute as a list of arguments
    :type command: List[str]
    :param cwd: Current working directory for the command, defaults to None
    :type cwd: Optional[str], optional
    :return: Run profile with duration, success status, stdout and stderr
    :rtype: RunProfile
    """
    wall_clock_start = datetime.now()
    counter_start = time.perf_counter_ns()
    completed = subprocess.run(command, cwd=cwd, capture_output=True, text=True)
    counter_end = time.perf_counter_ns()
    wall_clock_end = datetime.now()

    return RunProfile(
        duration=counter_end - counter_start,
        success=(completed.returncode == 0),
        start_time=wall_clock_start,
        end_time=wall_clock_end,
        stdout=completed.stdout,
        stderr=completed.stderr,
    )
# Execute command for encoding and decoding lightfields, now allowing multiple repetitions
[docs]
def execute_command(
    self,
    bpp: float,
    command: List[str],
    log_filename: Path,
    cwd: Optional[Path] = None,
    lightfield: Optional[LightField] = None,
) -> ExecutionMeasurements:
    """Executes a command for encoding or decoding light fields, allowing multiple repetitions.

    The command is run ``self.repetitions`` times. Each repetition writes its
    own log file, named after ``log_filename`` with ``_rep_<n>`` appended to
    the stem, and the data extracted from that log is attached to the run.
    From the second repetition on, the light field (when given) is checked
    with ``verify_lightfield_consistency`` before the command is run again.

    :param bpp: Target bits per pixel value
    :type bpp: float
    :param command: Command to execute; non-string arguments (e.g. ``Path``)
        are converted with ``str`` when written to the log
    :type command: List[str]
    :param log_filename: Path to the log file
    :type log_filename: Path
    :param cwd: Current working directory, defaults to None
    :type cwd: Optional[Path], optional
    :param lightfield: The light field data, defaults to None
    :type lightfield: Optional[LightField], optional
    :raises ValueError: If the light field changed between repetitions
    :return: Execution measurements for the command
    :rtype: ExecutionMeasurements
    """
    runs = []
    log_filename = Path(log_filename)  # also accept a plain string path
    log_filename.parent.mkdir(parents=True, exist_ok=True)
    # subprocess accepts path-like arguments, but str.join does not; convert
    # explicitly so logging cannot fail after the command has already run.
    command_line = " ".join(str(argument) for argument in command)

    for rep in range(self.repetitions):
        if lightfield and self.repetitions > 1:
            print(
                f"Repetition: {rep+1}/{self.repetitions} ({self.codec}, {lightfield.name}, {bpp})"
            )
        if lightfield and rep > 0:
            if not self.verify_lightfield_consistency(bpp=bpp, lightfield=lightfield):
                raise ValueError(f"Lightfield changed during repetition {rep+1}!")

        profile = self.execute_perf_command(command=command, cwd=cwd)

        rep_log_filename = log_filename.with_stem(f"{log_filename.stem}_rep_{rep+1}")
        with open(rep_log_filename, "w") as f:
            f.write(f"Command: {command_line}\n")
            f.write(f"Repetition: {rep+1}\n")
            f.write(f"Start time: {profile.start_time}\n")
            f.write(f"End time: {profile.end_time}\n")
            f.write(f"Duration: {profile.duration} ns\n")
            f.write(f"Stdout:\n{profile.stdout}\n")
            f.write(f"Stderr:\n{profile.stderr}\n")

        # The log is parsed only after the file has been closed (and thus
        # flushed). NOTE(review): extract_data_from_log is not defined in the
        # visible part of this class - presumably supplied by subclasses.
        profile.log_data = self.extract_data_from_log(rep_log_filename)
        runs.append(profile)

    return get_metrics_from_runs(runs)
[docs]
def get_lscpu_info() -> Optional[Dict[str, str]]:
    """Retrieves CPU information from the lscpu command.

    Returns None if lscpu is not available or fails (e.g., on macOS or Windows).
    The command is run with ``LC_ALL=C`` so that the field names are not
    translated and can be looked up by their English names
    (e.g. ``"Model name"``).

    :return: Dictionary with CPU information (e.g., model, architecture, cores, caches), or None on failure
    :rtype: Optional[Dict[str, str]]
    """
    import os  # local import: only needed to derive the child environment

    try:
        # No shell is needed for a fixed command; a missing binary then shows
        # up as FileNotFoundError (an OSError) instead of a shell error.
        raw_output = subprocess.check_output(
            ["lscpu"],
            env={**os.environ, "LC_ALL": "C"},
        )
    except (subprocess.CalledProcessError, OSError):
        return None

    # Tolerate unexpected bytes rather than failing on the whole report.
    output = raw_output.decode("utf-8", errors="replace")

    info = {}
    for line in output.split("\n"):
        key, separator, value = line.partition(":")
        if separator:  # only "key: value" lines carry information
            info[key.strip()] = value.strip()
    return info