"""
Author: Ismael Seidel
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Contributors:
- André Filipe da Silva Fernandes
Description:
This module converts the JPEG Pleno LF Dataset from PPM to YUV and PGX formats.
The PGX and YUV files should have the same data.
This implementation is for JPEG Pleno CE 14.
"""
import json
import os
import subprocess
import sys
import zipfile
from pathlib import Path
from lfc_toolkit.src.configuration.configuration_reader import ConfigurationReader
from lfc_toolkit.src.converters.lightfield_converter import LightfieldConverter
from lfc_toolkit.src.ctc.lightfield_preprocessing import LightfieldPreprocess
from lfc_toolkit.src.data_handlers.formatters import get_modified_date
from lfc_toolkit.src.data_handlers.lightfield import (
RAW_BT709_FR_PGX_LightField_Data,
RAW_BT709_FR_YUV444p10le_LightField_Data,
RAW_RGB_PPM_LightField_Data,
RAWLightFieldData,
)
# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from lfc_toolkit.src.configuration.configuration_reader import read_config_from_argv
from lfc_toolkit.src.ctc.download import LightfieldDownloader
from lfc_toolkit.src.ctc.lightfield_factory import LightFieldFactory
[docs]
def checksum_conversion(
    lightfield_name: str, md5: str, file_format: str, configuration: ConfigurationReader
) -> None:
    """Verify the MD5 checksum of a converted light field.

    Loads ``<format>_md5.json`` from the directory configured under
    ``ctc-conversions-checksums``, looks up the reference hash stored for
    ``lightfield_name`` and prints whether ``md5`` matches it. The outcome is
    only reported on stdout; a mismatch never raises. When the reference file
    holds no entry for the light field, a notice is printed and the
    verification is skipped instead of aborting the caller with ``KeyError``.

    :param lightfield_name: Name of the light field
    :type lightfield_name: str
    :param md5: Computed MD5 hash value
    :type md5: str
    :param file_format: File format (ppm, yuv, or pgx), case-insensitive
    :type file_format: str
    :param configuration: Configuration providing ``ctc-conversions-checksums``
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    :raises ValueError: If file format is not supported (ppm, yuv, or pgx)
    """
    supported_formats = ("ppm", "yuv", "pgx")
    format_key = file_format.lower()
    if format_key not in supported_formats:
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError(
            "Format not supported for checksum conversion. Use ppm, yuv or pgx."
        )

    format_label = file_format.upper()
    print(f"Checking checksum for {lightfield_name} in {format_label} format.")

    checksum_dir = configuration["ctc-conversions-checksums"]
    checksum_file = Path(checksum_dir) / f"{format_key}_md5.json"
    with open(checksum_file, "r", encoding="utf-8") as f:
        expected_by_name = json.load(f)

    md5_expected = expected_by_name.get(lightfield_name)
    if md5_expected is None:
        # A missing reference is reported like a mismatch: printed, not raised.
        print(
            f"No expected checksum registered for {lightfield_name} in {format_label}; skipping verification."
        )
        return

    if md5 == md5_expected:
        print(
            f"Checksum for {lightfield_name} in {format_label} matches with expected."
        )
    else:
        print(
            f"Checksum for {lightfield_name} in {format_label} does not match."
        )
        print(f"Expected: {md5_expected}, Got: {md5}")
[docs]
def get_output_log() -> dict:
    """Get FFmpeg version and initialize conversion output log.

    Runs ``ffmpeg -version`` and keeps the first two lines of its output,
    joined by a single space, as the version string. If FFmpeg prints fewer
    than two lines, whatever is available is used instead of failing with
    ``IndexError``.

    :return: Dictionary with FFmpeg version and empty conversion results list
    :rtype: dict
    :raises RuntimeError: If ``ffmpeg -version`` exits with a non-zero status;
        the message is the decoded stderr of the process
    :raises OSError: Propagated from ``subprocess.run`` when the ``ffmpeg``
        executable cannot be started (e.g. it is not on ``PATH``)
    """
    res = subprocess.run(["ffmpeg", "-version"], capture_output=True)
    if res.returncode != 0:
        # Decode so the message is readable text rather than a bytes repr.
        raise RuntimeError(res.stderr.decode(errors="replace"))

    version_lines = res.stdout.decode(errors="replace").splitlines()
    # Slicing tolerates short output; with two or more lines the result is
    # identical to "<line 0> <line 1>".
    formatted_version = " ".join(version_lines[:2])
    return {
        "ffmpeg_version": formatted_version,
        "conversion": [],
    }
[docs]
def download_lightfield_if_needed(
    lightfield: RAWLightFieldData, configuration: ConfigurationReader
) -> None:
    """Download and unpack a light field archive if it is not present locally.

    Nothing is done when ``lightfield.raw_path`` already exists. Otherwise the
    archive ``<name>.zip`` is looked up in the ``raw_paths/ppm-download``
    directory (created if missing), downloaded there through
    ``LightfieldDownloader.download`` when absent, and then extracted.

    Archives come in two layouts. If any member path contains ``<name>/``,
    the archive is extracted directly into the download directory; otherwise
    it is extracted into ``lightfield.raw_path``. An archive with no members
    is treated as the second layout.

    :param lightfield: Light field object with ``name`` and ``raw_path``
    :type lightfield: RAWLightFieldData
    :param configuration: Configuration reader with download URLs and paths
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    :raises RuntimeError: If the archive is missing and no ``url`` is configured
    """
    print(lightfield.raw_path)
    if lightfield.raw_path.exists():
        return

    raw_ppm_path = Path(configuration["raw_paths"]["ppm-download"])
    raw_ppm_path.mkdir(parents=True, exist_ok=True)
    zip_path = raw_ppm_path / f"{lightfield.name}.zip"

    if not zip_path.is_file():
        url = configuration.lightfield_configurations[lightfield.name].get("url")
        if url is None:
            raise RuntimeError(
                "Unable to find the raw LF and unable to download it. If you want the dataset to be downloaded, include the url in the lf configuration file."
            )
        LightfieldDownloader.download(
            url=url,
            filename=lightfield.name,
            extension="zip",
            destination=raw_ppm_path,
        )

    with zipfile.ZipFile(zip_path, "r") as archive:
        # Zip files differ in structure: some already contain "<name>/...".
        folder_marker = lightfield.name + "/"
        first_nested = next(
            (member for member in archive.namelist() if folder_marker in member),
            None,
        )
        if first_nested is not None:
            print(first_nested)
            target_dir = raw_ppm_path
        else:
            target_dir = lightfield.raw_path

        try:
            archive.extractall(target_dir)
        except Exception as e:
            # Extraction is best-effort: report the problem and carry on.
            print("Warning: Maybe something is wrong in the zip file.")
            print(str(e))
[docs]
def convert_lightfield_to_yuv_and_pgx(
    lightfield_name: str, configuration: ConfigurationReader
) -> dict:
    """Produce the PPM, YUV and PGX versions of a single light field.

    Steps, in order:
    - Registers the light field configuration if the name is not yet known
    - Builds the raw light field in its original format (default ``ppm``)
    - Downloads the raw data when the raw light field is a PPM one
    - Preprocesses the raw light field and converts it to PPM
    - Converts PPM to YUV, then YUV to PGX

    :param lightfield_name: Name of the light field to convert
    :type lightfield_name: str
    :param configuration: Configuration reader with paths and format settings
    :type configuration: ConfigurationReader
    :return: Dictionary with 'ppm', 'yuv', and 'pgx' light field instances
    :rtype: dict
    """
    # Make sure a per-light-field configuration entry exists.
    if lightfield_name not in configuration.lightfield_names:
        fetched_cfg = configuration.get_lightfield_configuration(lightfield_name)
        configuration.lightfield_configurations[lightfield_name] = fetched_cfg

    source_format = configuration.lightfield_configurations.get(
        lightfield_name, {}
    ).get("original-format", "ppm")

    # May raise if the configuration is still missing.
    source_lf = LightFieldFactory.get_raw_lightfield(
        configuration=configuration,
        lightfield_name=lightfield_name,
        raw_type=source_format,
        after_preprocessing=False,
    )

    if isinstance(source_lf, RAW_RGB_PPM_LightField_Data):
        download_lightfield_if_needed(source_lf, configuration)

    prepared_lf = LightfieldPreprocess.get(
        configuration=configuration, lightfield=source_lf
    )

    # The PPM output location is passed on exactly as configured.
    outputs = {
        "ppm": LightfieldConverter.convert(
            source=prepared_lf,
            destination_type="ppm",
            output_path=configuration["raw_paths"]["ppm"],
        )
    }

    # Each further format is derived from the previous one: PPM -> YUV -> PGX.
    previous_format = "ppm"
    for target_format in ("yuv", "pgx"):
        target_dir = Path(configuration["raw_paths"][target_format])
        outputs[target_format] = LightfieldConverter.convert(
            source=outputs[previous_format],
            destination_type=target_format,
            output_path=target_dir,
        )
        previous_format = target_format

    return outputs
[docs]
def main(configuration: ConfigurationReader = None) -> None:
    """Convert JPEG Pleno light field dataset from PPM to YUV and PGX formats.

    Pipeline, for every light field named in the configuration:

    1. Build the raw light field in its original format and, for PPM sources,
       download it if needed.
    2. Preprocess it and convert it to PPM, unless PPM files already exist
       and ``force_conversions`` is not set.
    3. Convert PPM -> YUV -> PGX, checking the MD5 of each stage against the
       expected checksums. This step is skipped when both outputs already
       exist and ``force_conversions`` is not set.
    4. Convert PGX back to YUV and compare MD5s (YUV->PGX->YUV must be
       lossless); optionally remove the check file afterwards.

    A JSON log holding the FFmpeg version, the configuration and one entry
    per light field is written to ``logs/yuv_and_pgx_from_ppm``.

    The whole conversion is skipped when no regular codec is configured but
    sample codecs are.

    Usage: python yuv_and_pgx_from_ppm.py <configuration.json>

    :param configuration: Configuration reader (read from argv if None)
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    """
    if not configuration:
        configuration = read_config_from_argv()

    run_codecs = configuration["codecs-to-run"]
    sample_codecs = configuration["sample-codecs-to-run"]
    # With only sample codecs requested there is nothing to convert.
    if len(run_codecs) == 0 and len(sample_codecs) > 0:
        print("Skipping YUV and PGX conversion. Will use sampled values.")
        return

    output_log = get_output_log()
    output_log["output_log"] = dict(configuration)

    raw_paths = configuration["raw_paths"]
    raw_yuv_path = Path(raw_paths["yuv"])
    raw_pgx_path = Path(raw_paths["pgx"])
    raw_yuv_from_pgx_path = Path(raw_paths["yuv_check"])
    remove_yuv_from_pgx_after_check = configuration["remove_yuv_from_pgx_after_check"]
    output_log_filename = Path(configuration["logs"]["yuv_and_pgx_from_ppm"])

    raw_lightfields = [
        LightFieldFactory.get_raw_lightfield(
            configuration=configuration,
            lightfield_name=name,
            raw_type=configuration.lightfield_configurations[name].get(
                "original-format", "ppm"
            ),
            after_preprocessing=False,
        )
        for name in configuration.lightfield_names
    ]

    # Download the light fields if needed (only PPM sources are downloadable).
    for lightfield in raw_lightfields:
        if isinstance(lightfield, RAW_RGB_PPM_LightField_Data):
            download_lightfield_if_needed(lightfield, configuration)

    # "force_conversions" is optional; without it existing outputs are reused.
    try:
        force_conversions = configuration["force_conversions"]
    except KeyError:
        force_conversions = False

    # Preprocess every light field into PPM, reusing PPM files already on disk.
    pre_processed_raw_ppm_lfs = []
    for lightfield in raw_lightfields:
        ppm_lf_path = Path(raw_paths["ppm"]) / lightfield.name
        if (
            not force_conversions
            and ppm_lf_path.is_dir()
            and any(ppm_lf_path.rglob("*.ppm"))
        ):
            print(f"Skipping preprocessing for {lightfield.name}: PPM already exists at {ppm_lf_path}")
            ppm_lf = LightFieldFactory.get_raw_lightfield(
                configuration=configuration,
                lightfield_name=lightfield.name,
                raw_type="ppm",
                after_preprocessing=True,
            )
            pre_processed_raw_ppm_lfs.append(ppm_lf)
            continue
        pre_processed = LightfieldPreprocess.get(
            configuration=configuration, lightfield=lightfield
        )
        ppm_lf = LightfieldConverter.convert(
            source=pre_processed, destination_type="ppm", output_path=raw_paths["ppm"]
        )
        pre_processed_raw_ppm_lfs.append(ppm_lf)

    for lightfield in pre_processed_raw_ppm_lfs:
        lf_name = lightfield.name
        result = {"name": lf_name}

        if not force_conversions:
            # Kept in its own variable so the paths found on disk are never
            # mixed up with the converted light field objects.
            existing_yuvs = list(raw_yuv_path.glob(f"{lf_name}_*.yuv"))
            pgx_lf_path = raw_pgx_path / lf_name
            yuv_exists = len(existing_yuvs) > 0 and existing_yuvs[0].is_file()
            pgx_exists = pgx_lf_path.is_dir() and any(pgx_lf_path.rglob("*.pgx"))
            if yuv_exists and pgx_exists:
                print(f"Skipping {lf_name}: YUV ({existing_yuvs[0]}) and PGX ({pgx_lf_path}) already exist.")
                # Skipped light fields are logged with their name only.
                output_log["conversion"].append(result)
                continue

        # --- PPM ---------------------------------------------------------
        md5_of_ppm = lightfield.get_md5()
        result["ppm"] = {
            "path": str(lightfield.raw_path),
            "timestamp": get_modified_date(lightfield.raw_path),
            "md5": md5_of_ppm,
        }
        checksum_conversion(
            lightfield_name=lf_name,
            md5=md5_of_ppm,
            file_format="ppm",
            configuration=configuration,
        )

        # --- PPM -> YUV --------------------------------------------------
        print(f"Converting {lf_name} from PPM to YUV")
        print(lightfield.raw_path)
        print(raw_yuv_path)
        yuv_lf = LightfieldConverter.convert(
            source=lightfield, destination_type="yuv", output_path=raw_yuv_path
        )
        yuv_filename = yuv_lf.raw_path
        md5_of_yuv = yuv_lf.get_md5()
        result["yuv"] = {
            "path": str(yuv_filename),
            "timestamp": get_modified_date(yuv_filename),
            "md5": md5_of_yuv,
        }
        checksum_conversion(
            lightfield_name=lf_name,
            md5=md5_of_yuv,
            file_format="yuv",
            configuration=configuration,
        )
        print("Generated the file", yuv_filename)
        print("MD5 (YUV from PPM):", md5_of_yuv)

        # --- YUV -> PGX --------------------------------------------------
        print(f"Converting {lf_name} from YUV to PGX (this may take a while)")
        pgx_lf = LightfieldConverter.convert(
            source=yuv_lf, destination_type="pgx", output_path=raw_pgx_path
        )
        pgx_raw_path = pgx_lf.raw_path
        md5_of_pgx = pgx_lf.get_md5()
        print("Generated the files in", pgx_raw_path)
        print(f"MD5 (PGX from YUV): {md5_of_pgx}")
        result["pgx"] = {
            "path": str(pgx_raw_path),
            "timestamp": get_modified_date(pgx_raw_path),
            "md5": md5_of_pgx,
        }
        checksum_conversion(
            lightfield_name=lf_name,
            md5=md5_of_pgx,
            file_format="pgx",
            configuration=configuration,
        )

        # --- Round-trip check: PGX -> YUV --------------------------------
        print(
            f"Converting {lf_name} back from PGX to YUV for checking",
            "(no loss in the YUV->PGX->YUV should occur)",
        )
        yuv_from_pgx = LightfieldConverter.convert(
            source=pgx_lf, destination_type="yuv", output_path=raw_yuv_from_pgx_path
        )
        md5_of_yuv_from_pgx = yuv_from_pgx.get_md5()
        print("MD5 (YUV from PGX):", md5_of_yuv_from_pgx)
        result["pgx_yuv_check"] = {
            "path": str(yuv_from_pgx.raw_path),
            "md5": md5_of_yuv_from_pgx,
        }
        if md5_of_yuv == md5_of_yuv_from_pgx:
            print("MD5 of YUV from PPM and YUV from PGX are the same.")
            result["pgx_yuv_check"]["check"] = True
        else:
            print(
                "Something is wrong!",
                "The MD5 of YUV from PPM and YUV from PGX are NOT the same.",
            )
            result["pgx_yuv_check"]["check"] = False

        if remove_yuv_from_pgx_after_check and os.path.exists(yuv_from_pgx.raw_path):
            print(
                "Removing the YUV generated from PGX",
                "(the YUV generated from PPM is kept)",
            )
            result["pgx_yuv_check"]["removed"] = True
            yuv_from_pgx.raw_path.unlink()
        else:
            result["pgx_yuv_check"]["removed"] = False

        output_log["conversion"].append(result)
        print(f"Done processing {lf_name}")
        print()

    output_log_filename.parent.mkdir(parents=True, exist_ok=True)
    with open(output_log_filename, "w", encoding="utf-8") as file:
        json.dump(output_log, file, indent=4)
# Script entry point: run the conversion only when this file is executed
# directly. main() is called without arguments, so the configuration is
# read from the command line.
if __name__ == "__main__":
    main()