"""
Author: Leonardo de Sousa Marques
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Description:
Organize encoded light fields files (JPL, x265, ...) in the correct folders.
"""
import os
import shutil
import sys
from pathlib import Path
from lfc_toolkit.src.configuration.configuration_reader import \
ConfigurationReader
# Add the parent directory to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
[docs]
def log_bundle_copy(lf_folder: Path, bitstream_file: Path) -> None:
"""Create a log entry for a copied bitstream file.
Generates a log file documenting that a bitstream was copied from
the bundle folder to the light field results folder.
:param lf_folder: Path to the light field folder where bitstream was copied
:type lf_folder: Path
:param bitstream_file: Path to the original bitstream file in bundle
:type bitstream_file: Path
:return: None
:rtype: None
"""
log_folder = lf_folder.parent / "logs"
log_folder.mkdir(parents=True, exist_ok=True)
log_file = log_folder / f"{bitstream_file.stem}.txt"
with open(log_file, "w") as f:
f.write(f"Bitstream '{bitstream_file.name}' was copied from the bundle folder {bitstream_file}.\n")
[docs]
def organize_encoded_bitstreams(configuration: ConfigurationReader) -> None:
"""Organize encoded bitstream files into the correct directory structure.
Copies encoded bitstreams from a bundle folder into per-codec, per-lightfield
results directories, creating appropriate folder structure and log entries.
:param configuration: Configuration reader with bundle and codec paths
:type configuration: ConfigurationReader
:return: None
:rtype: None
:raises Exception: If bundle path, codecs, or results paths are not configured
"""
try:
bundle_bitstream_path = Path(configuration["bundled_bitstream_path"])
print(f"Bundle Bitstream Path: {bundle_bitstream_path}")
except:
raise Exception("Bundle bitstream path was not provided in the configuration file.")
codecs = configuration["codecs"].get("run", None)
if not codecs:
raise Exception("No codecs were provided in the configuration file.")
lightfields = configuration["lightfields"]["ctc"]
for codec in codecs:
print(f"Processing for {codec=}")
results_path = Path(configuration["codecs"]["configuration"][codec].get("results", None))
if not results_path:
raise Exception(f"{codec} results path was not provided.")
results_path.mkdir(parents=True, exist_ok=True)
print(f"Adding encoded bitstreams to {results_path}")
for lf in lightfields:
print(f"LF: {lf}")
lf_folder = results_path / lf / "encoded"
lf_folder.mkdir(parents=True, exist_ok=True)
codec_bundle_folder = bundle_bitstream_path / codec
if not codec_bundle_folder.is_dir():
raise Exception(f"No folder for {codec} found in bundle folder.")
for bitstream_file in codec_bundle_folder.iterdir():
if bitstream_file.name.startswith(lf):
dst = lf_folder / bitstream_file.name
if dst.exists():
dst.unlink()
shutil.copy2(src=bitstream_file, dst=dst)
log_bundle_copy(lf_folder=lf_folder, bitstream_file=bitstream_file)
[docs]
def main() -> None:
"""Entry point for organizing encoded bitstreams.
Reads configuration from command line arguments and orchestrates
the organization of encoded bitstream files into the correct folder structure.
Usage: python organize_encoded_bitstreams.py <configuration.json>
:return: None
:rtype: None
:raises Exception: If configuration file is not provided
"""
if not len(sys.argv) == 2:
raise Exception("Error. Expecting a parameter with the name of the json configuration file.")
cfg_file = sys.argv[1]
print("Using the paths from", cfg_file)
configuration = ConfigurationReader(user_configuration_filename=cfg_file)
organize_encoded_bitstreams(configuration=configuration)
if __name__ == "__main__":
main()