"""
Author: Ismael Seidel (ismael.seidel@ufsc.br)
Affiliation: Embedded Computing Lab (ECL), Federal University of Santa Catarina (UFSC)
Description:
This module contains the `x265Wrapper` class, which extends the `CodecWrapper` class to provide
specific encoding and decoding functionality for the x265 codec.
"""
import json
import math
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
from lfc_toolkit.src.quality.profile import (ExecutionMeasurements, RunProfile,
get_metrics_from_runs)
from lfc_toolkit.src.data_handlers.formatters import \
get_formatted_filename_for_lf
from lfc_toolkit.src.data_handlers.lightfield import (
EncodedLightField, RAW_Decoded_BT709_FR_YUV444p10le_LightField_Data,
RAWLightFieldData)
from .codec_wrapper import CodecWrapper
X265_FIRST_PASS_COMMAND_TEMPLATE = """
{x265_path}/x265
--input {yuv_filename}
--input-depth {bits_per_sample}
--input-csp {input_format}
--fps {fps}
--input-res {view_width}x{view_height}
--output-depth {bits_per_sample}
--profile {profile}
--tune {tune_metric}
--output /dev/null
--pass 1
--frame-threads 1
--stats {stats_file}
"""
X265_SECOND_PASS_COMMAND_TEMPLATE = """
{x265_path}/x265
--input {yuv_filename}
--input-depth {bits_per_sample}
--input-csp {input_format}
--fps {fps}
--input-res {view_width}x{view_height}
--output-depth {bits_per_sample}
--profile {profile}
--tune {tune_metric}
--{tune_metric}
--output {encoded_filename}
--bitrate {bitrate}
--pass 2
--log-level 4
--frame-threads 1
--csv {csv_filename}
--stats {stats_file}
"""
FFMPEG_HEVC_DECODE_TEMPLATE = """
ffmpeg
-y
-i {input_h265}
{output_filename}
"""
[docs]
class x265Wrapper(CodecWrapper):
[docs]
def __init__(
self,
codec: str,
codec_path: Union[str, Path],
results_path: Union[str, Path],
encoded_extension: str,
bpp_to_actual_kbps_map_path: Union[str, Path],
clear_log: bool = False,
repetitions: int = 1,
force_encoding: bool = False,
num_cores: int = None,
fps: int = 30,
tune_metric: str = "psnr",
repository=None
):
"""Initializes the x265Wrapper instance with the given parameters.
:param codec: Name of the codec
:type codec: str
:param codec_path: Path to the codec binary
:type codec_path: Union[str, Path]
:param results_path: Path to store results
:type results_path: Union[str, Path]
:param encoded_extension: File extension for encoded files
:type encoded_extension: str
:param bpp_to_actual_kbps_map_path: Path to the bpp-to-kbps mapping file
:type bpp_to_actual_kbps_map_path: Union[str, Path]
:param clear_log: Whether to clear logs after execution, defaults to False
:type clear_log: bool, optional
:param repetitions: Number of repetitions for encoding/decoding, defaults to 1
:type repetitions: int, optional
:param force_encoding: Whether to force encoding even if results exist, defaults to False
:type force_encoding: bool, optional
:param num_cores: Number of cores to use for encoding/decoding, defaults to None
:type num_cores: int, optional
:param fps: Frames per second, defaults to 30
:type fps: int, optional
:param tune_metric: Metric to tune for encoding, defaults to "psnr"
:type tune_metric: str, optional
:param repository: Repository object for managing codec source, defaults to None
:type repository: optional
"""
CodecWrapper.__init__(
self,
codec=codec,
codec_path=codec_path,
results_path=results_path,
clear_log=clear_log,
repetitions=repetitions,
encoded_extension=encoded_extension,
force_encoding=force_encoding,
num_cores=num_cores,
repository=repository,
)
self.tune_metric: str = tune_metric
self.fps: int = fps
with open(bpp_to_actual_kbps_map_path, "r") as file:
x265_target_kbps = json.load(file)
self.bpp_to_actual_kbps_map = x265_target_kbps
[docs]
def get_target_bitrate_kbps_from_bpp(
self, bpp: float, raw_lightfield: RAWLightFieldData
) -> int:
"""Calculates the target bitrate in kbps for the given bpp value.
:param bpp: Target bits per pixel value
:type bpp: float
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:return: Target bitrate in kbps
:rtype: int
"""
return math.ceil(
(raw_lightfield.get_number_of_pixels_per_view() * bpp * self.fps) / 1000
)
[docs]
def get_stats_path(self, raw_lightfield: RAWLightFieldData) -> Path:
"""Gets the path for storing x265 statistics files.
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:return: Path to the stats directory
:rtype: Path
"""
return self.results_path / f"{raw_lightfield.name}/stats"
[docs]
def get_csv_path(self, raw_lightfield: RAWLightFieldData) -> Path:
"""Gets the path for storing x265 CSV output files.
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:return: Path to the CSV directory
:rtype: Path
"""
return self.results_path / f"{raw_lightfield.name}/csv"
[docs]
def create_required_paths(self, raw_lightfield: RAWLightFieldData) -> None:
"""Creates the required directories for storing stats, encoded, and CSV data.
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:return: None
:rtype: None
"""
self.get_stats_path(raw_lightfield).mkdir(parents=True, exist_ok=True)
self.get_encoded_path(raw_lightfield).mkdir(parents=True, exist_ok=True)
self.get_csv_path(raw_lightfield).mkdir(parents=True, exist_ok=True)
[docs]
def encode_lightfield_for_target_bpps(
self,
raw_lightfield: RAWLightFieldData,
bpps: List[float],
) -> List[EncodedLightField]:
"""Encodes the light field for the specified target bits per pixel (bpp) values.
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:param bpps: List of target bpp values
:type bpps: List[float]
:return: List of encoded light fields
:rtype: List[EncodedLightField]
"""
self.create_required_paths(raw_lightfield)
lf_name = raw_lightfield.name
self.results.setdefault(
lf_name, {
"raw_lightfield": raw_lightfield,
"target_bpps": {
bpp: {
"encoded": None,
"profile_encoder": None,
"profile_decoder": None
} for bpp in bpps
}
}
)
stats_filename = get_formatted_filename_for_lf(
path=self.get_stats_path(raw_lightfield=raw_lightfield),
lightfield=raw_lightfield,
bpp=None,
file_extension="log",
)
results = list()
bpp_required = list()
bpp_to_encoded_filename = dict()
for bpp in bpps:
self.results[lf_name]["target_bpps"][bpp] = {
"encoded": None,
"profile_encoder": None,
"profile_decoder": None
}
log_filename = get_formatted_filename_for_lf(
path=self.get_logs_path(raw_lightfield=raw_lightfield),
lightfield=raw_lightfield,
bpp=bpp,
file_extension="txt",
)
encoded_filename = get_formatted_filename_for_lf(
path=self.get_encoded_path(raw_lightfield=raw_lightfield),
lightfield=raw_lightfield,
bpp=bpp,
file_extension=self.encoded_extension,
)
print(f"{encoded_filename=}")
bpp_to_encoded_filename[bpp] = encoded_filename
if not self.force_encoding and encoded_filename.is_file():
print(f"Encoded file {encoded_filename} already exists. Skipping... ")
else:
bpp_required.append(bpp)
if len(bpp_required) > 0:
print(f"Running x265 first pass.")
pass1_measurements = self._run_first_pass(bpp=bpp,
raw_lightfield=raw_lightfield,
stats_filename=stats_filename,
log_filename=log_filename)
for bpp, encoded_filename in bpp_to_encoded_filename.items():
bitrate_kbps = self.bpp_to_actual_kbps_map[raw_lightfield.name].get(str(bpp), None)
if not bitrate_kbps:
print("Inexistent bitrate in the configuration file.")
continue
if bpp in bpp_required:
print(f"Running x265 second pass for bpp {bpp}")
pass2_measurements = self._run_second_pass(raw_lightfield=raw_lightfield,
encoded_lightfield=None,
stats_filename=stats_filename,
bpp=bpp,
bitrate_kbps=bitrate_kbps,
log_filename=log_filename)
combined_measurements = self.combine_measurements(pass1_measurements=pass1_measurements, pass2_measurements=pass2_measurements)
self.results[lf_name]["target_bpps"][bpp]["profile_encoder"] = {
"encoder_first_pass": pass1_measurements,
"encoder_second_pass": pass2_measurements,
"encoder": combined_measurements
}
# Move the file size calculation to after second pass execution
_, obtained_bpp = self.compute_bytes_and_bpp(
encoded_filename, raw_lightfield
)
encoded_lightfield = EncodedLightField(
raw_lightfield.copy(),
encoded_path=encoded_filename,
target_bitrate=bpp,
actual_bitrate=obtained_bpp,
encoder_name="x265",
)
results.append(encoded_lightfield)
self.results[lf_name]["target_bpps"][bpp]["encoded"] = {
"encoded_lf": encoded_lightfield,
"stats_filename": stats_filename,
}
return results
def _run_first_pass(
self,
bpp: float,
raw_lightfield: RAWLightFieldData,
stats_filename: Union[str, Path],
log_filename: Optional[Path] = None
) -> ExecutionMeasurements:
"""Runs the x265 first pass to collect statistics for rate control.
:param bpp: Target bits per pixel value
:type bpp: float
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:param stats_filename: Path to the stats file for two-pass encoding
:type stats_filename: Union[str, Path]
:param log_filename: Path to the log file, defaults to None
:type log_filename: Path, optional
:return: Execution measurements for the first pass
:rtype: ExecutionMeasurements
"""
command = X265_FIRST_PASS_COMMAND_TEMPLATE.format(
x265_path=(self.codec_path / "build"),
yuv_filename=raw_lightfield.raw_path,
bits_per_sample=10,
input_format="i444",
fps=self.fps,
view_width=raw_lightfield.view_width,
view_height=raw_lightfield.view_height,
profile="main444-10",
tune_metric=self.tune_metric,
stats_file=stats_filename,
).split()
execution_measurements = self.execute_command(bpp=bpp,
command=command,
cwd=(self.codec_path / "build"),
log_filename=log_filename)
return execution_measurements
def _run_second_pass(
self,
raw_lightfield: RAWLightFieldData,
encoded_lightfield: EncodedLightField,
stats_filename: Union[str, Path],
bpp: float,
bitrate_kbps: float,
log_filename: Optional[Path] = None
) -> ExecutionMeasurements:
"""Runs the x265 second pass to produce the encoded output using statistics from the first pass.
:param raw_lightfield: The raw light field data
:type raw_lightfield: RAWLightFieldData
:param encoded_lightfield: The encoded light field (may be None during encoding)
:type encoded_lightfield: EncodedLightField
:param stats_filename: Path to the stats file from the first pass
:type stats_filename: Union[str, Path]
:param bpp: Target bits per pixel value
:type bpp: float
:param bitrate_kbps: Target bitrate in kilobits per second
:type bitrate_kbps: float
:param log_filename: Path to the log file, defaults to None
:type log_filename: Path, optional
:return: Execution measurements for the second pass
:rtype: ExecutionMeasurements
"""
encoded_filename = get_formatted_filename_for_lf(
path=self.get_encoded_path(raw_lightfield=raw_lightfield),
lightfield=raw_lightfield,
bpp=bpp,
file_extension="h265",
)
csv_filename = get_formatted_filename_for_lf(
path=self.get_csv_path(raw_lightfield=raw_lightfield),
lightfield=raw_lightfield,
bpp=bpp,
file_extension="csv",
)
csv_filename.unlink(missing_ok=True)
assert raw_lightfield.bits_per_sample == 10
command = X265_SECOND_PASS_COMMAND_TEMPLATE.format(
x265_path=(self.codec_path / "build"),
yuv_filename=raw_lightfield.raw_path,
bits_per_sample=raw_lightfield.bits_per_sample,
input_format="i444",
fps=self.fps,
view_width=raw_lightfield.view_width,
view_height=raw_lightfield.view_height,
profile="main444-10",
tune_metric=self.tune_metric,
stats_file=stats_filename,
bitrate=bitrate_kbps,
encoded_filename=encoded_filename,
csv_filename=csv_filename,
).split()
executuion_measurements = self.execute_command(bpp=bpp,
lightfield=encoded_lightfield,
command=command,
cwd=(self.codec_path / "build"),
log_filename=log_filename)
return executuion_measurements
[docs]
def combine_measurements(self, pass1_measurements: ExecutionMeasurements, pass2_measurements: ExecutionMeasurements) -> ExecutionMeasurements:
"""Combines first-pass and second-pass measurements into a single ExecutionMeasurements object.
:param pass1_measurements: Execution measurements from the first pass
:type pass1_measurements: ExecutionMeasurements
:param pass2_measurements: Execution measurements from the second pass
:type pass2_measurements: ExecutionMeasurements
:return: Combined execution measurements with summed durations
:rtype: ExecutionMeasurements
"""
# Verify both passes have the same number of runs
if len(pass1_measurements.runs) != len(pass2_measurements.runs):
raise ValueError("First and second pass run lists have different lengths.")
combined_runs = []
# Combine corresponding runs from both passes
for run1, run2 in zip(pass1_measurements.runs, pass2_measurements.runs):
new_run = RunProfile(
duration=run1.duration + run2.duration,
start_time=run1.start_time, # Using first pass start time
end_time=run2.end_time, # Using second pass end time
success=run1.success and run2.success
)
combined_runs.append(new_run)
# Calculate metrics for the combined runs
combined = get_metrics_from_runs(combined_runs)
return combined
[docs]
def decode(self, encoded_lightfield: EncodedLightField) -> RAW_Decoded_BT709_FR_YUV444p10le_LightField_Data:
"""Decodes an encoded light field using ffmpeg to decompress HEVC to YUV.
:param encoded_lightfield: The encoded light field to decode
:type encoded_lightfield: EncodedLightField
:return: Decoded raw light field data in YUV format
:rtype: RAWLightFieldData
"""
decoded_path = self.get_decoded_path(encoded_lightfield) / "yuv"
decoded_path.mkdir(parents=True, exist_ok=True)
lf_name = encoded_lightfield.name
bpp = encoded_lightfield.target_bitrate
self.results.setdefault(lf_name, {"target_bpps": dict()})
self.results[lf_name]["target_bpps"].setdefault(bpp, dict())
log_filename = get_formatted_filename_for_lf(
path=self.get_logs_path(raw_lightfield=encoded_lightfield),
lightfield=encoded_lightfield,
bpp=bpp,
file_extension="txt",
)
encoded_filename = encoded_lightfield.encoded_path
output_yuv_filename = get_formatted_filename_for_lf(
decoded_path,
lightfield=encoded_lightfield,
bpp=encoded_lightfield.target_bitrate,
file_extension="yuv",
)
output_decoded_raw_lf = RAW_Decoded_BT709_FR_YUV444p10le_LightField_Data(
lightfield=encoded_lightfield.copy(),
decoded_filename=output_yuv_filename,
bpp_for_naming=encoded_lightfield.target_bitrate,
)
command = FFMPEG_HEVC_DECODE_TEMPLATE.format(
input_h265=encoded_filename,
output_filename=output_yuv_filename,
).split()
execution_measurements = self.execute_command(bpp=bpp,
lightfield=output_decoded_raw_lf,
command=command,
log_filename=log_filename)
self.results[lf_name]["target_bpps"][bpp]["decoded"] = output_decoded_raw_lf
self.results[lf_name]["target_bpps"][bpp]["profile_decoder"] = {
"decoder": execution_measurements
}
return output_decoded_raw_lf