From d21f41448f3fa414e60799d99e2d14b5a18aaf28 Mon Sep 17 00:00:00 2001 From: VAN BOSSUYT Nicolas Date: Sun, 26 Jun 2022 00:22:53 +0200 Subject: [PATCH] Initial commit. --- .mypyconfig | 8 ++ __main__.py | 194 +++++++++++++++++++++++++++++++++++++++++ build.py | 117 +++++++++++++++++++++++++ environments.py | 115 +++++++++++++++++++++++++ manifests.py | 166 +++++++++++++++++++++++++++++++++++ ninja.py | 220 +++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + utils.py | 189 ++++++++++++++++++++++++++++++++++++++++ 8 files changed, 1010 insertions(+) create mode 100644 .mypyconfig create mode 100755 __main__.py create mode 100644 build.py create mode 100644 environments.py create mode 100644 manifests.py create mode 100644 ninja.py create mode 100644 requirements.txt create mode 100644 utils.py diff --git a/.mypyconfig b/.mypyconfig new file mode 100644 index 0000000..d521d00 --- /dev/null +++ b/.mypyconfig @@ -0,0 +1,8 @@ +[mypy] +disallow_untyped_defs = True +disallow_any_unimported = True +no_implicit_optional = True +check_untyped_defs = True +warn_return_any = True +show_error_codes = True +warn_unused_ignores = True diff --git a/__main__.py b/__main__.py new file mode 100755 index 0000000..e877f0c --- /dev/null +++ b/__main__.py @@ -0,0 +1,194 @@ +import shutil +import sys +import os +import random + + +import build +import utils +from utils import Colors +import environments + + +CMDS = {} + + +def parseOptions(args: list[str]) -> dict: + result = { + 'opts': {}, + 'args': [] + } + + for arg in args: + if arg.startswith("--"): + if "=" in arg: + key, value = arg[2:].split("=", 1) + result['opts'][key] = value + else: + result['opts'][arg[2:]] = True + else: + result['args'].append(arg) + + return result + + +def runCmd(opts: dict, args: list[str]) -> None: + if len(args) == 0: + print(f"Usage: {sys.argv[0]} run ") + sys.exit(1) + + out = build.buildOne(opts.get('env', 'host-clang'), args[0]) + + print(f"{Colors.BOLD}Running: {args[0]}{Colors.RESET}") + utils.runCmd(out, *args[1:]) + + +def kvmAvailable() -> bool: + if os.path.exists("/dev/kvm") and \ + os.access("/dev/kvm", os.R_OK): + return True + return False + + +BOOTAGENT = "loader" + + +def bootCmd(opts: dict, args: list[str]) -> None: + imageDir = utils.mkdirP(".build/image") + efiBootDir = utils.mkdirP(".build/image/EFI/BOOT") + bootDir = utils.mkdirP(".build/image/boot") + + ovmf = utils.downloadFile( + "https://retrage.github.io/edk2-nightly/bin/DEBUGX64_OVMF.fd") + hjert = build.buildOne("kernel-x86_64", "hjert") + shutil.copy(hjert, f"{bootDir}/kernel.elf") + + if BOOTAGENT == "loader": + loader = build.buildOne("efi-x86_64", "loader") + shutil.copy(loader, f"{efiBootDir}/BOOTX64.EFI") + elif BOOTAGENT == "limine": + limine = utils.downloadFile( + "https://github.com/limine-bootloader/limine/raw/v3.0-branch-binary/BOOTX64.EFI") + limineSys = utils.downloadFile( + "https://github.com/limine-bootloader/limine/raw/v3.0-branch-binary/limine.sys") + shutil.copy(limineSys, f"{bootDir}/limine.sys") + shutil.copy('meta/images/limine-x86_64/limine.cfg', + f"{bootDir}/limine.cfg") + shutil.copy(limine, f"{efiBootDir}/BOOTX64.EFI") + + qemuCmd = [ + "qemu-system-x86_64", + "-no-reboot", + "-d", "guest_errors", + "-serial", "mon:stdio", + "-bios", ovmf, + "-m", "256M", + "-smp", "4", + "-drive", f"file=fat:rw:{imageDir},media=disk,format=raw", + ] + + if kvmAvailable(): + qemuCmd += ["-enable-kvm"] + else: + print("KVM not available, using QEMU-TCG") + + utils.runCmd(*qemuCmd) + + +def buildCmd(opts: dict, args: list[str]) -> None: + env = opts.get('env', 'host-clang') + + if len(args) == 0: + build.buildAll(env) + else: + for component in args: + build.buildOne(env, component) + + +def cleanCmd(opts: dict, args: list[str]) -> None: + shutil.rmtree(".build", ignore_errors=True) + + +def nukeCmd(opts: dict, args: list[str]) -> None: + shutil.rmtree(".build", ignore_errors=True) + shutil.rmtree(".cache", ignore_errors=True) + + +def idCmd(opts: dict, args: list[str]) -> None: + i = hex(random.randint(0, 2**64)) + print("64bit: " + i) + print("32bit: " + i[:10]) + + +def helpCmd(opts: dict, args: list[str]) -> None: + print(f"Usage: {sys.argv[0]} [options...] []") + print("") + + print("Description:") + print(" The skift operating system build system.") + print("") + + print("Commands:") + for cmd in CMDS: + print(" " + cmd + " - " + CMDS[cmd]["desc"]) + print("") + + print("Enviroments:") + for env in environments.available(): + print(" " + env) + print("") + + print("Variants:") + for var in environments.VARIANTS: + print(" " + var) + print("") + + +CMDS = { + "run": { + "func": runCmd, + "desc": "Run a component on the host", + }, + "boot": { + "func": bootCmd, + "desc": "Boot a component in a QEMU instance", + }, + "build": { + "func": buildCmd, + "desc": "Build one or more components", + }, + "clean": { + "func": cleanCmd, + "desc": "Clean the build directory", + }, + "nuke": { + "func": nukeCmd, + "desc": "Clean the build directory and cache", + }, + "id": { + "func": idCmd, + "desc": "Generate a 64bit random id", + }, + "help": { + "func": helpCmd, + "desc": "Show this help message", + }, +} + +if __name__ == "__main__": + try: + if len(sys.argv) < 2: + helpCmd({}, []) + else: + o = parseOptions(sys.argv[2:]) + if not sys.argv[1] in CMDS: + print(f"Unknown command: {sys.argv[1]}") + print("") + print(f"Use '{sys.argv[0]} help' for a list of commands") + sys.exit(1) + CMDS[sys.argv[1]]["func"](o['opts'], o['args']) + sys.exit(0) + except utils.CliException as e: + print() + print(f"{Colors.RED}{e.msg}{Colors.RESET}") + sys.exit(1) diff --git a/build.py b/build.py new file mode 100644 index 0000000..f9d20e5 --- /dev/null +++ b/build.py @@ -0,0 +1,117 @@ +from os import environ +from typing import TextIO, Tuple +import json + +import ninja +import manifests as m +import environments as e +import copy +import utils +from utils import Colors + + +def genNinja(out: TextIO, manifests: dict, env: dict) -> None: + env = copy.deepcopy(env) + + env["cflags"] += [m.cincludes(manifests)] + env["cxxflags"] += [m.cincludes(manifests)] + + writer = ninja.Writer(out) + + writer.comment("Generated by the meta build system") + writer.newline() + + writer.comment("Environment:") + for key in env: + if isinstance(env[key], list): + writer.variable(key, " ".join(env[key])) + else: + writer.variable(key, env[key]) + writer.newline() + + writer.comment("Rules:") + writer.rule( + "cc", "$cc -c -o $out $in -MD -MF $out.d $cflags", depfile="$out.d") + writer.rule( + "cxx", "$cxx -c -o $out $in -MD -MF $out.d $cxxflags", depfile="$out.d") + writer.rule("ld", "$ld -o $out $in $ldflags") + writer.rule("ar", "$ar crs $out $in") + writer.rule("as", "$as -o $out $in $asflags") + writer.newline() + + writer.comment("Build:") + all = [] + for key in manifests: + item = manifests[key] + + writer.comment("Project: " + item["id"]) + + for obj in item["objs"]: + if obj[1].endswith(".c"): + writer.build(obj[0], "cc", obj[1]) + elif obj[1].endswith(".cpp"): + writer.build(obj[0], "cxx", obj[1]) + elif obj[1].endswith(".s"): + writer.build(obj[0], "as", obj[1]) + + writer.newline() + + objs = [x[0] for x in item["objs"]] + + if item["type"] == "lib": + writer.build(item["out"], "ar", objs) + else: + objs = objs + item["libs"] + writer.build(item["out"], "ld", objs) + + all.append(item["out"]) + + writer.newline() + + writer.comment("Phony:") + writer.build("all", "phony", all) + + +def prepare(envName: str) -> Tuple[dict, dict]: + env = e.load(envName) + manifests = m.loadAll("./src", env) + utils.mkdirP(env["dir"]) + genNinja(open(env["ninjafile"], "w"), manifests, env) + + meta = {} + meta["id"] = envName + meta["type"] = "artifacts" + meta["components"] = manifests + meta["toolchain"] = env + + with open(env["dir"] + "/manifest.json", "w") as f: + json.dump(meta, f, indent=4) + + return env, manifests + + +def buildAll(envName: str) -> None: + environment, _ = prepare(envName) + print(f"{Colors.BOLD}Building all targets for {envName}{Colors.RESET}") + try: + utils.runCmd("ninja", "-j", "1", "-f", environment["ninjafile"]) + except: + raise utils.CliException( + "Failed to build all for " + environment["key"]) + + +def buildOne(envName: str, target: str) -> str: + print(f"{Colors.BOLD}Building {target} for {envName}{Colors.RESET}") + environment, manifests = prepare(envName) + + if not target in manifests: + raise utils.CliException("Unknown target: " + target) + + try: + utils.runCmd("ninja", "-j", "1", "-f", + environment["ninjafile"], manifests[target]["out"]) + except: + raise utils.CliException( + f"Failed to build {target} for {environment['key']}") + + return manifests[target]["out"] diff --git a/environments.py b/environments.py new file mode 100644 index 0000000..60c7077 --- /dev/null +++ b/environments.py @@ -0,0 +1,115 @@ + +import copy +import os + +import utils + + +PASSED_TO_BUILD = [ + "toolchain", "arch", "sub", "vendor", "sys", "abi", "encoding", "freestanding", "variant"] + + +def enableCache(env: dict) -> dict: + env = copy.deepcopy(env) + env["cc"] = "ccache " + env["cc"] + env["cxx"] = "ccache " + env["cxx"] + return env + + +def enableSan(env: dict) -> dict: + if (env["freestanding"]): + return env + env = copy.deepcopy(env) + env["cflags"] += ["-fsanitize=address", "-fsanitize=undefined"] + env["cxxflags"] += ["-fsanitize=address", "-fsanitize=undefined"] + env["ldflags"] += ["-fsanitize=address", "-fsanitize=undefined"] + return env + + +def enableColors(env: dict) -> dict: + env = copy.deepcopy(env) + if (env["toolchain"] == "clang"): + env["cflags"] += ["-fcolor-diagnostics"] + env["cxxflags"] += ["-fcolor-diagnostics"] + elif (env["toolchain"] == "gcc"): + env["cflags"] += ["-fdiagnostics-color=alaways"] + env["cxxflags"] += ["-fdiagnostics-color=always"] + + return env + + +def enableOptimizer(env: dict, level: str) -> dict: + env = copy.deepcopy(env) + env["cflags"] += ["-O%s" % level] + env["cxxflags"] += ["-O%s" % level] + return env + + +def available() -> list: + return [file.removesuffix(".json") for file in os.listdir("meta/toolchains") if file.endswith(".json")] + + +VARIANTS = ["debug", "devel", "release", "sanatize"] + + +def load(env: str) -> dict: + variant = "devel" + if ":" in env: + env, variant = env.split(":") + + if not env in available(): + raise utils.CliException(f"Environment '{env}' not available") + + if not variant in VARIANTS: + raise utils.CliException(f"Variant '{variant}' not available") + + result = utils.loadJson(f"meta/toolchains/{env}.json") + result["variant"] = variant + + for key in PASSED_TO_BUILD: + if isinstance(result[key], bool): + if result[key]: + result["cflags"] += [f"-D__sdk_{key}__"] + result["cxxflags"] += [f"-D__sdk_{key}__"] + else: + result["cflags"] += [f"-D__sdk_{key}_{result[key]}__"] + result["cxxflags"] += [f"-D__sdk_{key}_{result[key]}__"] + + result["cflags"] += [ + "-std=gnu2x", + "-Isrc", + "-Wall", + "-Wextra", + "-Werror" + ] + + result["cxxflags"] += [ + "-std=gnu++2b", + "-Isrc", + "-Wall", + "-Wextra", + "-Werror", + "-fno-exceptions", + "-fno-rtti" + ] + + result["hash"] = utils.objSha256(result, PASSED_TO_BUILD) + result["key"] = utils.objKey(result, PASSED_TO_BUILD) + result["dir"] = f".build/{result['hash'][:8]}" + result["bindir"] = f"{result['dir']}/bin" + result["objdir"] = f"{result['dir']}/obj" + result["ninjafile"] = result["dir"] + "/build.ninja" + + result = enableColors(result) + + if variant == "debug": + result = enableOptimizer(result, "g") + elif variant == "devel": + result = enableOptimizer(result, "2") + elif variant == "release": + result = enableOptimizer(result, "3") + elif variant == "sanatize": + result = enableOptimizer(result, "g") + result = enableSan(result) + + return result diff --git a/manifests.py b/manifests.py new file mode 100644 index 0000000..f9d5902 --- /dev/null +++ b/manifests.py @@ -0,0 +1,166 @@ +import os +import json +import copy + +import utils + + +def loadJsons(basedir: str) -> dict: + result = {} + for root, dirs, files in os.walk(basedir): + for filename in files: + if filename == 'manifest.json': + filename = os.path.join(root, filename) + try: + with open(filename) as f: + manifest = json.load(f) + manifest["dir"] = os.path.dirname(filename) + result[manifest["id"]] = manifest + except Exception as e: + raise utils.CliException( + f"Failed to load manifest {filename}: {e}") + + return result + + +def filter(manifests: dict, env: dict) -> dict: + result = {} + for id in manifests: + manifest = manifests[id] + accepted = True + + if "requires" in manifest: + for req in manifest["requires"]: + if not env[req] in manifest["requires"][req]: + accepted = False + break + + if accepted: + result[id] = manifest + + return result + + +def doInjects(manifests: dict) -> dict: + manifests = copy.deepcopy(manifests) + for key in manifests: + item = manifests[key] + if "inject" in item: + for inject in item["inject"]: + if inject in manifests: + manifests[inject]["deps"].append(key) + return manifests + + +def resolveDeps(manifests: dict) -> dict: + manifests = copy.deepcopy(manifests) + + def resolve(key: str, stack: list[str] = []) -> list[str]: + result: list[str] = [] + if key in stack: + raise utils.CliException("Circular dependency detected: " + + str(stack) + " -> " + key) + + if not key in manifests: + raise utils.CliException("Unknown dependency: " + key) + + if "deps" in manifests[key]: + stack.append(key) + result.extend(manifests[key]["deps"]) + for dep in manifests[key]["deps"]: + result += resolve(dep, stack) + stack.pop() + return result + + for key in manifests: + manifests[key]["deps"] = utils.stripDups(resolve(key)) + + return manifests + + +def findFiles(manifests: dict) -> dict: + manifests = copy.deepcopy(manifests) + + for key in manifests: + item = manifests[key] + path = manifests[key]["dir"] + testsPath = os.path.join(path, "tests") + assetsPath = os.path.join(path, "assets") + + item["tests"] = utils.findFiles(testsPath, [".c", ".cpp"]) + item["srcs"] = utils.findFiles(path, [".c", ".cpp", ".s"]) + item["assets"] = utils.findFiles(assetsPath) + + return manifests + + +def prepareTests(manifests: dict) -> dict: + if not "tests" in manifests: + return manifests + manifests = copy.deepcopy(manifests) + tests = manifests["tests"] + + for key in manifests: + item = manifests[key] + if "tests" in item and len(item["tests"]) > 0: + tests["deps"] += [item["id"]] + tests["srcs"] += item["tests"] + + return manifests + + +def prepareInOut(manifests: dict, env: dict) -> dict: + manifests = copy.deepcopy(manifests) + for key in manifests: + item = manifests[key] + basedir = os.path.dirname(item["dir"]) + + item["objs"] = [(x.replace(basedir, env["objdir"] + "/") + ".o", x) + for x in item["srcs"]] + + if item["type"] == "lib": + item["out"] = env["bindir"] + "/" + key + ".a" + elif item["type"] == "exe": + item["out"] = env["bindir"] + "/" + key + else: + raise utils.CliException("Unknown type: " + item["type"]) + + for key in manifests: + item = manifests[key] + item["libs"] = [manifests[x]["out"] + for x in item["deps"] if manifests[x]["type"] == "lib"] + return manifests + + +def cincludes(manifests: dict) -> str: + include_paths = [] + + for key in manifests: + item = manifests[key] + if "root-include" in item: + include_paths.append(item["dir"]) + + if len(include_paths) == 0: + return "" + + return " -I" + " -I".join(include_paths) + + +cache: dict = {} + + +def loadAll(basedir: str, env: dict) -> dict: + cacheKey = basedir + ":" + env["id"] + if cacheKey in cache: + return cache[cacheKey] + + manifests = loadJsons(basedir) + manifests = filter(manifests, env) + manifests = doInjects(manifests) + manifests = resolveDeps(manifests) + manifests = findFiles(manifests) + manifests = prepareTests(manifests) + manifests = prepareInOut(manifests, env) + + cache[cacheKey] = manifests + return manifests diff --git a/ninja.py b/ninja.py new file mode 100644 index 0000000..c38f42f --- /dev/null +++ b/ninja.py @@ -0,0 +1,220 @@ +#!/usr/bin/python + +# Copyright 2011 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Python module for generating .ninja files. + +Note that this is emphatically not a required piece of Ninja; it's +just a helpful utility for build-file-generation systems that already +use Python. +""" + +import re +import textwrap +from typing import Any, TextIO, Union + + +def escape_path(word: str) -> str: + return word.replace('$ ', '$$ ').replace(' ', '$ ').replace(':', '$:') + + +VarValue = Union[int, str, list[str], None] +VarPath = Union[str, list[str], None] + + +class Writer(object): + def __init__(self, output: TextIO, width: int = 78): + self.output = output + self.width = width + + def newline(self) -> None: + self.output.write('\n') + + def comment(self, text: str) -> None: + for line in textwrap.wrap(text, self.width - 2, break_long_words=False, + break_on_hyphens=False): + self.output.write('# ' + line + '\n') + + def variable(self, key: str, value: VarValue, indent: int = 0) -> None: + if value is None: + return + if isinstance(value, list): + value = ' '.join(filter(None, value)) # Filter out empty strings. + self._line('%s = %s' % (key, value), indent) + + def pool(self, name: str, depth: int) -> None: + self._line('pool %s' % name) + self.variable('depth', depth, indent=1) + + def rule(self, + name: str, + command: VarValue, + description: Union[str, None] = None, + depfile: VarValue = None, + generator: VarValue = False, + pool: VarValue = None, + restat: bool = False, + rspfile: VarValue = None, + rspfile_content: VarValue = None, + deps: VarValue = None) -> None: + self._line('rule %s' % name) + self.variable('command', command, indent=1) + if description: + self.variable('description', description, indent=1) + if depfile: + self.variable('depfile', depfile, indent=1) + if generator: + self.variable('generator', '1', indent=1) + if pool: + self.variable('pool', pool, indent=1) + if restat: + self.variable('restat', '1', indent=1) + if rspfile: + self.variable('rspfile', rspfile, indent=1) + if rspfile_content: + self.variable('rspfile_content', rspfile_content, indent=1) + if deps: + self.variable('deps', deps, indent=1) + + def build(self, + outputs: Union[str, list[str]], + rule: str, + inputs: Union[VarPath, None], + implicit: VarPath = None, + order_only: VarPath = None, + variables: Union[dict[str, str], None] = None, + implicit_outputs: VarPath = None, + pool: Union[str, None] = None, + dyndep: Union[str, None] = None) -> list[str]: + outputs = as_list(outputs) + out_outputs = [escape_path(x) for x in outputs] + all_inputs = [escape_path(x) for x in as_list(inputs)] + + if implicit: + implicit = [escape_path(x) for x in as_list(implicit)] + all_inputs.append('|') + all_inputs.extend(implicit) + if order_only: + order_only = [escape_path(x) for x in as_list(order_only)] + all_inputs.append('||') + all_inputs.extend(order_only) + if implicit_outputs: + implicit_outputs = [escape_path(x) + for x in as_list(implicit_outputs)] + out_outputs.append('|') + out_outputs.extend(implicit_outputs) + + self._line('build %s: %s' % (' '.join(out_outputs), + ' '.join([rule] + all_inputs))) + if pool is not None: + self._line(' pool = %s' % pool) + if dyndep is not None: + self._line(' dyndep = %s' % dyndep) + + if variables: + iterator = iter(variables.items()) + + for key, val in iterator: + self.variable(key, val, indent=1) + + return outputs + + def include(self, path: str) -> None: + self._line('include %s' % path) + + def subninja(self, path: str) -> None: + self._line('subninja %s' % path) + + def default(self, paths: VarPath) -> None: + self._line('default %s' % ' '.join(as_list(paths))) + + def _count_dollars_before_index(self, s: str, i: int) -> int: + """Returns the number of '$' characters right in front of s[i].""" + dollar_count = 0 + dollar_index = i - 1 + while dollar_index > 0 and s[dollar_index] == '$': + dollar_count += 1 + dollar_index -= 1 + return dollar_count + + def _line(self, text: str, indent: int = 0) -> None: + """Write 'text' word-wrapped at self.width characters.""" + leading_space = ' ' * indent + while len(leading_space) + len(text) > self.width: + # The text is too wide; wrap if possible. + + # Find the rightmost space that would obey our width constraint and + # that's not an escaped space. + available_space = self.width - len(leading_space) - len(' $') + space = available_space + while True: + space = text.rfind(' ', 0, space) + if (space < 0 or + self._count_dollars_before_index(text, space) % 2 == 0): + break + + if space < 0: + # No such space; just use the first unescaped space we can find. + space = available_space - 1 + while True: + space = text.find(' ', space + 1) + if (space < 0 or + self._count_dollars_before_index(text, space) % 2 == 0): + break + if space < 0: + # Give up on breaking. + break + + self.output.write(leading_space + text[0:space] + ' $\n') + text = text[space+1:] + + # Subsequent lines are continuations, so indent them. + leading_space = ' ' * (indent+2) + + self.output.write(leading_space + text + '\n') + + def close(self) -> None: + self.output.close() + + +def as_list(input: Any) -> list: + if input is None: + return [] + if isinstance(input, list): + return input + return [input] + + +def escape(string: str) -> str: + """Escape a string such that it can be embedded into a Ninja file without + further interpretation.""" + assert '\n' not in string, 'Ninja syntax does not allow newlines' + # We only have one special metacharacter: '$'. + return string.replace('$', '$$') + + +def expand(string: str, vars: dict[str, str], local_vars: dict[str, str] = {}) -> str: + """Expand a string containing $vars as Ninja would. + + Note: doesn't handle the full Ninja variable syntax, but it's enough + to make configure.py's use of it work. + """ + def exp(m: Any) -> Any: + var = m.group(1) + if var == '$': + return '$' + return local_vars.get(var, vars.get(var, '')) + return re.sub(r'\$(\$|\w*)', exp, string) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..20c295c --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +requests ~= 2.28.0 diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..7f632bd --- /dev/null +++ b/utils.py @@ -0,0 +1,189 @@ +from copy import copy +import errno +import os +import hashlib +import signal +import requests +import subprocess +import json +import copy +from types import SimpleNamespace + + +class Colors: + BLACK = "\033[0;30m" + RED = "\033[0;31m" + GREEN = "\033[0;32m" + BROWN = "\033[0;33m" + BLUE = "\033[0;34m" + PURPLE = "\033[0;35m" + CYAN = "\033[0;36m" + LIGHT_GRAY = "\033[0;37m" + DARK_GRAY = "\033[1;30m" + LIGHT_RED = "\033[1;31m" + LIGHT_GREEN = "\033[1;32m" + YELLOW = "\033[1;33m" + LIGHT_BLUE = "\033[1;34m" + LIGHT_PURPLE = "\033[1;35m" + LIGHT_CYAN = "\033[1;36m" + LIGHT_WHITE = "\033[1;37m" + BOLD = "\033[1m" + FAINT = "\033[2m" + ITALIC = "\033[3m" + UNDERLINE = "\033[4m" + BLINK = "\033[5m" + NEGATIVE = "\033[7m" + CROSSED = "\033[9m" + RESET = "\033[0m" + + +class CliException(Exception): + def __init__(self, msg: str): + self.msg = msg + + +def stripDups(l: list[str]) -> list[str]: + # Remove duplicates from a list + # by keeping only the last occurence + result: list[str] = [] + for item in l: + if item in result: + result.remove(item) + result.append(item) + return result + + +def findFiles(dir: str, exts: list[str] = []) -> list[str]: + if not os.path.isdir(dir): + return [] + + result: list[str] = [] + + for f in os.listdir(dir): + if len(exts) == 0: + result.append(f) + else: + for ext in exts: + if f.endswith(ext): + result.append(os.path.join(dir, f)) + break + + return result + + +def hashFile(filename: str) -> str: + with open(filename, "rb") as f: + return hashlib.sha256(f.read()).hexdigest() + + +def objSha256(obj: dict, keys: list[str] = []) -> str: + toHash = {} + if len(keys) == 0: + toHash = obj + else: + for key in keys: + if key in obj: + toHash[key] = obj[key] + + data = json.dumps(toHash, sort_keys=True) + return hashlib.sha256(data.encode("utf-8")).hexdigest() + + +def objKey(obj: dict, keys: list[str]) -> str: + toKey = [] + for key in keys: + if key in obj: + if isinstance(obj[key], bool): + if obj[key]: + toKey.append(key) + else: + toKey.append(obj[key]) + + return "-".join(toKey) + + +def mkdirP(path: str) -> str: + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise + return path + + +def downloadFile(url: str) -> str: + dest = ".cache/remote/" + hashlib.sha256(url.encode('utf-8')).hexdigest() + tmp = dest + ".tmp" + + if os.path.isfile(dest): + return dest + + print(f"Downloading {url} to {dest}") + + try: + r = requests.get(url, stream=True) + r.raise_for_status() + mkdirP(os.path.dirname(dest)) + with open(tmp, 'wb') as f: + for chunk in r.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + os.rename(tmp, dest) + return dest + except: + raise CliException(f"Failed to download {url}") + + +def runCmd(*args: str) -> bool: + try: + proc = subprocess.run(args) + except FileNotFoundError: + raise CliException(f"Failed to run {args[0]}: command not found") + + if proc.returncode == -signal.SIGSEGV: + raise CliException("Segmentation fault") + + if proc.returncode != 0: + raise CliException( + f"Failed to run {' '.join(args)}: process exited with code {proc.returncode}") + + return True + + +CACHE = {} + + +def processJson(e: any) -> any: + if isinstance(e, dict): + for k in e: + e[processJson(k)] = processJson(e[k]) + elif isinstance(e, list): + for i in range(len(e)): + e[i] = processJson(e[i]) + elif isinstance(e, str): + if e == "@sysname": + e = os.uname().sysname.lower() + elif e.startswith("@include("): + e = loadJson(e[9:-1]) + + return e + + +def loadJson(filename: str) -> dict: + result = {} + if filename in CACHE: + result = CACHE[filename] + else: + with open(filename) as f: + result = json.load(f) + + result["dir"] = os.path.dirname(filename) + result["json"] = filename + result = processJson(result) + CACHE[filename] = result + + result = copy.deepcopy(result) + return result