Source code for src.codecrepos.git_repository_cloner_and_builder

import multiprocessing
import os
import re
import subprocess
import warnings
from dataclasses import dataclass, field
from typing import List, Optional

import git
import numpy as np
from git import GitCommandError
from tqdm import tqdm

from .clone_progress import CloneProgress
from .docker_builder import DockerBuilder


@dataclass
class GitRepositoryClonerAndBuilder:
    """Clone a git repository at a given tag or branch and build it.

    The build runs inside Docker when ``use_docker_to_build`` is set and
    Docker can be used; otherwise it runs directly on the host inside
    ``<local_path>/<build_path>``.

    :param repository_address: URL (or path) passed to ``git clone``
    :param tag: Tag or branch name to check out
    :param local_path: Directory of the local working copy
    :param build_tool: Either ``"cmake"`` or ``"make"``
    :param cmake_path: Source directory handed to ``cmake``, relative to the build directory
    :param build_path: Build directory, relative to ``local_path``
    :param build_options: Extra arguments for the configure (cmake) or ``make`` command
    :param use_docker_to_build: Try to build inside Docker first
    :param gcc_version_from_build: Set by :meth:`build` to the compiler version that was used
    :param commit_hash: Set by :meth:`clone_and_checkout_tag` to the checked-out commit
    """

    repository_address: str = ""
    tag: str = "master"
    local_path: str = ""
    build_tool: str = "cmake"
    cmake_path: str = ".."
    build_path: str = "build"
    build_options: List[str] = field(default_factory=list)
    use_docker_to_build: bool = True
    gcc_version_from_build: str = "Undefined"
    commit_hash: str = ""

    # The attributes below are deliberately unannotated so that ``@dataclass``
    # treats them as plain class constants rather than as fields.
    _DOCKER_FALLBACK_WARNING = (
        "Problem running Docker for building the codecs. \n"
        "Results may differ and not be valid for cross-checking according to the CTC."
    )
    # Exit statuses treated as "Docker itself could not run the build".
    _DOCKER_LAUNCH_FAILURE_CODES = (125, 127)
    # Matches progress lines such as "[ 42%] Building CXX object dir/file.o".
    _PROGRESS_PATTERN = re.compile(
        r"\[\s*(\d+)%\]\s+(Building|Linking|Built)(\sCXX\sobject\s.*/|\s.*library\s|\starget\s)(.*$)"
    )
    _ANSI_ESCAPE_PATTERN = re.compile(r"\x1B\[[0-?9;]*[mK]")

    def clone_and_checkout_tag(self) -> None:
        """Clone the repository if needed and check out the configured tag.

        After the checkout, ``commit_hash`` is set to the SHA of ``HEAD``.

        :raises AssertionError: if, after the checkout, ``HEAD`` is neither
            exactly on the tag ``self.tag`` nor contained in a branch with
            that name
        :return: None
        :rtype: None
        """
        if os.path.isdir(os.path.join(self.local_path, ".git")):
            print("Repository already exists. Opening the repository...")
            repo = git.Repo(self.local_path)
        else:
            print("Repository does not exist. Cloning...")
            repo = git.Repo.clone_from(
                self.repository_address, self.local_path, progress=CloneProgress()
            )
            print("Done")

        repo.git.checkout(self.tag)

        try:
            described_tag = repo.git.describe("--tags", "--exact-match")
        except GitCommandError:
            # HEAD is not exactly on any tag.
            described_tag = None

        if described_tag != self.tag:
            # ``self.tag`` may name a branch. Several branches can contain
            # HEAD (one per output line), so test membership instead of
            # comparing against the whole output.
            branch_listing = repo.git.branch(
                "--contains", "HEAD", "--format=%(refname:short)"
            )
            branches = [name.strip() for name in branch_listing.splitlines()]
            if self.tag not in branches:
                # Raised explicitly (not via ``assert``) so the check survives
                # ``python -O`` while keeping the exception type callers saw.
                raise AssertionError(
                    f"Checked out '{self.tag}' but HEAD is at tag "
                    f"{described_tag!r} and in branches {branches!r}"
                )

        self.commit_hash = repo.head.commit.hexsha

    def build(self, num_cores: Optional[float] = None) -> None:
        """Build the codec using the configured build tool (cmake or make).

        When Docker is requested but cannot be prepared, or the Docker command
        exits with status 125 or 127, a ``RuntimeWarning`` is issued and the
        build is run on the host instead. ``gcc_version_from_build`` always
        reflects the compiler of the environment that performed the build.

        :param num_cores: Number of CPU cores for parallel build, defaults to
            all available; fractional values are rounded down (minimum 1)
        :type num_cores: Optional[float]
        :raises ValueError: if ``build_tool`` is neither ``"cmake"`` nor ``"make"``
        :raises SystemExit: with code 1 if a build command fails
        :return: None
        :rtype: None
        """
        jobs = self._resolve_num_cores(num_cores)
        # Validate the build tool before touching the file system.
        host_commands = self._host_build_commands(jobs)

        build_dir = os.path.join(self.local_path, self.build_path)
        os.makedirs(build_dir, exist_ok=True)

        commands = host_commands
        popen_kwargs = {}
        use_docker = self.use_docker_to_build

        if use_docker:
            try:
                # Inside the container everything runs as one shell line that
                # starts in the repository root, hence the leading ``cd``.
                shell_steps = [f"cd {self.build_path}".split(), *host_commands]
                shell_line = " && ".join(" ".join(step) for step in shell_steps)
                docker_builder = DockerBuilder()
                self.gcc_version_from_build = docker_builder.get_gcc_version()
                commands = [
                    docker_builder.get_docker_build_command(
                        repository_path=self.local_path, build_command=shell_line
                    )
                ]
            except Exception:
                warnings.warn(
                    self._DOCKER_FALLBACK_WARNING, RuntimeWarning, stacklevel=2
                )
                use_docker = False
                commands = host_commands

        if not use_docker:
            popen_kwargs = {"cwd": build_dir}
            self.gcc_version_from_build = self._host_gcc_version()

        print(commands)

        try:
            try:
                self._run_all(commands, popen_kwargs)
            except subprocess.CalledProcessError as error:
                docker_launch_failed = (
                    use_docker
                    and error.returncode in self._DOCKER_LAUNCH_FAILURE_CODES
                )
                if not docker_launch_failed:
                    raise
                warnings.warn(
                    self._DOCKER_FALLBACK_WARNING, RuntimeWarning, stacklevel=2
                )
                # The host compiler is the one actually used from here on.
                self.gcc_version_from_build = self._host_gcc_version()
                self._run_all(host_commands, {"cwd": build_dir})
        except subprocess.CalledProcessError as e:
            print("Error ocurred while building:")
            print(e)
            print("Stdout: ")
            print(e.stdout)
            print("Stderr:")
            print(e.stderr)
            print("Return code:")
            print(e.returncode)
            print("Output:")
            print(e.output)
            print("Finished error messages")
            raise SystemExit(1)

    @staticmethod
    def _resolve_num_cores(num_cores: Optional[float]) -> int:
        """Return the ``-j`` job count as a positive built-in ``int``."""
        if not num_cores:
            return multiprocessing.cpu_count()
        # np.floor returns a float (which would render as "-j4.0"), so convert
        # explicitly; clamp so that values below 1 still give a valid count.
        return max(1, int(np.floor(num_cores)))

    def _host_build_commands(self, jobs: int) -> List[List[str]]:
        """Return the argv lists that build the project from the build directory."""
        if self.build_tool == "cmake":
            configure = ["cmake", *self.build_options, self.cmake_path]
            compile_step = ["cmake", "--build", ".", f"-j{jobs}"]
            return [configure, compile_step]
        if self.build_tool == "make":
            return [["make", *self.build_options, f"-j{jobs}"]]
        raise ValueError(
            f"Unsupported build_tool '{self.build_tool}'; expected 'cmake' or 'make'"
        )

    @staticmethod
    def _host_gcc_version() -> str:
        """Return the first line printed by ``gcc --version`` on the host."""
        res = subprocess.run(
            ["gcc", "--version"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        return res.stdout.split("\n")[0]

    def _run_all(self, commands, popen_kwargs) -> None:
        """Run each command in order, raising ``CalledProcessError`` on the first failure."""
        for command in commands:
            self._run_with_progress(command, popen_kwargs)

    def _run_with_progress(self, command, popen_kwargs) -> None:
        """Run one command, mirroring its reported build percentage in a tqdm bar."""
        print(f"Running command '{command}'")
        with tqdm(total=100, unit="%", leave=True) as progress_bar:
            # The context manager closes the pipe even if parsing raises.
            with subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1,
                **popen_kwargs,
            ) as process:
                for line in process.stdout:
                    line = self._ANSI_ESCAPE_PATTERN.sub("", line)
                    match = self._PROGRESS_PATTERN.search(line.strip())
                    if match:
                        # Set the absolute position rather than incrementing.
                        progress_bar.n = int(match.group(1))
                        progress_bar.refresh()
                        action = match.group(2)
                        target = match.group(4)
                        progress_bar.set_postfix_str(f"{action} {target}")
                process.wait()
            if process.returncode != 0:
                raise subprocess.CalledProcessError(process.returncode, process.args)
            progress_bar.set_postfix_str("Build completed!")