# Source code for src.yuv_and_pgx_from_ppm

"""
Author: Ismael Seidel
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Contributors:
    - André Filipe da Silva Fernandes

Description:
    This module converts the JPEG Pleno LF Dataset from PPM to YUV and PGX formats.
    The PGX and YUV files should have the same data.
    This implementation is for JPEG Pleno CE 14.
"""

import json
import os
import subprocess
import sys
import zipfile
from pathlib import Path

from lfc_toolkit.src.configuration.configuration_reader import ConfigurationReader
from lfc_toolkit.src.converters.lightfield_converter import LightfieldConverter
from lfc_toolkit.src.ctc.lightfield_preprocessing import LightfieldPreprocess
from lfc_toolkit.src.data_handlers.formatters import get_modified_date
from lfc_toolkit.src.data_handlers.lightfield import (
    RAW_BT709_FR_PGX_LightField_Data,
    RAW_BT709_FR_YUV444p10le_LightField_Data,
    RAW_RGB_PPM_LightField_Data,
    RAWLightFieldData,
)

# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from lfc_toolkit.src.configuration.configuration_reader import read_config_from_argv
from lfc_toolkit.src.ctc.download import LightfieldDownloader
from lfc_toolkit.src.ctc.lightfield_factory import LightFieldFactory


def checksum_conversion(
    lightfield_name: str, md5: str, file_format: str, configuration: ConfigurationReader
) -> None:
    """Verify the MD5 checksum of a converted light field.

    Reads ``<ctc-conversions-checksums>/<format>_md5.json`` and compares the
    entry stored under ``lightfield_name`` with ``md5``. The check is advisory:
    the outcome is printed to stdout. A mismatch, or a light field without a
    recorded reference checksum, is reported but does not raise.

    :param lightfield_name: Name of the light field
    :type lightfield_name: str
    :param md5: Computed MD5 hash value
    :type md5: str
    :param file_format: File format (ppm, yuv, or pgx), case-insensitive
    :type file_format: str
    :param configuration: Configuration providing the
        ``ctc-conversions-checksums`` directory
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    :raises ValueError: If file format is not supported (ppm, yuv, or pgx)
    """
    # Normalise once; the lower-case form names the JSON file, the upper-case
    # form is only used in messages.
    fmt = file_format.lower()
    fmt_label = file_format.upper()

    # ValueError is a subclass of Exception, so callers catching Exception
    # keep working.
    if fmt not in ("ppm", "yuv", "pgx"):
        raise ValueError(
            "Format not supported for checksum conversion. Use ppm, yuv or pgx."
        )

    print(f"Checking checksum for {lightfield_name} in {fmt_label} format.")

    checksum_dir = configuration["ctc-conversions-checksums"]
    checksum_file = Path(checksum_dir) / f"{fmt}_md5.json"
    with open(checksum_file, "r", encoding="utf-8") as f:
        expected_by_name = json.load(f)

    # A light field without a reference checksum cannot be verified; report it
    # instead of aborting the whole conversion with a bare KeyError.
    if lightfield_name not in expected_by_name:
        print(
            f"No expected checksum found for {lightfield_name} in {fmt_label} "
            f"format ({checksum_file}). Skipping verification."
        )
        return

    md5_expected = expected_by_name[lightfield_name]
    if md5 == md5_expected:
        print(
            f"Checksum for {lightfield_name} in {fmt_label} matches with expected."
        )
    else:
        print(f"Checksum for {lightfield_name} in {fmt_label} does not match.")
        print(f"Expected: {md5_expected}, Got: {md5}")
def get_output_log() -> dict:
    """Get FFmpeg version and initialize conversion output log.

    Runs ``ffmpeg -version`` and stores the first two lines of its output
    (joined by a single space) under ``"ffmpeg_version"``. The
    ``"conversion"`` entry starts as an empty list for callers to append to.

    :return: Dictionary with FFmpeg version and empty conversion results list
    :rtype: dict
    :raises Exception: If ``ffmpeg -version`` exits with a non-zero status
        (the message is FFmpeg's decoded stderr)
    :raises FileNotFoundError: If the ``ffmpeg`` executable cannot be found
    """
    res = subprocess.run(["ffmpeg", "-version"], capture_output=True)
    if res.returncode != 0:
        # Decode so the message is readable text rather than a bytes repr.
        raise Exception(res.stderr.decode(errors="replace"))

    # errors="replace" keeps a stray non-UTF-8 byte in the banner from
    # aborting the run; slicing tolerates output shorter than two lines.
    version_lines = res.stdout.decode(errors="replace").splitlines()
    formatted_version = " ".join(version_lines[:2])

    return {
        "ffmpeg_version": formatted_version,
        "conversion": [],
    }
def download_lightfield_if_needed(
    lightfield: RAWLightFieldData, configuration: ConfigurationReader
) -> None:
    """Download and extract light field PPM files if not already present.

    Does nothing when ``lightfield.raw_path`` already exists. Otherwise it
    looks for ``<ppm-download>/<name>.zip``, downloads it from the light
    field's configured ``url`` when the archive is missing, and extracts it.

    Archives are packaged inconsistently. When some member path contains
    ``<name>/``, the archive is extracted into the download directory.
    Otherwise it is extracted into ``lightfield.raw_path``.

    Extraction errors are reported on stdout and not re-raised (best effort).

    :param lightfield: Light field object with ``name`` and ``raw_path``
    :type lightfield: RAWLightFieldData
    :param configuration: Configuration reader with download URLs and paths
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    :raises Exception: If URL not configured and files not present
    """
    print(lightfield.raw_path)
    if lightfield.raw_path.exists():
        return

    raw_ppm_path = Path(configuration["raw_paths"]["ppm-download"])
    raw_ppm_path.mkdir(parents=True, exist_ok=True)
    zip_path = raw_ppm_path / f"{lightfield.name}.zip"

    if not zip_path.is_file():
        url = configuration.lightfield_configurations[lightfield.name].get("url", None)
        if url is None:
            raise Exception(
                "Unable to find the raw LF and unable to download it. "
                "If you want the dataset to be downloaded, include the url "
                "in the lf configuration file."
            )
        LightfieldDownloader.download(
            url=url,
            filename=lightfield.name,
            extension="zip",
            destination=raw_ppm_path,
        )

    with zipfile.ZipFile(zip_path, "r") as archive:
        # Some archives already contain a "<name>/" folder, others hold the
        # files flat. Find the first member that reveals the folder, if any.
        # An empty archive yields None and is handled like a flat one.
        folder_marker = lightfield.name + "/"
        first_match = next(
            (member for member in archive.namelist() if folder_marker in member),
            None,
        )
        if first_match is not None:
            print(first_match)

        try:
            if first_match is None:
                # Flat archive: extract into the light field's own folder.
                archive.extractall(lightfield.raw_path)
            else:
                # Archive already carries the folder: extract next to the zip.
                archive.extractall(raw_ppm_path)
        except Exception as e:
            # Deliberately best effort: report and let the caller continue.
            print("Warning: Maybe something is wrong in the zip file.")
            print(str(e))
def convert_lightfield_to_yuv_and_pgx(
    lightfield_name: str, configuration: ConfigurationReader
) -> dict:
    """Make one light field available in PPM, YUV and PGX formats.

    Steps, in order:

    - register the light field configuration if it is not yet known
    - build the raw light field in its original format
    - download the raw PPM data when the original format is PPM
    - preprocess the raw light field
    - convert preprocessed -> PPM -> YUV -> PGX

    :param lightfield_name: Name of the light field to convert
    :type lightfield_name: str
    :param configuration: Configuration reader with paths and format settings
    :type configuration: ConfigurationReader
    :return: Dictionary with 'ppm', 'yuv', and 'pgx' light field instances
    :rtype: dict
    """
    known_configs = configuration.lightfield_configurations
    if lightfield_name not in configuration.lightfield_names:
        known_configs[lightfield_name] = configuration.get_lightfield_configuration(
            lightfield_name
        )

    # Light fields default to PPM when no original format is configured.
    source_format = configuration.lightfield_configurations.get(
        lightfield_name, {}
    ).get("original-format", "ppm")

    # May raise if the configuration is still missing.
    source_lf = LightFieldFactory.get_raw_lightfield(
        configuration=configuration,
        lightfield_name=lightfield_name,
        raw_type=source_format,
        after_preprocessing=False,
    )

    # Only PPM sources are downloaded here.
    if isinstance(source_lf, RAW_RGB_PPM_LightField_Data):
        download_lightfield_if_needed(source_lf, configuration)

    prepared_lf = LightfieldPreprocess.get(
        configuration=configuration, lightfield=source_lf
    )

    converted = {}

    # Preprocessed -> PPM (written under the configured ppm path).
    converted["ppm"] = LightfieldConverter.convert(
        source=prepared_lf,
        destination_type="ppm",
        output_path=configuration["raw_paths"]["ppm"],
    )

    # PPM -> YUV.
    converted["yuv"] = LightfieldConverter.convert(
        source=converted["ppm"],
        destination_type="yuv",
        output_path=Path(configuration["raw_paths"]["yuv"]),
    )

    # YUV -> PGX.
    converted["pgx"] = LightfieldConverter.convert(
        source=converted["yuv"],
        destination_type="pgx",
        output_path=Path(configuration["raw_paths"]["pgx"]),
    )

    return converted
def main(configuration: ConfigurationReader = None) -> None:
    """Convert JPEG Pleno light field dataset from PPM to YUV and PGX formats.

    Runs the format conversion pipeline for every configured light field:

    1. Download raw PPM archives when needed.
    2. Preprocess each light field and convert it to PPM. This is skipped
       when PPM files already exist and ``force_conversions`` is not set.
    3. Convert PPM -> YUV -> PGX, checking the MD5 of each stage against the
       reference checksums. This is skipped when both YUV and PGX outputs
       already exist and ``force_conversions`` is not set.
    4. Convert PGX back to YUV and compare MD5s (no loss should occur in
       YUV->PGX->YUV). The check file is removed afterwards when
       ``remove_yuv_from_pgx_after_check`` is set.
    5. Write a JSON log of all results to ``logs/yuv_and_pgx_from_ppm``.

    Nothing is converted when only sample codecs are configured to run.

    Usage: python yuv_and_pgx_from_ppm.py <configuration.json>

    :param configuration: Configuration reader (read from argv if None)
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    """
    if not configuration:
        configuration = read_config_from_argv()

    # Load codecs
    run_codecs = configuration["codecs-to-run"]
    sample_codecs = configuration["sample-codecs-to-run"]

    # With only sample codecs to run there is nothing to convert.
    if len(run_codecs) == 0 and len(sample_codecs) > 0:
        print("Skipping YUV and PGX conversion. Will use sampled values.")
        return

    output_log = get_output_log()
    output_log["output_log"] = dict(configuration)

    raw_paths = configuration["raw_paths"]
    raw_yuv_path = Path(raw_paths["yuv"])
    raw_pgx_path = Path(raw_paths["pgx"])
    raw_yuv_from_pgx_path = Path(raw_paths["yuv_check"])
    remove_yuv_from_pgx_after_check = configuration["remove_yuv_from_pgx_after_check"]
    output_log_filename = Path(configuration["logs"]["yuv_and_pgx_from_ppm"])

    raw_lightfields = [
        LightFieldFactory.get_raw_lightfield(
            configuration=configuration,
            lightfield_name=name,
            raw_type=configuration.lightfield_configurations[name].get(
                "original-format", "ppm"
            ),
            after_preprocessing=False,
        )
        for name in configuration.lightfield_names
    ]

    # Download the light fields if needed (only PPM sources are downloadable).
    for lightfield in raw_lightfields:
        if isinstance(lightfield, RAW_RGB_PPM_LightField_Data):
            download_lightfield_if_needed(lightfield, configuration)

    # "force_conversions" is optional; conversions are skipped when outputs
    # already exist unless it is set.
    try:
        force_conversions = configuration["force_conversions"]
    except KeyError:
        force_conversions = False

    # Preprocess every light field and make sure a PPM version exists.
    pre_processed_raw_ppm_lfs = []
    for lightfield in raw_lightfields:
        ppm_lf_path = Path(raw_paths["ppm"]) / lightfield.name
        if (
            not force_conversions
            and ppm_lf_path.is_dir()
            and any(ppm_lf_path.rglob("*.ppm"))
        ):
            print(
                f"Skipping preprocessing for {lightfield.name}: PPM already exists at {ppm_lf_path}"
            )
            ppm_lf = LightFieldFactory.get_raw_lightfield(
                configuration=configuration,
                lightfield_name=lightfield.name,
                raw_type="ppm",
                after_preprocessing=True,
            )
        else:
            pre_processed = LightfieldPreprocess.get(
                configuration=configuration, lightfield=lightfield
            )
            ppm_lf = LightfieldConverter.convert(
                source=pre_processed,
                destination_type="ppm",
                output_path=raw_paths["ppm"],
            )
        pre_processed_raw_ppm_lfs.append(ppm_lf)

    for lightfield in pre_processed_raw_ppm_lfs:
        lf_name = lightfield.name
        result = {"name": lf_name}

        if not force_conversions:
            # Files already on disk are kept in their own variable so they
            # are never mixed with the converted light field objects below.
            existing_yuvs = list(raw_yuv_path.glob(f"{lf_name}_*.yuv"))
            pgx_lf_path = raw_pgx_path / lf_name
            yuv_exists = len(existing_yuvs) > 0 and existing_yuvs[0].is_file()
            pgx_exists = pgx_lf_path.is_dir() and any(pgx_lf_path.rglob("*.pgx"))
            if yuv_exists and pgx_exists:
                print(
                    f"Skipping {lf_name}: YUV ({existing_yuvs[0]}) and PGX ({pgx_lf_path}) already exist."
                )
                output_log["conversion"].append(result)
                continue

        # --- PPM ---------------------------------------------------------
        md5_of_ppm = lightfield.get_md5()
        result["ppm"] = {
            "path": str(lightfield.raw_path),
            "timestamp": get_modified_date(lightfield.raw_path),
            "md5": md5_of_ppm,
        }
        checksum_conversion(
            lightfield_name=lf_name,
            md5=md5_of_ppm,
            file_format="ppm",
            configuration=configuration,
        )

        # --- PPM -> YUV --------------------------------------------------
        print(f"Converting {lf_name} from PPM to YUV")
        print(lightfield.raw_path)
        print(raw_yuv_path)
        yuv_lf: RAW_BT709_FR_YUV444p10le_LightField_Data = LightfieldConverter.convert(
            source=lightfield, destination_type="yuv", output_path=raw_yuv_path
        )
        yuv_filename = yuv_lf.raw_path
        md5_of_yuv = yuv_lf.get_md5()
        result["yuv"] = {
            "path": str(yuv_filename),
            "timestamp": get_modified_date(yuv_filename),
            "md5": md5_of_yuv,
        }
        checksum_conversion(
            lightfield_name=lf_name,
            md5=md5_of_yuv,
            file_format="yuv",
            configuration=configuration,
        )
        print("Generated the file", yuv_filename)
        print("MD5 (YUV from PPM):", md5_of_yuv)

        # --- YUV -> PGX --------------------------------------------------
        print(f"Converting {lf_name} from YUV to PGX (this may take a while)")
        pgx_lf: RAW_BT709_FR_PGX_LightField_Data = LightfieldConverter.convert(
            source=yuv_lf, destination_type="pgx", output_path=raw_pgx_path
        )
        pgx_raw_path = pgx_lf.raw_path
        md5_of_pgx = pgx_lf.get_md5()
        print("Generated the files in", pgx_raw_path)
        print(f"MD5 (PGX from YUV): {md5_of_pgx}")
        result["pgx"] = {
            "path": str(pgx_raw_path),
            "timestamp": get_modified_date(pgx_raw_path),
            "md5": md5_of_pgx,
        }
        checksum_conversion(
            lightfield_name=lf_name,
            md5=md5_of_pgx,
            file_format="pgx",
            configuration=configuration,
        )

        # --- Round-trip check: PGX -> YUV ---------------------------------
        print(
            f"Converting {lf_name} back from PGX to YUV for checking",
            "(no loss in the YUV->PGX->YUV should occur)",
        )
        yuv_from_pgx = LightfieldConverter.convert(
            source=pgx_lf, destination_type="yuv", output_path=raw_yuv_from_pgx_path
        )
        md5_of_yuv_from_pgx = yuv_from_pgx.get_md5()
        print("MD5 (YUV from PGX):", md5_of_yuv_from_pgx)
        result["pgx_yuv_check"] = {
            "path": str(yuv_from_pgx.raw_path),
            "md5": md5_of_yuv_from_pgx,
        }

        if md5_of_yuv == md5_of_yuv_from_pgx:
            print("MD5 of YUV from PPM and YUV from PGX are the same.")
            result["pgx_yuv_check"]["check"] = True
        else:
            print(
                "Something is wrong!",
                "The MD5 of YUV from PPM and YUV from PGX are NOT the same.",
            )
            result["pgx_yuv_check"]["check"] = False

        if remove_yuv_from_pgx_after_check and os.path.exists(yuv_from_pgx.raw_path):
            print(
                "Removing the YUV generated from PGX",
                "(the YUV generated from PPM is kept)",
            )
            result["pgx_yuv_check"]["removed"] = True
            yuv_from_pgx.raw_path.unlink()
        else:
            result["pgx_yuv_check"]["removed"] = False

        output_log["conversion"].append(result)
        print(f"Done processing {lf_name}")
        print()

    output_log_filename.parent.mkdir(parents=True, exist_ok=True)
    with open(output_log_filename, "w", encoding="utf-8") as file:
        json.dump(output_log, file, indent=4)
# Script entry point: run the conversion pipeline when executed directly.
if __name__ == "__main__":
    main()