# Source code for src.organize_encoded_bitstreams

"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)

Description:
Organize encoded light field files (JPL, x265, ...) into the correct folders.
"""

import os
import shutil
import sys
from pathlib import Path

from lfc_toolkit.src.configuration.configuration_reader import \
    ConfigurationReader

# Add the parent of the directory containing this file to the Python import
# path (as an absolute path).
# NOTE(review): this statement runs after the ``lfc_toolkit`` import above, so
# it cannot help resolve that import; it only affects imports performed later.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

def log_bundle_copy(lf_folder: Path, bitstream_file: Path) -> None:
    """Create a log entry for a copied bitstream file.

    Writes ``<lf_folder.parent>/logs/<bitstream stem>.txt`` containing a single
    line stating that the bitstream was copied from the bundle folder. The
    ``logs`` directory is created when missing, and an existing log file with
    the same name is overwritten.

    :param lf_folder: Path to the light field folder where bitstream was copied
    :type lf_folder: Path
    :param bitstream_file: Path to the original bitstream file in bundle
    :type bitstream_file: Path
    :return: None
    :rtype: None
    """
    log_folder = lf_folder.parent / "logs"
    log_folder.mkdir(parents=True, exist_ok=True)

    log_file = log_folder / f"{bitstream_file.stem}.txt"
    # Explicit UTF-8 so the log content does not depend on the platform's
    # locale encoding (bitstream names/paths may contain non-ASCII characters).
    log_file.write_text(
        f"Bitstream '{bitstream_file.name}' was copied from the bundle folder {bitstream_file}.\n",
        encoding="utf-8",
    )
def organize_encoded_bitstreams(configuration: ConfigurationReader) -> None:
    """Organize encoded bitstream files into the correct directory structure.

    For every codec listed under ``configuration["codecs"]["run"]`` and every
    light field listed under ``configuration["lightfields"]["ctc"]``, copies
    each file in ``<bundle>/<codec>/`` whose name starts with the light field
    name into ``<results>/<light field>/encoded/`` and writes a log entry via
    :func:`log_bundle_copy`. Destination folders are created as needed and an
    already existing destination file is replaced.

    :param configuration: Configuration reader with bundle and codec paths
    :type configuration: ConfigurationReader
    :return: None
    :rtype: None
    :raises Exception: If bundle path, codecs, or results paths are not
        configured, or if the bundle has no folder for a requested codec
    """
    try:
        bundle_bitstream_path = Path(configuration["bundled_bitstream_path"])
    except Exception as err:
        # ``except Exception`` (not a bare ``except``) so KeyboardInterrupt and
        # SystemExit still propagate; the original error is kept as the cause.
        raise Exception(
            "Bundle bitstream path was not provided in the configuration file."
        ) from err
    print(f"Bundle Bitstream Path: {bundle_bitstream_path}")

    codecs = configuration["codecs"].get("run", None)
    if not codecs:
        raise Exception("No codecs were provided in the configuration file.")

    lightfields = configuration["lightfields"]["ctc"]

    for codec in codecs:
        print(f"Processing for {codec=}")

        # Validate the raw value before wrapping it: Path(None) raises
        # TypeError and a Path object is always truthy, so checking the Path
        # itself can never detect a missing setting.
        results_dir = configuration["codecs"]["configuration"][codec].get("results", None)
        if not results_dir:
            raise Exception(f"{codec} results path was not provided.")
        results_path = Path(results_dir)
        results_path.mkdir(parents=True, exist_ok=True)
        print(f"Adding encoded bitstreams to {results_path}")

        # Same folder for every light field of this codec; compute it once.
        codec_bundle_folder = bundle_bitstream_path / codec

        for lf in lightfields:
            print(f"LF: {lf}")
            lf_folder = results_path / lf / "encoded"
            lf_folder.mkdir(parents=True, exist_ok=True)

            if not codec_bundle_folder.is_dir():
                raise Exception(f"No folder for {codec} found in bundle folder.")

            for bitstream_file in codec_bundle_folder.iterdir():
                # NOTE(review): plain prefix match -- a light field whose name
                # is a prefix of another's would also pick up the other's
                # files; confirm against the bundle naming scheme.
                if not bitstream_file.name.startswith(lf):
                    continue

                dst = lf_folder / bitstream_file.name
                if dst.exists():
                    dst.unlink()
                shutil.copy2(src=bitstream_file, dst=dst)
                log_bundle_copy(lf_folder=lf_folder, bitstream_file=bitstream_file)
def main() -> None:
    """Entry point for organizing encoded bitstreams.

    Reads the configuration file name from the command line, loads it with
    :class:`ConfigurationReader` and passes the result to
    :func:`organize_encoded_bitstreams`.

    Usage:
        python organize_encoded_bitstreams.py <configuration.json>

    :return: None
    :rtype: None
    :raises Exception: If configuration file is not provided
    """
    # Exactly one argument is expected besides the script name.
    if len(sys.argv) != 2:
        raise Exception("Error. Expecting a parameter with the name of the json configuration file.")

    cfg_file = sys.argv[1]
    print("Using the paths from", cfg_file)

    configuration = ConfigurationReader(user_configuration_filename=cfg_file)
    organize_encoded_bitstreams(configuration=configuration)
# Run the organizer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()