Compare commits

...

19 commits

Author SHA1 Message Date
Jordan ⌨️ e54f8f5964 ref: move to pathlib 2023-11-13 21:25:48 +01:00
Jordan ⌨️ 8f59111ad7 fix: check plugin if script is in same directory as __init__.py 2023-11-13 20:37:41 +01:00
Sleepy Monax 01f0868db0 Fix naming for ci step 2023-11-13 19:35:26 +01:00
Sleepy Monax f6f36ea79e Refactored build context and dependecy resolution code 2023-11-13 19:21:54 +01:00
Sleepy Monax 3dbf269cdd Run tests in the CI 2023-11-13 19:19:59 +01:00
Sleepy Monax 68cae44750 Added missing requirement 2023-11-11 19:08:53 +01:00
Sleepy Monax a472abb90f Use dataclasses for the model 2023-11-11 19:05:41 +01:00
Sleepy Monax 39ee66364d Add ensure function to check cutekit version 2023-11-11 19:03:19 +01:00
Sleepy Monax 9dc1575a57 Update Python version and MyPy command. 2023-11-11 17:12:44 +01:00
Sleepy Monax 0a5ed0c444 Moved more stuff to the cli module. 2023-11-11 17:02:56 +01:00
Sleepy Monax 2307a72564 Add ci checks. 2023-11-11 17:02:42 +01:00
Sleepy Monax ce3728ccfb Cleanups imports and got ride of cutekit.project 2023-11-11 16:44:40 +01:00
Sleepy Monax c8b23bc6c1 Removed the "manifest" suffix from classes names in the model. 2023-11-11 16:19:14 +01:00
Sleepy Monax e77e787547 Update cutekit CLI to use new cli module 2023-11-11 16:17:33 +01:00
Jordan ⌨️ 31ca0b19e8 feat: new command line decorator 2023-11-11 13:40:14 +01:00
Jordan ⌨️ d842c6af2d fix: incorrect type comparaison lead to infinite loop when not in ck project 2023-11-11 11:43:42 +01:00
Sleepy Monax 920762f56d Added command decorator. 2023-11-10 13:29:32 +01:00
Sleepy Monax 97f9f0ddba Add _ prefix to logger 2023-11-10 11:31:44 +01:00
Sleepy Monax 9572c6a3df Fix project lookup on windows. 2023-10-31 14:22:07 +01:00
25 changed files with 1430 additions and 1360 deletions

34
.github/workflows/checks.yml vendored Normal file
View file

@ -0,0 +1,34 @@
name: Checks
on:
pull_request:
push:
branches:
- stable
- dev
jobs:
checks:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v3
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install -r .github/workflows/requirements.txt
- name: Run MyPy
run: |
python -m mypy --install-types --non-interactive .
- name: Run PyTest
run: |
python -m pytest

5
.github/workflows/requirements.txt vendored Normal file
View file

@ -0,0 +1,5 @@
requests ~= 2.28.0
graphviz ~= 0.20.1
dataclasses-json ~= 0.6.2
mypy ~= 1.7.0
pytest ~= 7.4.3

View file

@ -2,8 +2,36 @@ import sys
import os import os
import logging import logging
from cutekit import const, project, vt100, plugins, cmds from . import (
from cutekit.args import parse builder,
cli,
compat,
const,
graph,
jexpr,
mixins,
model,
ninja,
plugins,
rules,
shell,
utils,
vt100,
)
def ensure(version: tuple[int, int, int]):
if (
const.VERSION[0] == version[0]
and const.VERSION[1] == version[1]
and const.VERSION[2] >= version[2]
):
return
raise RuntimeError(
f"Expected cutekit version {version[0]}.{version[1]}.{version[2]} but found {const.VERSION_STR}"
)
def setupLogger(verbose: bool): def setupLogger(verbose: bool):
if verbose: if verbose:
@ -13,15 +41,14 @@ def setupLogger(verbose: bool):
datefmt="%Y-%m-%d %H:%M:%S", datefmt="%Y-%m-%d %H:%M:%S",
) )
else: else:
projectRoot = project.root() projectRoot = model.Project.root()
logFile = const.GLOBAL_LOG_FILE logFile = const.GLOBAL_LOG_FILE
if projectRoot is not None: if projectRoot is not None:
logFile = os.path.join(projectRoot, const.PROJECT_LOG_FILE) logfile = projectRoot / const.PROJECT_LOG_FILE
# create the directory if it doesn't exist # create the directory if it doesn't exist
logDir = os.path.dirname(logFile) if not logFile.parent.is_dir():
if not os.path.isdir(logDir): logFile.parent.mkdir(parents=True)
os.makedirs(logDir)
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -31,18 +58,19 @@ def setupLogger(verbose: bool):
datefmt="%Y-%m-%d %H:%M:%S", datefmt="%Y-%m-%d %H:%M:%S",
) )
def main() -> int: def main() -> int:
try: try:
a = parse(sys.argv[1:]) a = cli.parse(sys.argv[1:])
setupLogger(a.consumeOpt("verbose", False) is True) setupLogger(a.consumeOpt("verbose", False) is True)
plugins.loadAll() plugins.loadAll()
cmds.exec(a) cli.exec(a)
print() print()
return 0 return 0
except RuntimeError as e: except RuntimeError as e:
logging.exception(e) logging.exception(e)
cmds.error(str(e)) cli.error(str(e))
cmds.usage() cli.usage()
print() print()
return 1 return 1
except KeyboardInterrupt: except KeyboardInterrupt:

View file

@ -1,60 +0,0 @@
from typing import Optional, Union
Value = Union[str, bool, int]
class Args:
opts: dict[str, Value]
args: list[str]
def __init__(self):
self.opts = {}
self.args = []
def consumePrefix(self, prefix: str) -> dict[str, Value]:
result: dict[str, Value] = {}
copy = self.opts.copy()
for key, value in copy.items():
if key.startswith(prefix):
result[key[len(prefix) :]] = value
del self.opts[key]
return result
def consumeOpt(self, key: str, default: Value = False) -> Value:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return default
def tryConsumeOpt(self, key: str) -> Optional[Value]:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return None
def consumeArg(self, default: Optional[str] = None) -> Optional[str]:
if len(self.args) == 0:
return default
first = self.args[0]
del self.args[0]
return first
def parse(args: list[str]) -> Args:
result = Args()
for arg in args:
if arg.startswith("--"):
if "=" in arg:
key, value = arg[2:].split("=", 1)
result.opts[key] = value
else:
result.opts[arg[2:]] = True
else:
result.args.append(arg)
return result

View file

@ -1,171 +1,291 @@
import os import os
import logging import logging
from typing import TextIO from pathlib import Path
from dataclasses import dataclass
from itertools import chain
from typing import Generator, TextIO, Union, cast
from cutekit.model import Props from . import shell, rules, model, ninja, const, utils, cli
from cutekit.ninja import Writer
from cutekit.context import ComponentInstance, Context, contextFor
from cutekit import shell, rules
logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
def gen(out: TextIO, context: Context): def aggregateCincs(target: model.Target, registry: model.Registry) -> set[str]:
writer = Writer(out) res = set()
target = context.target for c in registry.iterEnabled(target):
if "cpp-root-include" in c.props:
res.add(c.dirname())
elif c.type == model.Kind.LIB:
res.add(str(Path(c.dirname()).parent))
writer.comment("File generated by the build system, do not edit") return set(map(lambda i: f"-I{i}", res))
writer.newline()
writer.variable("builddir", context.builddir())
writer.separator("Tools")
writer.variable("cincs", " ".join(map(lambda i: f"-I{i}", context.cincls()))) def aggregateCdefs(target: model.Target) -> set[str]:
res = set()
writer.variable("cdefs", " ".join(context.cdefs())) def sanatize(s: str) -> str:
return s.lower().replace(" ", "_").replace("-", "_").replace(".", "_")
writer.newline() for k, v in target.props.items():
if isinstance(v, bool):
if v:
res.add(f"-D__ck_{sanatize(k)}__")
else:
res.add(f"-D__ck_{sanatize(k)}_{sanatize(str(v))}__")
res.add(f"-D__ck_{sanatize(k)}_value={str(v)}")
writer.rule("cp", "cp $in $out") return res
writer.newline()
def buildpath(target: model.Target, component: model.Component, path) -> Path:
return Path(target.builddir) / component.id / path
# --- Compilation ------------------------------------------------------------ #
def listSrc(component: model.Component) -> list[str]:
wildcards = set(chain(*map(lambda rule: rule.fileIn, rules.rules.values())))
dirs = [component.dirname()] + list(
map(lambda d: component.parent / d, component.subdirs)
)
return shell.find(dirs, list(wildcards), recusive=False)
def compileSrc(
w: ninja.Writer, target: model.Target, component: model.Component
) -> list[str]:
res: list[str] = []
for src in listSrc(component):
rel = Path(src).relative_to(component.dirname())
r = rules.byFileIn(src)
if r is None:
raise RuntimeError(f"Unknown rule for file {src}")
dest = buildpath(target, component, "obj") / rel.with_suffix(r.fileOut[0][1:])
t = target.tools[r.id]
w.build(str(dest), r.id, inputs=src, order_only=t.files)
res.append(str(dest))
return res
# --- Ressources ------------------------------------------------------------- #
def listRes(component: model.Component) -> list[str]:
return shell.find(str(component.subpath("res")))
def compileRes(
w: ninja.Writer,
target: model.Target,
component: model.Component,
) -> list[str]:
res: list[str] = []
for r in listRes(component):
rel = Path(r).relative_to(component.subpath("res"))
dest = buildpath(target, component, "res") / rel
w.build(str(dest), "cp", r)
res.append(str(dest))
return res
# --- Linking ---------------------------------------------------------------- #
def outfile(target: model.Target, component: model.Component) -> str:
if component.type == model.Kind.LIB:
return str(buildpath(target, component, f"lib/{component.id}.a"))
else:
return str(buildpath(target, component, f"bin/{component.id}.out"))
def collectLibs(
registry: model.Registry, target: model.Target, component: model.Component
) -> list[str]:
res: list[str] = []
for r in component.resolved[target.id].resolved:
req = registry.lookup(r, model.Component)
assert req is not None # model.Resolver has already checked this
if r == component.id:
continue
if not req.type == model.Kind.LIB:
raise RuntimeError(f"Component {r} is not a library")
res.append(outfile(target, req))
return res
def link(
w: ninja.Writer,
registry: model.Registry,
target: model.Target,
component: model.Component,
) -> str:
w.newline()
out = outfile(target, component)
objs: list[str] = compileSrc(w, target, component)
res = compileRes(w, target, component)
libs = collectLibs(registry, target, component)
if component.type == model.Kind.LIB:
w.build(out, "ar", objs, implicit=res)
else:
w.build(out, "ld", objs + libs, implicit=res)
return out
# --- Phony ------------------------------------------------------------------ #
def all(w: ninja.Writer, registry: model.Registry, target: model.Target) -> list[str]:
all: list[str] = []
for c in registry.iterEnabled(target):
all.append(link(w, registry, target, c))
w.build("all", "phony", all)
w.default("all")
return all
def gen(out: TextIO, target: model.Target, registry: model.Registry):
w = ninja.Writer(out)
w.comment("File generated by the build system, do not edit")
w.newline()
w.variable("builddir", target.builddir)
w.variable("hashid", target.hashid)
w.separator("Tools")
w.variable("cincs", " ".join(aggregateCincs(target, registry)))
w.variable("cdefs", " ".join(aggregateCdefs(target)))
w.newline()
w.rule("cp", "cp $in $out")
for i in target.tools: for i in target.tools:
tool = target.tools[i] tool = target.tools[i]
rule = rules.rules[i] rule = rules.rules[i]
writer.variable(i, tool.cmd) w.variable(i, tool.cmd)
writer.variable(i + "flags", " ".join(rule.args + tool.args)) w.variable(i + "flags", " ".join(rule.args + tool.args))
writer.rule( w.rule(
i, i,
f"{tool.cmd} {rule.rule.replace('$flags',f'${i}flags')}", f"{tool.cmd} {rule.rule.replace('$flags',f'${i}flags')}",
depfile=rule.deps, depfile=rule.deps,
) )
writer.newline() w.newline()
writer.separator("Components") w.separator("Build")
all: list[str] = [] all(w, registry, target)
for instance in context.enabledInstances():
objects = instance.objsfiles()
assets = instance.resfiles()
writer.comment(f"Component: {instance.manifest.id}")
writer.comment(f"Resolved: {', '.join(instance.resolved)}")
for obj in objects: @dataclass
r = rules.byFileIn(obj[0]) class Product:
if r is None: path: Path
raise RuntimeError(f"Unknown rule for file {obj[0]}") target: model.Target
t = target.tools[r.id] component: model.Component
writer.build(obj[1], r.id, obj[0], order_only=t.files)
for asset in assets:
writer.build(asset[1], "cp", asset[0])
writer.newline() def build(
target: model.Target,
registry: model.Registry,
components: Union[list[model.Component], model.Component, None] = None,
) -> list[Product]:
all = False
target.builddir.mkdir(parents=True, exist_ok=True)
ninjaPath = target.builddir / "build.ninja"
if instance.isLib(): with ninjaPath.open("w") as f:
writer.build( gen(f, target, registry)
instance.outfile(),
"ar", if components is None:
list(map(lambda o: o[1], objects)), all = True
implicit=list(map(lambda o: o[1], assets)), components = list(registry.iterEnabled(target))
if isinstance(components, model.Component):
components = [components]
products: list[Product] = []
for c in components:
products.append(
Product(
path=Path(outfile(target, c)),
target=target,
component=c,
) )
else:
libraries: list[str] = []
for req in instance.resolved:
reqInstance = context.componentByName(req)
if reqInstance is None:
raise RuntimeError(f"Component {req} not found")
if not reqInstance.isLib():
raise RuntimeError(f"Component {req} is not a library")
libraries.append(reqInstance.outfile())
writer.build(
instance.outfile(),
"ld",
list(map(lambda o: o[1], objects)) + libraries,
implicit=list(map(lambda o: o[1], assets)),
)
all.append(instance.outfile())
writer.newline()
writer.separator("Phony targets")
writer.build("all", "phony", all)
writer.default("all")
def build(componentSpec: str, targetSpec: str, props: Props = {}) -> ComponentInstance:
context = contextFor(targetSpec, props)
shell.mkdir(context.builddir())
ninjaPath = os.path.join(context.builddir(), "build.ninja")
with open(ninjaPath, "w") as f:
gen(f, context)
instance = context.componentByName(componentSpec)
if instance is None:
raise RuntimeError(f"Component {componentSpec} not found")
if not instance.enabled:
raise RuntimeError(
f"Component {componentSpec} is disabled: {instance.disableReason}"
) )
shell.exec("ninja", "-f", ninjaPath, instance.outfile()) outs = list(map(lambda p: str(p.path), products))
if all:
return instance shell.exec("ninja", "-v", "-f", ninjaPath)
else:
shell.exec("ninja", "-v", "-f", ninjaPath, *outs)
return products
class Paths: # --- Commands --------------------------------------------------------------- #
bin: str
lib: str
obj: str
def __init__(self, bin: str, lib: str, obj: str):
self.bin = bin
self.lib = lib
self.obj = obj
def buildAll(targetSpec: str, props: Props = {}) -> Context: @cli.command("b", "build", "Build a component or all components")
context = contextFor(targetSpec, props) def buildCmd(args: cli.Args):
registry = model.Registry.use(args)
shell.mkdir(context.builddir()) target = model.Target.use(args)
ninjaPath = os.path.join(context.builddir(), "build.ninja") componentSpec = args.consumeArg()
if componentSpec is None:
with open(ninjaPath, "w") as f: raise RuntimeError("No component specified")
gen(f, context) component = registry.lookup(componentSpec, model.Component)
build(target, registry, component)[0]
shell.exec("ninja", "-v", "-f", ninjaPath)
return context
def testAll(targetSpec: str): @cli.command("p", "project", "Show project information")
context = contextFor(targetSpec) def runCmd(args: cli.Args):
registry = model.Registry.use(args)
target = model.Target.use(args)
debug = args.consumeOpt("debug", False) is True
shell.mkdir(context.builddir()) componentSpec = args.consumeArg()
ninjaPath = os.path.join(context.builddir(), "build.ninja") if componentSpec is None:
raise RuntimeError("No component specified")
with open(ninjaPath, "w") as f: component = registry.lookup(componentSpec, model.Component)
gen(f, context) if component is None:
raise RuntimeError(f"Component {componentSpec} not found")
shell.exec("ninja", "-v", "-f", ninjaPath, "all") product = build(target, registry, component)[0]
for instance in context.enabledInstances(): os.environ["CK_TARGET"] = target.id
if instance.isLib(): os.environ["CK_COMPONENT"] = product.component.id
continue os.environ["CK_BUILDDIR"] = target.builddir
if instance.id().endswith("-tests"): shell.exec(*(["lldb", "-o", "run"] if debug else []), str(product.path), *args.args)
print(f"Running {instance.id()}")
shell.exec(instance.outfile())
@cli.command("t", "test", "Run all test targets")
def testCmd(args: cli.Args):
# This is just a wrapper around the `run` command that try
# to run a special hook component named __tests__.
args.args.insert(0, "__tests__")
runCmd(args)
@cli.command("d", "debug", "Debug a component")
def debugCmd(args: cli.Args):
# This is just a wrapper around the `run` command that
# always enable debug mode.
args.opts["debug"] = True
runCmd(args)
@cli.command("c", "clean", "Clean build files")
def cleanCmd(args: cli.Args):
model.Project.use(args)
shell.rmrf(const.BUILD_DIR)
@cli.command("n", "nuke", "Clean all build files and caches")
def nukeCmd(args: cli.Args):
model.Project.use(args)
shell.rmrf(const.PROJECT_CK_DIR)

158
cutekit/cli.py Normal file
View file

@ -0,0 +1,158 @@
import inspect
import sys
from pathlib import Path
from typing import Optional, Union, Callable
from dataclasses import dataclass
from . import const, vt100
Value = Union[str, bool, int]
class Args:
opts: dict[str, Value]
args: list[str]
def __init__(self):
self.opts = {}
self.args = []
def consumePrefix(self, prefix: str) -> dict[str, Value]:
result: dict[str, Value] = {}
copy = self.opts.copy()
for key, value in copy.items():
if key.startswith(prefix):
result[key[len(prefix) :]] = value
del self.opts[key]
return result
def consumeOpt(self, key: str, default: Value = False) -> Value:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return default
def tryConsumeOpt(self, key: str) -> Optional[Value]:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return None
def consumeArg(self, default: Optional[str] = None) -> Optional[str]:
if len(self.args) == 0:
return default
first = self.args[0]
del self.args[0]
return first
def parse(args: list[str]) -> Args:
result = Args()
for arg in args:
if arg.startswith("--"):
if "=" in arg:
key, value = arg[2:].split("=", 1)
result.opts[key] = value
else:
result.opts[arg[2:]] = True
else:
result.args.append(arg)
return result
Callback = Callable[[Args], None]
@dataclass
class Command:
shortName: Optional[str]
longName: str
helpText: str
isPlugin: bool
callback: Callback
commands: list[Command] = []
def command(shortName: Optional[str], longName: str, helpText: str):
curframe = inspect.currentframe()
calframe = inspect.getouterframes(curframe, 2)
def wrap(fn: Callable[[Args], None]):
commands.append(
Command(
shortName,
longName,
helpText,
Path(calframe[1].filename).parent != Path(__file__).parent,
fn,
)
)
return fn
return wrap
# --- Builtins Commands ------------------------------------------------------ #
@command("u", "usage", "Show usage information")
def usage(args: Optional[Args] = None):
print(f"Usage: {const.ARGV0} <command> [args...]")
def error(msg: str) -> None:
print(f"{vt100.RED}Error:{vt100.RESET} {msg}\n", file=sys.stderr)
@command("h", "help", "Show this help message")
def helpCmd(args: Args):
usage()
print()
vt100.title("Description")
print(f" {const.DESCRIPTION}")
print()
vt100.title("Commands")
for cmd in sorted(commands, key=lambda c: c.shortName or c.longName):
pluginText = ""
if cmd.isPlugin:
pluginText = f"{vt100.CYAN}(plugin){vt100.RESET}"
print(
f" {vt100.GREEN}{cmd.shortName or ' '}{vt100.RESET} {cmd.longName} - {cmd.helpText} {pluginText}"
)
print()
vt100.title("Logging")
print(" Logs are stored in:")
print(f" - {const.PROJECT_LOG_FILE}")
print(f" - {const.GLOBAL_LOG_FILE}")
@command("v", "version", "Show current version")
def versionCmd(args: Args):
print(f"CuteKit v{const.VERSION_STR}")
def exec(args: Args):
cmd = args.consumeArg()
if cmd is None:
raise RuntimeError("No command specified")
for c in commands:
if c.shortName == cmd or c.longName == cmd:
c.callback(args)
return
raise RuntimeError(f"Unknown command {cmd}")

View file

@ -1,317 +0,0 @@
import os
import logging
import sys
from typing import Callable, cast, Optional, NoReturn
from cutekit import context, shell, const, vt100, builder, graph, project
from cutekit.args import Args
from cutekit.jexpr import Json
from cutekit.model import Extern
from cutekit.context import contextFor
Callback = Callable[[Args], None]
logger = logging.getLogger(__name__)
class Cmd:
shortName: Optional[str]
longName: str
helpText: str
callback: Callable[[Args], NoReturn]
isPlugin: bool = False
def __init__(
self,
shortName: Optional[str],
longName: str,
helpText: str,
callback: Callable[[Args], NoReturn],
):
self.shortName = shortName
self.longName = longName
self.helpText = helpText
self.callback = callback
cmds: list[Cmd] = []
def append(cmd: Cmd):
cmd.isPlugin = True
cmds.append(cmd)
cmds.sort(key=lambda c: c.shortName or c.longName)
def runCmd(args: Args):
project.chdir()
targetSpec = cast(str, args.consumeOpt("target", "host-" + shell.uname().machine))
props = args.consumePrefix("prop:")
componentSpec = args.consumeArg()
if componentSpec is None:
raise RuntimeError("Component not specified")
component = builder.build(componentSpec, targetSpec, props)
os.environ["CK_TARGET"] = component.context.target.id
os.environ["CK_COMPONENT"] = component.id()
os.environ["CK_BUILDDIR"] = component.context.builddir()
shell.exec(component.outfile(), *args.args)
cmds += [Cmd("r", "run", "Run the target", runCmd)]
def testCmd(args: Args):
project.chdir()
targetSpec = cast(str, args.consumeOpt("target", "host-" + shell.uname().machine))
builder.testAll(targetSpec)
cmds += [Cmd("t", "test", "Run all test targets", testCmd)]
def debugCmd(args: Args):
project.chdir()
targetSpec = cast(str, args.consumeOpt("target", "host-" + shell.uname().machine))
props = args.consumePrefix("prop:")
componentSpec = args.consumeArg()
if componentSpec is None:
raise RuntimeError("Component not specified")
component = builder.build(componentSpec, targetSpec, props)
os.environ["CK_TARGET"] = component.context.target.id
os.environ["CK_COMPONENT"] = component.id()
os.environ["CK_BUILDDIR"] = component.context.builddir()
shell.exec("lldb", "-o", "run", component.outfile(), *args.args)
cmds += [Cmd("d", "debug", "Debug the target", debugCmd)]
def buildCmd(args: Args):
project.chdir()
targetSpec = cast(str, args.consumeOpt("target", "host-" + shell.uname().machine))
props = args.consumePrefix("prop:")
componentSpec = args.consumeArg()
if componentSpec is None:
builder.buildAll(targetSpec, props)
else:
builder.build(componentSpec, targetSpec, props)
cmds += [Cmd("b", "build", "Build the target", buildCmd)]
def listCmd(args: Args):
project.chdir()
components = context.loadAllComponents()
targets = context.loadAllTargets()
vt100.title("Components")
if len(components) == 0:
print(" (No components available)")
else:
print(vt100.indent(vt100.wordwrap(", ".join(map(lambda m: m.id, components)))))
print()
vt100.title("Targets")
if len(targets) == 0:
print(" (No targets available)")
else:
print(vt100.indent(vt100.wordwrap(", ".join(map(lambda m: m.id, targets)))))
print()
cmds += [Cmd("l", "list", "List the targets", listCmd)]
def cleanCmd(args: Args):
project.chdir()
shell.rmrf(const.BUILD_DIR)
cmds += [Cmd("c", "clean", "Clean the build directory", cleanCmd)]
def nukeCmd(args: Args):
project.chdir()
shell.rmrf(const.PROJECT_CK_DIR)
cmds += [Cmd("n", "nuke", "Clean the build directory and cache", nukeCmd)]
def helpCmd(args: Args):
usage()
print()
vt100.title("Description")
print(f" {const.DESCRIPTION}")
print()
vt100.title("Commands")
for cmd in cmds:
pluginText = ""
if cmd.isPlugin:
pluginText = f"{vt100.CYAN}(plugin){vt100.RESET}"
print(
f" {vt100.GREEN}{cmd.shortName or ' '}{vt100.RESET} {cmd.longName} - {cmd.helpText} {pluginText}"
)
print()
vt100.title("Logging")
print(" Logs are stored in:")
print(f" - {const.PROJECT_LOG_FILE}")
print(f" - {const.GLOBAL_LOG_FILE}")
cmds += [Cmd("h", "help", "Show this help message", helpCmd)]
def versionCmd(args: Args):
print(f"CuteKit v{const.VERSION_STR}\n")
cmds += [Cmd("v", "version", "Show current version", versionCmd)]
def graphCmd(args: Args):
project.chdir()
targetSpec = cast(str, args.consumeOpt("target", "host-" + shell.uname().machine))
scope: Optional[str] = cast(Optional[str], args.tryConsumeOpt("scope"))
onlyLibs: bool = args.consumeOpt("only-libs", False) is True
showDisabled: bool = args.consumeOpt("show-disabled", False) is True
context = contextFor(targetSpec)
graph.view(context, scope=scope, showExe=not onlyLibs, showDisabled=showDisabled)
cmds += [Cmd("g", "graph", "Show dependency graph", graphCmd)]
def grabExtern(extern: dict[str, Extern]):
for extSpec, ext in extern.items():
extPath = os.path.join(const.EXTERN_DIR, extSpec)
if os.path.exists(extPath):
print(f"Skipping {extSpec}, already installed")
continue
print(f"Installing {extSpec}-{ext.tag} from {ext.git}...")
shell.popen(
"git", "clone", "--depth", "1", "--branch", ext.tag, ext.git, extPath
)
if os.path.exists(os.path.join(extPath, "project.json")):
grabExtern(context.loadProject(extPath).extern)
def installCmd(args: Args):
project.chdir()
pj = context.loadProject(".")
grabExtern(pj.extern)
cmds += [Cmd("i", "install", "Install all the external packages", installCmd)]
def initCmd(args: Args):
import requests
repo = args.consumeOpt("repo", const.DEFAULT_REPO_TEMPLATES)
list = args.consumeOpt("list")
template = args.consumeArg()
name = args.consumeArg()
logger.info("Fetching registry...")
r = requests.get(f"https://raw.githubusercontent.com/{repo}/main/registry.json")
if r.status_code != 200:
logger.error("Failed to fetch registry")
exit(1)
registry = r.json()
if list:
print(
"\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry)
)
return
if not template:
raise RuntimeError("Template not specified")
def template_match(t: Json) -> str:
return t["id"] == template
if not any(filter(template_match, registry)):
raise LookupError(f"Couldn't find a template named {template}")
if not name:
logger.info(f"No name was provided, defaulting to {template}")
name = template
if os.path.exists(name):
raise RuntimeError(f"Directory {name} already exists")
print(f"Creating project {name} from template {template}...")
shell.cloneDir(f"https://github.com/{repo}", template, name)
print(f"Project {name} created\n")
print("We suggest that you begin by typing:")
print(f" {vt100.GREEN}cd {name}{vt100.RESET}")
print(
f" {vt100.GREEN}cutekit install{vt100.BRIGHT_BLACK} # Install external packages{vt100.RESET}"
)
print(
f" {vt100.GREEN}cutekit build{vt100.BRIGHT_BLACK} # Build the project{vt100.RESET}"
)
cmds += [Cmd("I", "init", "Initialize a new project", initCmd)]
def usage():
print(f"Usage: {const.ARGV0} <command> [args...]")
def error(msg: str) -> None:
print(f"{vt100.RED}Error:{vt100.RESET} {msg}\n", file=sys.stderr)
def exec(args: Args):
cmd = args.consumeArg()
if cmd is None:
raise RuntimeError("No command specified")
for c in cmds:
if c.shortName == cmd or c.longName == cmd:
c.callback(args)
return
raise RuntimeError(f"Unknown command {cmd}")

View file

@ -1,3 +1,4 @@
from pathlib import Path
from typing import Any from typing import Any
@ -5,10 +6,11 @@ SUPPORTED_MANIFEST = [
"https://schemas.cute.engineering/stable/cutekit.manifest.component.v1", "https://schemas.cute.engineering/stable/cutekit.manifest.component.v1",
"https://schemas.cute.engineering/stable/cutekit.manifest.project.v1", "https://schemas.cute.engineering/stable/cutekit.manifest.project.v1",
"https://schemas.cute.engineering/stable/cutekit.manifest.target.v1", "https://schemas.cute.engineering/stable/cutekit.manifest.target.v1",
] ]
OSDK_MANIFEST_NOT_SUPPORTED = "OSDK manifests are not supported by CuteKit. Please use CuteKit manifest instead" OSDK_MANIFEST_NOT_SUPPORTED = (
"OSDK manifests are not supported by CuteKit. Please use CuteKit manifest instead"
)
UNSUPORTED_MANIFEST = { UNSUPORTED_MANIFEST = {
"https://schemas.cute.engineering/stable/osdk.manifest.component.v1": OSDK_MANIFEST_NOT_SUPPORTED, "https://schemas.cute.engineering/stable/osdk.manifest.component.v1": OSDK_MANIFEST_NOT_SUPPORTED,
@ -20,14 +22,16 @@ UNSUPORTED_MANIFEST = {
} }
def ensureSupportedManifest(manifest: Any, path: str): def ensureSupportedManifest(manifest: Any, path: Path):
if "$schema" not in manifest: if "$schema" not in manifest:
raise RuntimeError(f"Missing $schema in {path}") raise RuntimeError(f"Missing $schema in {path}")
if manifest["$schema"] in UNSUPORTED_MANIFEST: if manifest["$schema"] in UNSUPORTED_MANIFEST:
raise RuntimeError( raise RuntimeError(
f"Unsupported manifest schema {manifest['$schema']} in {path}: {UNSUPORTED_MANIFEST[manifest['$schema']]}") f"Unsupported manifest schema {manifest['$schema']} in {path}: {UNSUPORTED_MANIFEST[manifest['$schema']]}"
)
if manifest["$schema"] not in SUPPORTED_MANIFEST: if manifest["$schema"] not in SUPPORTED_MANIFEST:
raise RuntimeError( raise RuntimeError(
f"Unsupported manifest schema {manifest['$schema']} in {path}") f"Unsupported manifest schema {manifest['$schema']} in {path}"
)

View file

@ -1,20 +1,21 @@
import os import os
import sys import sys
from pathlib import Path
VERSION = (0, 6, 0, "dev") VERSION = (0, 6, 0, "dev")
VERSION_STR = f"{VERSION[0]}.{VERSION[1]}.{VERSION[2]}{'-' + VERSION[3] if VERSION[3] else ''}" VERSION_STR = (
MODULE_DIR = os.path.dirname(os.path.realpath(__file__)) f"{VERSION[0]}.{VERSION[1]}.{VERSION[2]}{'-' + VERSION[3] if VERSION[3] else ''}"
ARGV0 = os.path.basename(sys.argv[0]) )
PROJECT_CK_DIR = ".cutekit" ARGV0 = Path(sys.argv[0])
GLOBAL_CK_DIR = os.path.join(os.path.expanduser("~"), ".cutekit") PROJECT_CK_DIR = Path(".cutekit")
BUILD_DIR = os.path.join(PROJECT_CK_DIR, "build") GLOBAL_CK_DIR = Path.home() / ".cutekit"
CACHE_DIR = os.path.join(PROJECT_CK_DIR, "cache") BUILD_DIR = PROJECT_CK_DIR / "build"
EXTERN_DIR = os.path.join(PROJECT_CK_DIR, "extern") CACHE_DIR = PROJECT_CK_DIR / "cache"
SRC_DIR = "src" EXTERN_DIR = PROJECT_CK_DIR / "extern"
META_DIR = "meta" SRC_DIR = Path("src")
TARGETS_DIR = os.path.join(META_DIR, "targets") META_DIR = Path("meta")
TARGETS_DIR = META_DIR / "targets"
DEFAULT_REPO_TEMPLATES = "cute-engineering/cutekit-templates" DEFAULT_REPO_TEMPLATES = "cute-engineering/cutekit-templates"
DESCRIPTION = "A build system and package manager for low-level software development" DESCRIPTION = "A build system and package manager for low-level software development"
PROJECT_LOG_FILE = os.path.join(PROJECT_CK_DIR, "cutekit.log") PROJECT_LOG_FILE = PROJECT_CK_DIR / "cutekit.log"
GLOBAL_LOG_FILE = os.path.join( GLOBAL_LOG_FILE = GLOBAL_CK_DIR / "cutekit.log"
os.path.expanduser("~"), ".cutekit", "cutekit.log")

View file

@ -1,303 +0,0 @@
from typing import cast, Optional, Protocol, Iterable
from itertools import chain
from pathlib import Path
import os
import logging
from cutekit.model import ProjectManifest, TargetManifest, ComponentManifest, Props, Type, Tool, Tools
from cutekit import const, shell, jexpr, utils, rules, mixins, project
logger = logging.getLogger(__name__)
class IContext(Protocol):
target: TargetManifest
def builddir(self) -> str:
...
class ComponentInstance:
enabled: bool = True
disableReason = ""
manifest: ComponentManifest
sources: list[str] = []
res: list[str] = []
resolved: list[str] = []
context: IContext
def __init__(
self,
enabled: bool,
disableReason: str,
manifest: ComponentManifest,
sources: list[str],
res: list[str],
resolved: list[str]):
self.enabled = enabled
self.disableReason = disableReason
self.manifest = manifest
self.sources = sources
self.res = res
self.resolved = resolved
def id(self) -> str:
return self.manifest.id
def isLib(self):
return self.manifest.type == Type.LIB
def objdir(self) -> str:
return os.path.join(self.context.builddir(), f"{self.manifest.id}/obj")
def resdir(self) -> str:
return os.path.join(self.context.builddir(), f"{self.manifest.id}/res")
def objsfiles(self) -> list[tuple[str, str]]:
def toOFile(s: str) -> str:
return os.path.join(self.objdir(), s.replace(os.path.join(self.manifest.dirname(), ''), '') + ".o")
return list(map(lambda s: (s, toOFile(s)), self.sources))
def resfiles(self) -> list[tuple[str, str, str]]:
def toAssetFile(s: str) -> str:
return os.path.join(self.resdir(), s.replace(os.path.join(self.manifest.dirname(), 'res/'), ''))
def toAssetId(s: str) -> str:
return s.replace(os.path.join(self.manifest.dirname(), 'res/'), '')
return list(map(lambda s: (s, toAssetFile(s), toAssetId(s)), self.res))
def outfile(self) -> str:
if self.isLib():
return os.path.join(self.context.builddir(), self.manifest.id, f"lib/{self.manifest.id}.a")
else:
return os.path.join(self.context.builddir(), self.manifest.id, f"bin/{self.manifest.id}.out")
def cinclude(self) -> str:
if "cpp-root-include" in self.manifest.props:
return self.manifest.dirname()
elif self.manifest.type == Type.LIB:
return str(Path(self.manifest.dirname()).parent)
else:
return ""
class Context(IContext):
target: TargetManifest
instances: list[ComponentInstance]
tools: Tools
def enabledInstances(self) -> Iterable[ComponentInstance]:
return filter(lambda x: x.enabled, self.instances)
def __init__(self, target: TargetManifest, instances: list[ComponentInstance], tools: Tools):
self.target = target
self.instances = instances
self.tools = tools
def componentByName(self, name: str) -> Optional[ComponentInstance]:
result = list(filter(lambda x: x.manifest.id == name, self.instances))
if len(result) == 0:
return None
return result[0]
def cincls(self) -> list[str]:
includes = list(filter(lambda x: x != "", map(
lambda x: x.cinclude(), self.enabledInstances())))
return utils.uniq(includes)
def cdefs(self) -> list[str]:
return self.target.cdefs()
def hashid(self) -> str:
return utils.hash((self.target.props, [self.tools[t].toJson() for t in self.tools]))[0:8]
def builddir(self) -> str:
return os.path.join(const.BUILD_DIR, f"{self.target.id}-{self.hashid()[:8]}")
def loadAllTargets() -> list[TargetManifest]:
projectRoot = project.root()
if projectRoot is None:
return []
pj = loadProject(projectRoot)
paths = list(
map(lambda e: os.path.join(const.EXTERN_DIR,
e, const.TARGETS_DIR), pj.extern.keys())
) + [const.TARGETS_DIR]
ret = []
for entry in paths:
files = shell.find(entry, ["*.json"])
ret += list(map(lambda path: TargetManifest(jexpr.evalRead(path), path), files))
return ret
def loadProject(path: str) -> ProjectManifest:
path = os.path.join(path, "project.json")
return ProjectManifest(jexpr.evalRead(path), path)
def loadTarget(id: str) -> TargetManifest:
try:
return next(filter(lambda t: t.id == id, loadAllTargets()))
except StopIteration:
raise RuntimeError(f"Target '{id}' not found")
def loadAllComponents() -> list[ComponentManifest]:
files = shell.find(const.SRC_DIR, ["manifest.json"])
files += shell.find(const.EXTERN_DIR, ["manifest.json"])
return list(
map(
lambda path: ComponentManifest(jexpr.evalRead(path), path),
files))
def filterDisabled(components: list[ComponentManifest], target: TargetManifest) -> tuple[list[ComponentManifest], list[ComponentManifest]]:
return list(filter(lambda c: c.isEnabled(target)[0], components)), \
list(filter(lambda c: not c.isEnabled(target)[0], components))
def providerFor(what: str, components: list[ComponentManifest]) -> tuple[Optional[str], str]:
result: list[ComponentManifest] = list(
filter(lambda c: c.id == what, components))
if len(result) == 0:
# Try to find a provider
result = list(filter(lambda x: (what in x.provides), components))
if len(result) == 0:
logger.error(f"No provider for '{what}'")
return (None, f"No provider for '{what}'")
if len(result) > 1:
ids = list(map(lambda x: x.id, result))
logger.error(f"Multiple providers for '{what}': {result}")
return (None, f"Multiple providers for '{what}': {','.join(ids)}")
return (result[0].id, "")
def resolveDeps(componentSpec: str, components: list[ComponentManifest], target: TargetManifest) -> tuple[bool, str, list[str]]:
mapping = dict(map(lambda c: (c.id, c), components))
def resolveInner(what: str, stack: list[str] = []) -> tuple[bool, str, list[str]]:
result: list[str] = []
what = target.route(what)
resolved, unresolvedReason = providerFor(what, components)
if resolved is None:
return False, unresolvedReason, []
if resolved in stack:
raise RuntimeError(f"Dependency loop: {stack} -> {resolved}")
stack.append(resolved)
for req in mapping[resolved].requires:
keep, unresolvedReason, reqs = resolveInner(req, stack)
if not keep:
stack.pop()
logger.error(f"Dependency '{req}' not met for '{resolved}'")
return False, unresolvedReason, []
result.extend(reqs)
stack.pop()
result.insert(0, resolved)
return True, "", result
enabled, unresolvedReason, resolved = resolveInner(componentSpec)
return enabled, unresolvedReason, resolved
def instanciate(componentSpec: str, components: list[ComponentManifest], target: TargetManifest) -> Optional[ComponentInstance]:
manifest = next(filter(lambda c: c.id == componentSpec, components))
wildcards = set(
chain(*map(lambda rule: rule.fileIn, rules.rules.values())))
sources = shell.find(
manifest.subdirs, list(wildcards), recusive=False)
res = shell.find(os.path.join(manifest.dirname(), "res"))
enabled, unresolvedReason, resolved = resolveDeps(
componentSpec, components, target)
return ComponentInstance(enabled, unresolvedReason, manifest, sources, res, resolved[1:])
def instanciateDisabled(component: ComponentManifest, target: TargetManifest) -> ComponentInstance:
return ComponentInstance(
enabled=False,
disableReason=component.isEnabled(target)[1],
manifest=component,
sources=[],
res=[],
resolved=[])
context: dict[str, Context] = {}
def contextFor(targetSpec: str, props: Props = {}) -> Context:
if targetSpec in context:
return context[targetSpec]
logger.info(f"Loading context for '{targetSpec}'")
targetEls = targetSpec.split(":")
if targetEls[0] == "":
targetEls[0] = "host-" + shell.uname().machine
target = loadTarget(targetEls[0])
target.props |= props
components = loadAllComponents()
components, disabled = filterDisabled(components, target)
tools: Tools = {}
for toolSpec in target.tools:
tool = target.tools[toolSpec]
tools[toolSpec] = Tool(
strict=False,
cmd=tool.cmd,
args=tool.args,
files=tool.files)
tools[toolSpec].args += rules.rules[toolSpec].args
for m in targetEls[1:]:
mixin = mixins.byId(m)
tools = mixin(target, tools)
for component in components:
for toolSpec in component.tools:
tool = component.tools[toolSpec]
tools[toolSpec].args += tool.args
instances: list[ComponentInstance] = list(
map(lambda c: instanciateDisabled(c, target), disabled))
instances += cast(list[ComponentInstance], list(filter(lambda e: e is not None, map(lambda c: instanciate(
c.id, components, target), components))))
context[targetSpec] = Context(
target,
instances,
tools,
)
for instance in instances:
instance.context = context[targetSpec]
return context[targetSpec]

View file

@ -1,59 +1,95 @@
import os import os
from typing import Optional from typing import Optional, cast
from cutekit.context import Context
from cutekit import vt100 from . import vt100, cli, model
def view(context: Context, scope: Optional[str] = None, showExe: bool = True, showDisabled: bool = False): def view(
from graphviz import Digraph registry: model.Registry,
target: model.Target,
scope: Optional[str] = None,
showExe: bool = True,
showDisabled: bool = False,
):
from graphviz import Digraph # type: ignore
g = Digraph(context.target.id, filename='graph.gv') g = Digraph(target.id, filename="graph.gv")
g.attr('graph', splines='ortho', rankdir='BT', ranksep='1.5') g.attr("graph", splines="ortho", rankdir="BT", ranksep="1.5")
g.attr('node', shape='ellipse') g.attr("node", shape="ellipse")
g.attr( g.attr(
'graph', label=f"<<B>{scope or 'Full Dependency Graph'}</B><BR/>{context.target.id}>", labelloc='t') "graph",
label=f"<<B>{scope or 'Full Dependency Graph'}</B><BR/>{target.id}>",
labelloc="t",
)
scopeInstance = None scopeInstance = None
if scope is not None: if scope is not None:
scopeInstance = context.componentByName(scope) scopeInstance = registry.lookup(scope, model.Component)
for instance in context.instances: for component in registry.iterEnabled(target):
if not instance.isLib() and not showExe: if not component.type == model.Kind.LIB and not showExe:
continue continue
if scopeInstance is not None and \ if (
instance.manifest.id != scope and \ scopeInstance is not None
instance.manifest.id not in scopeInstance.resolved: and component.id != scope
and component.id not in scopeInstance.resolved[target.id].resolved
):
continue continue
if instance.enabled: if component.resolved[target.id].enabled:
fillcolor = "lightgrey" if instance.isLib() else "lightblue" fillcolor = "lightgrey" if component.type == model.Kind.LIB else "lightblue"
shape = "plaintext" if not scope == instance.manifest.id else 'box' shape = "plaintext" if not scope == component.id else "box"
g.node(instance.manifest.id, f"<<B>{instance.manifest.id}</B><BR/>{vt100.wordwrap(instance.manifest.decription, 40,newline='<BR/>')}>", g.node(
shape=shape, style="filled", fillcolor=fillcolor) component.id,
f"<<B>{component.id}</B><BR/>{vt100.wordwrap(component.decription, 40,newline='<BR/>')}>",
shape=shape,
style="filled",
fillcolor=fillcolor,
)
for req in instance.manifest.requires: for req in component.requires:
g.edge(instance.manifest.id, req) g.edge(component.id, req)
for req in instance.manifest.provides: for req in component.provides:
isChosen = context.target.routing.get( isChosen = target.routing.get(req, None) == component.id
req, None) == instance.manifest.id
g.edge(req, instance.manifest.id, arrowhead="none", color=( g.edge(
"blue" if isChosen else "black")) req,
component.id,
arrowhead="none",
color=("blue" if isChosen else "black"),
)
elif showDisabled: elif showDisabled:
g.node(instance.manifest.id, f"<<B>{instance.manifest.id}</B><BR/>{vt100.wordwrap(instance.manifest.decription, 40,newline='<BR/>')}<BR/><BR/><I>{vt100.wordwrap(instance.disableReason, 40,newline='<BR/>')}</I>>", g.node(
shape="plaintext", style="filled", fontcolor="#999999", fillcolor="#eeeeee") component.id,
f"<<B>{component.id}</B><BR/>{vt100.wordwrap(component.decription, 40,newline='<BR/>')}<BR/><BR/><I>{vt100.wordwrap(str(component.resolved[target.id].reason), 40,newline='<BR/>')}</I>>",
shape="plaintext",
style="filled",
fontcolor="#999999",
fillcolor="#eeeeee",
)
for req in instance.manifest.requires: for req in component.requires:
g.edge(instance.manifest.id, req, color="#aaaaaa") g.edge(component.id, req, color="#aaaaaa")
for req in instance.manifest.provides: for req in component.provides:
g.edge(req, instance.manifest.id, g.edge(req, component.id, arrowhead="none", color="#aaaaaa")
arrowhead="none", color="#aaaaaa")
g.view(filename=os.path.join(context.builddir(), "graph.gv")) g.view(filename=str(target.builddir / "graph.gv"))
@cli.command("g", "graph", "Show the dependency graph")
def graphCmd(args: cli.Args):
registry = model.Registry.use(args)
target = model.Target.use(args)
scope = cast(Optional[str], args.tryConsumeOpt("scope"))
onlyLibs = args.consumeOpt("only-libs", False) is True
showDisabled = args.consumeOpt("show-disabled", False) is True
view(registry, target, scope=scope, showExe=not onlyLibs, showDisabled=showDisabled)

View file

@ -1,17 +1,17 @@
import os import os
from typing import Any, cast, Callable, Final
import json import json
from pathlib import Path
import cutekit.shell as shell from typing import Any, cast, Callable, Final
from cutekit.compat import ensureSupportedManifest from . import shell, compat
Json = Any Json = Any
Builtin = Callable[..., Json] Builtin = Callable[..., Json]
BUILTINS: Final[dict[str, Builtin]] = { BUILTINS: Final[dict[str, Builtin]] = {
"uname": lambda arg, ctx: getattr(shell.uname(), arg).lower(), "uname": lambda arg, ctx: getattr(shell.uname(), arg).lower(),
"include": lambda arg, ctx: evalRead(arg, compatibilityCheck=False), "include": lambda arg, ctx: evalRead(arg),
"evalRead": lambda arg, ctx: evalRead(arg, compatibilityCheck=False), "evalRead": lambda arg, ctx: evalRead(arg),
"join": lambda lhs, rhs, ctx: cast( "join": lambda lhs, rhs, ctx: cast(
Json, {**lhs, **rhs} if isinstance(lhs, dict) else lhs + rhs Json, {**lhs, **rhs} if isinstance(lhs, dict) else lhs + rhs
), ),
@ -28,7 +28,7 @@ BUILTINS: Final[dict[str, Builtin]] = {
} }
def eval(jexpr: Json, filePath: str) -> Json: def eval(jexpr: Json, filePath: Path) -> Json:
if isinstance(jexpr, dict): if isinstance(jexpr, dict):
result = {} result = {}
for k in cast(dict[str, Json], jexpr): for k in cast(dict[str, Json], jexpr):
@ -50,7 +50,7 @@ def eval(jexpr: Json, filePath: str) -> Json:
return jexpr return jexpr
def read(path: str) -> Json: def read(path: Path) -> Json:
try: try:
with open(path, "r") as f: with open(path, "r") as f:
return json.load(f) return json.load(f)
@ -58,8 +58,6 @@ def read(path: str) -> Json:
raise RuntimeError(f"Failed to read {path}") raise RuntimeError(f"Failed to read {path}")
def evalRead(path: str, compatibilityCheck: bool = True) -> Json: def evalRead(path: Path) -> Json:
data = read(path) data = read(path)
if compatibilityCheck:
ensureSupportedManifest(data, path)
return eval(data, path) return eval(data, path)

View file

@ -1,31 +1,29 @@
from typing import Callable from typing import Callable
from cutekit.model import TargetManifest, Tools
Mixin = Callable[[TargetManifest, Tools], Tools] from . import model
Mixin = Callable[[model.Target, model.Tools], model.Tools]
def patchToolArgs(tools: Tools, toolSpec: str, args: list[str]): def patchToolArgs(tools: model.Tools, toolSpec: str, args: list[str]):
tools[toolSpec].args += args tools[toolSpec].args += args
def prefixToolCmd(tools: Tools, toolSpec: str, prefix: str): def prefixToolCmd(tools: model.Tools, toolSpec: str, prefix: str):
tools[toolSpec].cmd = prefix + " " + tools[toolSpec].cmd tools[toolSpec].cmd = prefix + " " + tools[toolSpec].cmd
def mixinCache(target: TargetManifest, tools: Tools) -> Tools: def mixinCache(target: model.Target, tools: model.Tools) -> model.Tools:
prefixToolCmd(tools, "cc", "ccache") prefixToolCmd(tools, "cc", "ccache")
prefixToolCmd(tools, "cxx", "ccache") prefixToolCmd(tools, "cxx", "ccache")
return tools return tools
def makeMixinSan(san: str) -> Mixin: def makeMixinSan(san: str) -> Mixin:
def mixinSan(target: TargetManifest, tools: Tools) -> Tools: def mixinSan(target: model.Target, tools: model.Tools) -> model.Tools:
patchToolArgs( patchToolArgs(tools, "cc", [f"-fsanitize={san}"])
tools, "cc", [f"-fsanitize={san}"]) patchToolArgs(tools, "cxx", [f"-fsanitize={san}"])
patchToolArgs( patchToolArgs(tools, "ld", [f"-fsanitize={san}"])
tools, "cxx", [f"-fsanitize={san}"])
patchToolArgs(
tools, "ld", [f"-fsanitize={san}"])
return tools return tools
@ -33,7 +31,7 @@ def makeMixinSan(san: str) -> Mixin:
def makeMixinOptimize(level: str) -> Mixin: def makeMixinOptimize(level: str) -> Mixin:
def mixinOptimize(target: TargetManifest, tools: Tools) -> Tools: def mixinOptimize(target: model.Target, tools: model.Tools) -> model.Tools:
patchToolArgs(tools, "cc", [f"-O{level}"]) patchToolArgs(tools, "cc", [f"-O{level}"])
patchToolArgs(tools, "cxx", [f"-O{level}"]) patchToolArgs(tools, "cxx", [f"-O{level}"])
@ -42,7 +40,7 @@ def makeMixinOptimize(level: str) -> Mixin:
return mixinOptimize return mixinOptimize
def mixinDebug(target: TargetManifest, tools: Tools) -> Tools: def mixinDebug(target: model.Target, tools: model.Tools) -> model.Tools:
patchToolArgs(tools, "cc", ["-g", "-gdwarf-4"]) patchToolArgs(tools, "cc", ["-g", "-gdwarf-4"])
patchToolArgs(tools, "cxx", ["-g", "-gdwarf-4"]) patchToolArgs(tools, "cxx", ["-g", "-gdwarf-4"])
@ -50,7 +48,7 @@ def mixinDebug(target: TargetManifest, tools: Tools) -> Tools:
def makeMixinTune(tune: str) -> Mixin: def makeMixinTune(tune: str) -> Mixin:
def mixinTune(target: TargetManifest, tools: Tools) -> Tools: def mixinTune(target: model.Target, tools: model.Tools) -> model.Tools:
patchToolArgs(tools, "cc", [f"-mtune={tune}"]) patchToolArgs(tools, "cc", [f"-mtune={tune}"])
patchToolArgs(tools, "cxx", [f"-mtune={tune}"]) patchToolArgs(tools, "cxx", [f"-mtune={tune}"])

View file

@ -1,17 +1,23 @@
import os import os
from enum import Enum
from typing import Any
import logging import logging
from cutekit.jexpr import Json
from enum import Enum
from typing import Any, Generator, Optional, Type, cast
from pathlib import Path
from dataclasses_json import DataClassJsonMixin
from dataclasses import dataclass, field
logger = logging.getLogger(__name__) from cutekit import const, shell
from . import jexpr, compat, utils, cli, vt100
_logger = logging.getLogger(__name__)
Props = dict[str, Any] Props = dict[str, Any]
class Type(Enum): class Kind(Enum):
UNKNOWN = "unknown" UNKNOWN = "unknown"
PROJECT = "project" PROJECT = "project"
TARGET = "target" TARGET = "target"
@ -19,260 +25,290 @@ class Type(Enum):
EXE = "exe" EXE = "exe"
class Manifest: # --- Manifest --------------------------------------------------------------- #
id: str = ""
type: Type = Type.UNKNOWN
path: str = ""
def __init__(
self, json: Json = None, path: str = "", strict: bool = True, **kwargs: Any
):
if json is not None:
if "id" not in json:
raise RuntimeError("Missing id")
self.id = json["id"]
if "type" not in json and strict:
raise RuntimeError("Missing type")
self.type = Type(json["type"])
self.path = path
elif strict:
raise RuntimeError("Missing json")
for key in kwargs:
setattr(self, key, kwargs[key])
def toJson(self) -> Json:
return {"id": self.id, "type": self.type.value, "path": self.path}
def __str__(self):
return f"Manifest(id={self.id}, type={self.type}, path={self.path})"
def __repr__(self):
return f"Manifest({id})"
def dirname(self) -> str:
return os.path.dirname(self.path)
class Extern: @dataclass
git: str = "" class Manifest(DataClassJsonMixin):
tag: str = "" id: str
type: Kind = field(default=Kind.UNKNOWN)
path: Path = field(default=Path())
def __init__(self, json: Json = None, strict: bool = True, **kwargs: Any): @staticmethod
if json is not None: def parse(path: Path, data: dict[str, Any]) -> "Manifest":
if "git" not in json and strict: """
raise RuntimeError("Missing git") Parse a manifest from a given path and data
"""
compat.ensureSupportedManifest(data, path)
kind = Kind(data["type"])
del data["$schema"]
obj = KINDS[kind].from_dict(data)
obj.path = path
return obj
self.git = json["git"] @staticmethod
def load(path: Path) -> "Manifest":
"""
Load a manifest from a given path
"""
return Manifest.parse(path, jexpr.evalRead(path))
if "tag" not in json and strict: def dirname(self) -> Path:
raise RuntimeError("Missing tag") """
Return the directory of the manifest
"""
return self.path.parent
self.tag = json["tag"] def subpath(self, path) -> Path:
elif strict: return self.dirname() / path
raise RuntimeError("Missing json")
for key in kwargs: def ensureType(self, t: Type[utils.T]) -> utils.T:
setattr(self, key, kwargs[key]) """
Ensure that the manifest is of a given type
def toJson(self) -> Json: """
return {"git": self.git, "tag": self.tag} if not isinstance(self, t):
raise RuntimeError(
def __str__(self): f"{self.path} should be a {type.__name__} manifest but is a {self.__class__.__name__} manifest"
return f"Extern(git={self.git}, tag={self.tag})" )
return cast(utils.T, self)
def __repr__(self):
return f"Extern({self.git})"
class ProjectManifest(Manifest): # --- Project ---------------------------------------------------------------- #
description: str = ""
extern: dict[str, Extern] = {}
def __init__( _project: Optional["Project"] = None
self, json: Json = None, path: str = "", strict: bool = True, **kwargs: Any
):
if json is not None:
if "description" not in json and strict:
raise RuntimeError("Missing description")
self.description = json["description"]
self.extern = {k: Extern(v) for k, v in json.get("extern", {}).items()}
elif strict:
raise RuntimeError("Missing json")
super().__init__(json, path, strict, **kwargs)
def toJson(self) -> Json:
return {
**super().toJson(),
"description": self.description,
"extern": {k: v.toJson() for k, v in self.extern.items()},
}
def __str__(self):
return f"ProjectManifest(id={self.id}, type={self.type}, path={self.path}, description={self.description}, extern={self.extern})"
def __repr__(self):
return f"ProjectManifest({self.id})"
class Tool: @dataclass
cmd: str = "" class Extern(DataClassJsonMixin):
args: list[str] = [] git: str
files: list[str] = [] tag: str
def __init__(self, json: Json = None, strict: bool = True, **kwargs: Any):
if json is not None:
if "cmd" not in json and strict:
raise RuntimeError("Missing cmd")
self.cmd = json.get("cmd", self.cmd) @dataclass
class Project(Manifest):
description: str = field(default="(No description)")
extern: dict[str, Extern] = field(default_factory=dict)
if "args" not in json and strict: @property
raise RuntimeError("Missing args") def externDirs(self) -> list[Path]:
res = map(lambda e: const.EXTERN_DIR / e, self.extern.keys())
return list(res)
self.args = json.get("args", []) @staticmethod
def root() -> Optional[Path]:
"""
Find the root of the project by looking for a project.json
"""
cwd = Path.cwd()
while str(cwd) != cwd.root:
if (cwd / "project.json").is_file():
return cwd
cwd = cwd.parent
return None
self.files = json.get("files", []) @staticmethod
elif strict: def chdir() -> None:
raise RuntimeError("Missing json") """
Change the current working directory to the root of the project
"""
path = Project.root()
if path is None:
raise RuntimeError(
"No project.json found in this directory or any parent directory"
)
os.chdir(path)
for key in kwargs: @staticmethod
setattr(self, key, kwargs[key]) def at(path: str | Path) -> Optional["Project"]:
path = Path(path) / "project.json"
if not path.exists():
return None
return Manifest.load(path).ensureType(Project)
def toJson(self) -> Json: @staticmethod
return {"cmd": self.cmd, "args": self.args, "files": self.files} def ensure() -> "Project":
root = Project.root()
if root is None:
raise RuntimeError(
"No project.json found in this directory or any parent directory"
)
os.chdir(root)
return Manifest.load(Path(root / "project.json")).ensureType(
Project
)
def __str__(self): @staticmethod
return f"Tool(cmd={self.cmd}, args={self.args}, files={self.files})" def fetchs(extern: dict[str | Path, Extern]):
for extSpec, ext in extern.items():
extPath = const.EXTERN_DIR / extSpec
def __repr__(self): if extPath.exists():
return f"Tool({self.cmd})" print(f"Skipping {extSpec}, already installed")
continue
print(f"Installing {extSpec}-{ext.tag} from {ext.git}...")
shell.popen(
"git",
"clone",
"--quiet",
"--depth",
"1",
"--branch",
ext.tag,
ext.git,
extPath,
)
project = Project.at(extPath)
if project is not None:
Project.fetchs(project.extern)
@staticmethod
def use(args: cli.Args) -> "Project":
global _project
if _project is None:
_project = Project.ensure()
return _project
@cli.command("i", "install", "Install required external packages")
def installCmd(args: cli.Args):
project = Project.use(args)
Project.fetchs(project.extern)
@cli.command("I", "init", "Initialize a new project")
def initCmd(args: cli.Args):
import requests
repo = args.consumeOpt("repo", const.DEFAULT_REPO_TEMPLATES)
list = args.consumeOpt("list")
template = args.consumeArg()
name = Path(args.consumeArg())
_logger.info("Fetching registry...")
r = requests.get(f"https://raw.githubusercontent.com/{repo}/main/registry.json")
if r.status_code != 200:
_logger.error("Failed to fetch registry")
exit(1)
registry = r.json()
if list:
print(
"\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry)
)
return
if not template:
raise RuntimeError("Template not specified")
def template_match(t: jexpr.Json) -> str:
return t["id"] == template
if not any(filter(template_match, registry)):
raise LookupError(f"Couldn't find a template named {template}")
if not name:
_logger.info(f"No name was provided, defaulting to {template}")
name = template
if name.exists():
raise RuntimeError(f"Directory {name} already exists")
print(f"Creating project {name} from template {template}...")
shell.cloneDir(f"https://github.com/{repo}", template, name)
print(f"Project {name} created\n")
print("We suggest that you begin by typing:")
print(f" {vt100.GREEN}cd {name}{vt100.RESET}")
print(
f" {vt100.GREEN}cutekit install{vt100.BRIGHT_BLACK} # Install external packages{vt100.RESET}"
)
print(
f" {vt100.GREEN}cutekit build{vt100.BRIGHT_BLACK} # Build the project{vt100.RESET}"
)
# --- Target ----------------------------------------------------------------- #
@dataclass
class Tool(DataClassJsonMixin):
cmd: str = field(default="")
args: list[str] = field(default_factory=list)
files: list[str] = field(default_factory=list)
Tools = dict[str, Tool] Tools = dict[str, Tool]
class TargetManifest(Manifest): @dataclass
props: Props class Target(Manifest):
tools: Tools props: Props = field(default_factory=dict)
routing: dict[str, str] tools: Tools = field(default_factory=dict)
routing: dict[str, str] = field(default_factory=dict)
def __init__( @property
self, json: Json = None, path: str = "", strict: bool = True, **kwargs: Any def hashid(self) -> str:
): return utils.hash((self.props, [v.to_dict() for k, v in self.tools.items()]))
if json is not None:
if "props" not in json and strict:
raise RuntimeError("Missing props")
self.props = json["props"] @property
def builddir(self) -> Path:
return const.BUILD_DIR / f"{self.id}-{self.hashid[:8]}"
if "tools" not in json and strict: @staticmethod
raise RuntimeError("Missing tools") def use(args: cli.Args) -> "Target":
registry = Registry.use(args)
self.tools = {k: Tool(v) for k, v in json["tools"].items()} targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
return registry.ensure(targetSpec, Target)
self.routing = json.get("routing", {})
super().__init__(json, path, strict, **kwargs)
def toJson(self) -> Json:
return {
**super().toJson(),
"props": self.props,
"tools": {k: v.toJson() for k, v in self.tools.items()},
"routing": self.routing,
}
def __repr__(self):
return f"TargetManifest({self.id})"
def route(self, componentSpec: str): def route(self, componentSpec: str):
"""
Route a component spec to a target specific component spec
"""
return ( return (
self.routing[componentSpec] self.routing[componentSpec]
if componentSpec in self.routing if componentSpec in self.routing
else componentSpec else componentSpec
) )
def cdefs(self) -> list[str]:
defines: list[str] = []
def sanatize(s: str) -> str: # --- Component -------------------------------------------------------------- #
return s.lower().replace(" ", "_").replace("-", "_").replace(".", "_")
for key in self.props:
prop = self.props[key]
propStr = str(prop)
if isinstance(prop, bool):
if prop:
defines += [f"-D__ck_{sanatize(key)}__"]
else:
defines += [f"-D__ck_{sanatize(key)}_{sanatize(propStr)}__"]
defines += [f"-D__ck_{sanatize(key)}_value={propStr}"]
return defines
class ComponentManifest(Manifest): @dataclass
decription: str = "(No description)" class Resolved:
props: Props = {} reason: Optional[str] = None
tools: Tools = {} resolved: list[str] = field(default_factory=list)
enableIf: dict[str, list[Any]] = {}
requires: list[str] = []
provides: list[str] = []
subdirs: list[str] = []
def __init__( @property
self, json: Json = None, path: str = "", strict: bool = True, **kwargs: Any def enabled(self) -> bool:
): return self.reason is None
if json is not None:
self.decription = json.get("description", self.decription)
self.props = json.get("props", self.props)
self.tools = {
k: Tool(v, strict=False) for k, v in json.get("tools", {}).items()
}
self.enableIf = json.get("enableIf", self.enableIf)
self.requires = json.get("requires", self.requires)
self.provides = json.get("provides", self.provides)
self.subdirs = list(
map(
lambda x: os.path.join(os.path.dirname(path), x),
json.get("subdirs", [""]),
)
)
super().__init__(json, path, strict, **kwargs)
def toJson(self) -> Json: @dataclass
return { class Component(Manifest):
**super().toJson(), decription: str = field(default="(No description)")
"description": self.decription, props: Props = field(default_factory=dict)
"props": self.props, tools: Tools = field(default_factory=dict)
"tools": {k: v.toJson() for k, v in self.tools.items()}, enableIf: dict[str, list[Any]] = field(default_factory=dict)
"enableIf": self.enableIf, requires: list[str] = field(default_factory=list)
"requires": self.requires, provides: list[str] = field(default_factory=list)
"provides": self.provides, subdirs: list[str] = field(default_factory=list)
"subdirs": self.subdirs, injects: list[str] = field(default_factory=list)
} resolved: dict[str, Resolved] = field(default_factory=dict)
def __repr__(self): def isEnabled(self, target: Target) -> tuple[bool, str]:
return f"ComponentManifest({self.id})"
def isEnabled(self, target: TargetManifest) -> tuple[bool, str]:
for k, v in self.enableIf.items(): for k, v in self.enableIf.items():
if k not in target.props: if k not in target.props:
logger.info(f"Component {self.id} disabled by missing {k} in target") _logger.info(f"Component {self.id} disabled by missing {k} in target")
return False, f"Missing props '{k}' in target" return False, f"Missing props '{k}' in target"
if target.props[k] not in v: if target.props[k] not in v:
vStrs = [f"'{str(x)}'" for x in v] vStrs = [f"'{str(x)}'" for x in v]
logger.info( _logger.info(
f"Component {self.id} disabled by {k}={target.props[k]} not in {v}" f"Component {self.id} disabled by {k}={target.props[k]} not in {v}"
) )
return ( return (
@ -281,3 +317,298 @@ class ComponentManifest(Manifest):
) )
return True, "" return True, ""
KINDS: dict[Kind, Type[Manifest]] = {
Kind.PROJECT: Project,
Kind.TARGET: Target,
Kind.LIB: Component,
Kind.EXE: Component,
}
# --- Dependency resolution -------------------------------------------------- #
@dataclass
class Resolver:
_registry: "Registry"
_target: Target
_mappings: dict[str, list[Component]] = field(default_factory=dict)
_cache: dict[str, Resolved] = field(default_factory=dict)
_baked = False
def _bake(self):
"""
Bake the resolver by building a mapping of all
components that provide a given spec.
"""
if self._baked:
return
for c in self._registry.iter(Component):
for p in c.provides + [c.id]:
if p not in self._mappings and [0]:
self._mappings[p] = []
self._mappings[p].append(c)
# Overide with target routing since it has priority
# over component provides and id
for k, v in self._target.routing.items():
component = self._registry.lookup(v, Component)
self._mappings[k] = [component] if component else []
self._baked = True
def _provider(self, spec: str) -> tuple[Optional[str], str]:
"""
Returns the provider for a given spec.
"""
result = self._mappings.get(spec, [])
if len(result) == 1:
enabled, reason = result[0].isEnabled(self._target)
if not enabled:
return (None, reason)
def checkIsEnabled(c: Component) -> bool:
enabled, reason = c.isEnabled(self._target)
if not enabled:
_logger.info(f"Component {c.id} cannot provide '{spec}': {reason}")
return enabled
result = list(filter(checkIsEnabled, result))
if result == []:
return (None, f"No provider for '{spec}'")
if len(result) > 1:
ids = list(map(lambda x: x.id, result))
return (None, f"Multiple providers for '{spec}': {','.join(ids)}")
return (result[0].id, "")
def resolve(self, what: str, stack: list[str] = []) -> Resolved:
"""
Resolve a given spec to a list of components.
"""
self._bake()
if what in self._cache:
return self._cache[what]
keep, unresolvedReason = self._provider(what)
if not keep:
_logger.error(f"Dependency '{what}' not found: {unresolvedReason}")
self._cache[what] = Resolved(reason=unresolvedReason)
return self._cache[what]
if keep in self._cache:
return self._cache[keep]
if keep in stack:
raise RuntimeError(
f"Dependency loop while resolving '{what}': {stack} -> {keep}"
)
stack.append(keep)
component = self._registry.lookup(keep, Component)
if not component:
return Resolved(reason="No provider for 'myembed'")
result: list[str] = []
for req in component.requires:
reqResolved = self.resolve(req, stack)
if reqResolved.reason:
stack.pop()
self._cache[keep] = Resolved(reason=reqResolved.reason)
return self._cache[keep]
result.extend(reqResolved.resolved)
stack.pop()
result.insert(0, keep)
self._cache[keep] = Resolved(resolved=utils.uniq(result))
return self._cache[keep]
# --- Registry --------------------------------------------------------------- #
_registry: Optional["Registry"] = None
@dataclass
class Registry(DataClassJsonMixin):
project: Project
manifests: dict[str, Manifest] = field(default_factory=dict)
def _append(self, m: Manifest):
"""
Append a manifest to the model
"""
if m.id in self.manifests:
raise RuntimeError(
f"Duplicated manifest '{m.id}' at '{m.path}' already loaded from '{self.manifests[m.id].path}'"
)
self.manifests[m.id] = m
def iter(self, type: Type[utils.T]) -> Generator[utils.T, None, None]:
"""
Iterate over all manifests of a given type
"""
for m in self.manifests.values():
if isinstance(m, type):
yield m
def iterEnabled(self, target: Target) -> Generator[Component, None, None]:
for c in self.iter(Component):
resolve = c.resolved[target.id]
if resolve.enabled:
yield c
def lookup(self, name: str, type: Type[utils.T]) -> Optional[utils.T]:
"""
Lookup a manifest of a given type by name
"""
if name in self.manifests:
m = self.manifests[name]
if isinstance(m, type):
return m
return None
def ensure(self, name: str, type: Type[utils.T]) -> utils.T:
"""
Ensure that a manifest of a given type exists
and return it.
"""
m = self.lookup(name, type)
if not m:
raise RuntimeError(f"Could not find {type.__name__} '{name}'")
return m
@staticmethod
def use(args: cli.Args) -> "Registry":
global _registry
if _registry is not None:
return _registry
project = Project.use(args)
mixins = str(args.consumeOpt("mixins", "")).split(",")
if mixins == [""]:
mixins = []
props = cast(dict[str, str], args.consumePrefix("prop:"))
_registry = Registry.load(project, mixins, props)
return _registry
@staticmethod
def load(project: Project, mixins: list[str], props: Props) -> "Registry":
registry = Registry(project)
registry._append(project)
# Lookup and load all extern projects
for externDir in project.externDirs:
projectPath = externDir / "project.json"
manifestPath = externDir / "manifest.json"
if projectPath.exists():
registry._append(Manifest.load(projectPath).ensureType(Project))
elif manifestPath.exists():
# For simple library allow to have a manifest.json instead of a project.json
registry._append(
Manifest.load(manifestPath).ensureType(Component)
)
else:
_logger.warn(
"Extern project does not have a project.json or manifest.json"
)
# Load all manifests from projects
for project in list(registry.iter(Project)):
targetDir = project.parent / const.TARGETS_DIR
targetFiles = targetDir.glob("*.json")
for targetFile in targetFiles:
registry._append(Manifest.load(Path(targetFile)).ensureType(Target))
componentDir = project.parent / const.COMPONENTS_DIR
rootComponent = project.parent / "manifest.json"
componentFiles = list(componentDir.glob("manifest.json"))
if rootComponent.exists():
componentFiles += [rootComponent]
for componentFile in componentFiles:
registry._append(
Manifest.load(componentFile).ensureType(Component)
)
# Resolve all dependencies for all targets
for target in registry.iter(Target):
target.props |= props
resolver = Resolver(registry, target)
# Apply injects
for c in registry.iter(Component):
if c.isEnabled(target)[0]:
for inject in c.injects:
victim = registry.lookup(inject, Component)
if not victim:
raise RuntimeError(f"Cannot find component '{inject}'")
victim.requires += [c.id]
# Resolve all components
for c in registry.iter(Component):
resolved = resolver.resolve(c.id)
if resolved.reason:
_logger.info(f"Component '{c.id}' disabled: {resolved.reason}")
c.resolved[target.id] = resolved
# Resolve tooling
tools: Tools = target.tools
from . import mixins as mxs
for mix in mixins:
mixin = mxs.byId(mix)
tools = mixin(target, tools)
# Apply tooling from components
for c in registry.iter(Component):
if c.resolved[target.id].enabled:
for k, v in c.tools.items():
tools[k].args += v.args
return registry
@cli.command("l", "list", "List all components and targets")
def listCmd(args: cli.Args):
registry = Registry.use(args)
components = list(registry.iter(Component))
targets = list(registry.iter(Target))
vt100.title("Components")
if len(components) == 0:
print(vt100.p("(No components available)"))
else:
print(vt100.p(", ".join(map(lambda m: m.id, components))))
print()
vt100.title("Targets")
if len(targets) == 0:
print(vt100.p("(No targets available)"))
else:
print(vt100.p(", ".join(map(lambda m: m.id, targets))))
print()

View file

@ -23,13 +23,13 @@ use Python.
""" """
import textwrap import textwrap
from typing import TextIO, Union
from cutekit.utils import asList from typing import TextIO, Union
from . import utils
def escapePath(word: str) -> str: def escapePath(word: str) -> str:
return word.replace('$ ', '$$ ').replace(' ', '$ ').replace(':', '$:') return word.replace("$ ", "$$ ").replace(" ", "$ ").replace(":", "$:")
VarValue = Union[int, str, list[str], None] VarValue = Union[int, str, list[str], None]
@ -42,92 +42,98 @@ class Writer(object):
self.width = width self.width = width
def newline(self) -> None: def newline(self) -> None:
self.output.write('\n') self.output.write("\n")
def comment(self, text: str) -> None: def comment(self, text: str) -> None:
for line in textwrap.wrap(text, self.width - 2, break_long_words=False, for line in textwrap.wrap(
break_on_hyphens=False): text, self.width - 2, break_long_words=False, break_on_hyphens=False
self.output.write('# ' + line + '\n') ):
self.output.write("# " + line + "\n")
def separator(self, text : str) -> None: def separator(self, text: str) -> None:
self.output.write(f"# --- {text} ---" + '-' * self.output.write(
(self.width - 10 - len(text)) + " #\n\n") f"# --- {text} ---" + "-" * (self.width - 10 - len(text)) + " #\n\n"
)
def variable(self, key: str, value: VarValue, indent: int = 0) -> None: def variable(self, key: str, value: VarValue, indent: int = 0) -> None:
if value is None: if value is None:
return return
if isinstance(value, list): if isinstance(value, list):
value = ' '.join(filter(None, value)) # Filter out empty strings. value = " ".join(filter(None, value)) # Filter out empty strings.
self._line('%s = %s' % (key, value), indent) self._line("%s = %s" % (key, value), indent)
def pool(self, name: str, depth: int) -> None: def pool(self, name: str, depth: int) -> None:
self._line('pool %s' % name) self._line("pool %s" % name)
self.variable('depth', depth, indent=1) self.variable("depth", depth, indent=1)
def rule(self, def rule(
name: str, self,
command: VarValue, name: str,
description: Union[str, None] = None, command: VarValue,
depfile: VarValue = None, description: Union[str, None] = None,
generator: VarValue = False, depfile: VarValue = None,
pool: VarValue = None, generator: VarValue = False,
restat: bool = False, pool: VarValue = None,
rspfile: VarValue = None, restat: bool = False,
rspfile_content: VarValue = None, rspfile: VarValue = None,
deps: VarValue = None) -> None: rspfile_content: VarValue = None,
self._line('rule %s' % name) deps: VarValue = None,
self.variable('command', command, indent=1) ) -> None:
self._line("rule %s" % name)
self.variable("command", command, indent=1)
if description: if description:
self.variable('description', description, indent=1) self.variable("description", description, indent=1)
if depfile: if depfile:
self.variable('depfile', depfile, indent=1) self.variable("depfile", depfile, indent=1)
if generator: if generator:
self.variable('generator', '1', indent=1) self.variable("generator", "1", indent=1)
if pool: if pool:
self.variable('pool', pool, indent=1) self.variable("pool", pool, indent=1)
if restat: if restat:
self.variable('restat', '1', indent=1) self.variable("restat", "1", indent=1)
if rspfile: if rspfile:
self.variable('rspfile', rspfile, indent=1) self.variable("rspfile", rspfile, indent=1)
if rspfile_content: if rspfile_content:
self.variable('rspfile_content', rspfile_content, indent=1) self.variable("rspfile_content", rspfile_content, indent=1)
if deps: if deps:
self.variable('deps', deps, indent=1) self.variable("deps", deps, indent=1)
def build(self, def build(
outputs: Union[str, list[str]], self,
rule: str, outputs: Union[str, list[str]],
inputs: Union[VarPath, None], rule: str,
implicit: VarPath = None, inputs: Union[VarPath, None],
order_only: VarPath = None, implicit: VarPath = None,
variables: Union[dict[str, str], None] = None, order_only: VarPath = None,
implicit_outputs: VarPath = None, variables: Union[dict[str, str], None] = None,
pool: Union[str, None] = None, implicit_outputs: VarPath = None,
dyndep: Union[str, None] = None) -> list[str]: pool: Union[str, None] = None,
outputs = asList(outputs) dyndep: Union[str, None] = None,
) -> list[str]:
outputs = utils.asList(outputs)
out_outputs = [escapePath(x) for x in outputs] out_outputs = [escapePath(x) for x in outputs]
all_inputs = [escapePath(x) for x in asList(inputs)] all_inputs = [escapePath(x) for x in utils.asList(inputs)]
if implicit: if implicit:
implicit = [escapePath(x) for x in asList(implicit)] implicit = [escapePath(x) for x in utils.asList(implicit)]
all_inputs.append('|') all_inputs.append("|")
all_inputs.extend(implicit) all_inputs.extend(implicit)
if order_only: if order_only:
order_only = [escapePath(x) for x in asList(order_only)] order_only = [escapePath(x) for x in utils.asList(order_only)]
all_inputs.append('||') all_inputs.append("||")
all_inputs.extend(order_only) all_inputs.extend(order_only)
if implicit_outputs: if implicit_outputs:
implicit_outputs = [escapePath(x) implicit_outputs = [escapePath(x) for x in utils.asList(implicit_outputs)]
for x in asList(implicit_outputs)] out_outputs.append("|")
out_outputs.append('|')
out_outputs.extend(implicit_outputs) out_outputs.extend(implicit_outputs)
self._line('build %s: %s' % (' '.join(out_outputs), self._line(
' '.join([rule] + all_inputs))) "build %s: %s" % (" ".join(out_outputs), " ".join([rule] + all_inputs))
)
if pool is not None: if pool is not None:
self._line(' pool = %s' % pool) self._line(" pool = %s" % pool)
if dyndep is not None: if dyndep is not None:
self._line(' dyndep = %s' % dyndep) self._line(" dyndep = %s" % dyndep)
if variables: if variables:
iterator = iter(variables.items()) iterator = iter(variables.items())
@ -138,58 +144,59 @@ class Writer(object):
return outputs return outputs
def include(self, path: str) -> None: def include(self, path: str) -> None:
self._line('include %s' % path) self._line("include %s" % path)
def subninja(self, path: str) -> None: def subninja(self, path: str) -> None:
self._line('subninja %s' % path) self._line("subninja %s" % path)
def default(self, paths: VarPath) -> None: def default(self, paths: VarPath) -> None:
self._line('default %s' % ' '.join(asList(paths))) self._line("default %s" % " ".join(utils.asList(paths)))
def _count_dollars_before_index(self, s: str, i: int) -> int: def _count_dollars_before_index(self, s: str, i: int) -> int:
"""Returns the number of '$' characters right in front of s[i].""" """Returns the number of '$' characters right in front of s[i]."""
dollar_count = 0 dollar_count = 0
dollar_index = i - 1 dollar_index = i - 1
while dollar_index > 0 and s[dollar_index] == '$': while dollar_index > 0 and s[dollar_index] == "$":
dollar_count += 1 dollar_count += 1
dollar_index -= 1 dollar_index -= 1
return dollar_count return dollar_count
def _line(self, text: str, indent: int = 0) -> None: def _line(self, text: str, indent: int = 0) -> None:
"""Write 'text' word-wrapped at self.width characters.""" """Write 'text' word-wrapped at self.width characters."""
leading_space = ' ' * indent leading_space = " " * indent
while len(leading_space) + len(text) > self.width: while len(leading_space) + len(text) > self.width:
# The text is too wide; wrap if possible. # The text is too wide; wrap if possible.
# Find the rightmost space that would obey our width constraint and # Find the rightmost space that would obey our width constraint and
# that's not an escaped space. # that's not an escaped space.
available_space = self.width - len(leading_space) - len(' $') available_space = self.width - len(leading_space) - len(" $")
space = available_space space = available_space
while True: while True:
space = text.rfind(' ', 0, space) space = text.rfind(" ", 0, space)
if (space < 0 or if space < 0 or self._count_dollars_before_index(text, space) % 2 == 0:
self._count_dollars_before_index(text, space) % 2 == 0):
break break
if space < 0: if space < 0:
# No such space; just use the first unescaped space we can find. # No such space; just use the first unescaped space we can find.
space = available_space - 1 space = available_space - 1
while True: while True:
space = text.find(' ', space + 1) space = text.find(" ", space + 1)
if (space < 0 or if (
self._count_dollars_before_index(text, space) % 2 == 0): space < 0
or self._count_dollars_before_index(text, space) % 2 == 0
):
break break
if space < 0: if space < 0:
# Give up on breaking. # Give up on breaking.
break break
self.output.write(leading_space + text[0:space] + ' $\n') self.output.write(leading_space + text[0:space] + " $\n")
text = text[space+1:] text = text[space + 1 :]
# Subsequent lines are continuations, so indent them. # Subsequent lines are continuations, so indent them.
leading_space = ' ' * (indent+2) leading_space = " " * (indent + 2)
self.output.write(leading_space + text + '\n') self.output.write(leading_space + text + "\n")
def close(self) -> None: def close(self) -> None:
self.output.close() self.output.close()
@ -198,6 +205,6 @@ class Writer(object):
def escape(string: str) -> str: def escape(string: str) -> str:
"""Escape a string such that it can be embedded into a Ninja file without """Escape a string such that it can be embedded into a Ninja file without
further interpretation.""" further interpretation."""
assert '\n' not in string, 'Ninja syntax does not allow newlines' assert "\n" not in string, "Ninja syntax does not allow newlines"
# We only have one special metacharacter: '$'. # We only have one special metacharacter: '$'.
return string.replace('$', '$$') return string.replace("$", "$$")

View file

@ -1,18 +1,19 @@
import os import os
import logging import logging
from cutekit import shell, project, const, context from . import shell, model, const
import importlib.util as importlib import importlib.util as importlib
logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
def load(path: str): def load(path: str):
logger.info(f"Loading plugin {path}") _logger.info(f"Loading plugin {path}")
spec = importlib.spec_from_file_location("plugin", path) spec = importlib.spec_from_file_location("plugin", path)
if not spec or not spec.loader: if not spec or not spec.loader:
logger.error(f"Failed to load plugin {path}") _logger.error(f"Failed to load plugin {path}")
return None return None
module = importlib.module_from_spec(spec) module = importlib.module_from_spec(spec)
@ -20,25 +21,23 @@ def load(path: str):
def loadAll(): def loadAll():
logger.info("Loading plugins...") _logger.info("Loading plugins...")
projectRoot = project.root() root = model.Project.root()
if projectRoot is None: if root is None:
logger.info("Not in project, skipping plugin loading") _logger.info("Not in project, skipping plugin loading")
return return
pj = context.loadProject(projectRoot) project = model.Project.at(root)
paths = list(map(lambda e: os.path.join(const.EXTERN_DIR, e), pj.extern.keys())) + ["."] paths = list(map(lambda e: const.EXTERN_DIR / e, project.extern.keys())) + ["."]
for dirname in paths: for dirname in paths:
pluginDir = os.path.join(projectRoot, dirname, const.META_DIR, "plugins") pluginDir = root / dirname / const.META_DIR / "plugins"
for files in shell.readdir(pluginDir): for script in pluginDir.glob("*.py"):
if files.endswith(".py"): plugin = load(script)
plugin = load(os.path.join(pluginDir, files))
if plugin:
logger.info(f"Loaded plugin {plugin.name}")
plugin.init()
if plugin:
_logger.info(f"Loaded plugin {plugin.name}")
plugin.init()

View file

@ -1,20 +0,0 @@
import os
from typing import Optional
def root() -> Optional[str]:
cwd = os.getcwd()
while cwd != "/":
if os.path.isfile(os.path.join(cwd, "project.json")):
return cwd
cwd = os.path.dirname(cwd)
return None
def chdir() -> None:
projectRoot = root()
if projectRoot is None:
raise RuntimeError(
"No project.json found in this directory or any parent directory")
os.chdir(projectRoot)

View file

@ -9,7 +9,15 @@ class Rule:
args: list[str] args: list[str]
deps: Optional[str] = None deps: Optional[str] = None
def __init__(self, id: str, fileIn: list[str], fileOut: list[str], rule: str, args: list[str] = [], deps: Optional[str] = None): def __init__(
self,
id: str,
fileIn: list[str],
fileOut: list[str],
rule: str,
args: list[str] = [],
deps: Optional[str] = None,
):
self.id = id self.id = id
self.fileIn = fileIn self.fileIn = fileIn
self.fileOut = fileOut self.fileOut = fileOut
@ -19,16 +27,22 @@ class Rule:
rules: dict[str, Rule] = { rules: dict[str, Rule] = {
"cc": Rule("cc", ["*.c"], ["*.o"], "-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs", ["-std=gnu2x", "cc": Rule(
"-Wall", "cc",
"-Wextra", ["*.c"],
"-Werror"], "$out.d"), ["*.o"],
"cxx": Rule("cxx", ["*.cpp", "*.cc", "*.cxx"], ["*.o"], "-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs", ["-std=gnu++2b", "-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs",
"-Wall", ["-std=gnu2x", "-Wall", "-Wextra", "-Werror"],
"-Wextra", "$out.d",
"-Werror", ),
"-fno-exceptions", "cxx": Rule(
"-fno-rtti"], "$out.d"), "cxx",
["*.cpp", "*.cc", "*.cxx"],
["*.o"],
"-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs",
["-std=gnu++2b", "-Wall", "-Wextra", "-Werror", "-fno-exceptions", "-fno-rtti"],
"$out.d",
),
"as": Rule("as", ["*.s", "*.asm", "*.S"], ["*.o"], "-o $out $in $flags"), "as": Rule("as", ["*.s", "*.asm", "*.S"], ["*.o"], "-o $out $in $flags"),
"ar": Rule("ar", ["*.o"], ["*.a"], "$flags $out $in"), "ar": Rule("ar", ["*.o"], ["*.a"], "$flags $out $in"),
"ld": Rule("ld", ["*.o", "*.a"], ["*.out"], "-o $out $in $flags"), "ld": Rule("ld", ["*.o", "*.a"], ["*.out"], "-o $out $in $flags"),

View file

@ -12,14 +12,17 @@ import logging
import tempfile import tempfile
from pathlib import Path
from typing import Optional from typing import Optional
from cutekit import const from . import const
logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class Uname: class Uname:
def __init__(self, sysname: str, nodename: str, release: str, version: str, machine: str): def __init__(
self, sysname: str, nodename: str, release: str, version: str, machine: str
):
self.sysname = sysname self.sysname = sysname
self.nodename = nodename self.nodename = nodename
self.release = release self.release = release
@ -47,79 +50,30 @@ def sha256sum(path: str) -> str:
return hashlib.sha256(f.read()).hexdigest() return hashlib.sha256(f.read()).hexdigest()
def find(path: str | list[str], wildcards: list[str] = [], recusive: bool = True) -> list[str]: def rmrf(path: Path) -> bool:
logger.info(f"Looking for files in {path} matching {wildcards}") _logger.info(f"Removing directory {path}")
result: list[str] = [] if not path.exists():
if isinstance(path, list):
for p in path:
result += find(p, wildcards, recusive)
return result
if not os.path.isdir(path):
return []
if recusive:
for root, _, files in os.walk(path):
for f in files:
if len(wildcards) == 0:
result.append(os.path.join(root, f))
else:
for wildcard in wildcards:
if fnmatch.fnmatch(f, wildcard):
result.append(os.path.join(root, f))
break
else:
for f in os.listdir(path):
if len(wildcards) == 0:
result.append(os.path.join(path, f))
else:
for wildcard in wildcards:
if fnmatch.fnmatch(f, wildcard):
result.append(os.path.join(path, f))
break
return result
def mkdir(path: str) -> str:
logger.info(f"Creating directory {path}")
try:
os.makedirs(path)
except OSError as exc:
if not (exc.errno == errno.EEXIST and os.path.isdir(path)):
raise
return path
def rmrf(path: str) -> bool:
logger.info(f"Removing directory {path}")
if not os.path.exists(path):
return False return False
shutil.rmtree(path, ignore_errors=True) shutil.rmtree(path, ignore_errors=True)
return True return True
def wget(url: str, path: Optional[str] = None) -> str: def wget(url: str, path: Optional[Path] = None) -> Path:
import requests import requests
if path is None: if path is None:
path = os.path.join( path = const.CACHE_DIR / hashlib.sha256(url.encode("utf-8")).hexdigest()
const.CACHE_DIR,
hashlib.sha256(url.encode('utf-8')).hexdigest())
if os.path.exists(path): if path.exists():
return path return path
logger.info(f"Downloading {url} to {path}") _logger.info(f"Downloading {url} to {path}")
r = requests.get(url, stream=True) r = requests.get(url, stream=True)
r.raise_for_status() r.raise_for_status()
mkdir(os.path.dirname(path)) path.parent.mkdir(parents=True, exist_ok=True)
with open(path, 'wb') as f: with path.open("wb") as f:
for chunk in r.iter_content(chunk_size=8192): for chunk in r.iter_content(chunk_size=8192):
if chunk: if chunk:
f.write(chunk) f.write(chunk)
@ -128,17 +82,20 @@ def wget(url: str, path: Optional[str] = None) -> str:
def exec(*args: str, quiet: bool = False) -> bool: def exec(*args: str, quiet: bool = False) -> bool:
logger.info(f"Executing {args}") _logger.info(f"Executing {args}")
try: try:
proc = subprocess.run( proc = subprocess.run(
args, stdout=sys.stdout if not quiet else subprocess.PIPE, stderr=sys.stderr if not quiet else subprocess.PIPE) args,
stdout=sys.stdout if not quiet else subprocess.PIPE,
stderr=sys.stderr if not quiet else subprocess.PIPE,
)
if proc.stdout: if proc.stdout:
logger.info(proc.stdout.decode('utf-8')) _logger.info(proc.stdout.decode("utf-8"))
if proc.stderr: if proc.stderr:
logger.error(proc.stderr.decode('utf-8')) _logger.error(proc.stderr.decode("utf-8"))
except FileNotFoundError: except FileNotFoundError:
raise RuntimeError(f"{args[0]}: Command not found") raise RuntimeError(f"{args[0]}: Command not found")
@ -150,14 +107,13 @@ def exec(*args: str, quiet: bool = False) -> bool:
raise RuntimeError(f"{args[0]}: Segmentation fault") raise RuntimeError(f"{args[0]}: Segmentation fault")
if proc.returncode != 0: if proc.returncode != 0:
raise RuntimeError( raise RuntimeError(f"{args[0]}: Process exited with code {proc.returncode}")
f"{args[0]}: Process exited with code {proc.returncode}")
return True return True
def popen(*args: str) -> str: def popen(*args: str) -> str:
logger.info(f"Executing {args}") _logger.info(f"Executing {args}")
try: try:
proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=sys.stderr) proc = subprocess.run(args, stdout=subprocess.PIPE, stderr=sys.stderr)
@ -168,48 +124,43 @@ def popen(*args: str) -> str:
raise RuntimeError(f"{args[0]}: Segmentation fault") raise RuntimeError(f"{args[0]}: Segmentation fault")
if proc.returncode != 0: if proc.returncode != 0:
raise RuntimeError( raise RuntimeError(f"{args[0]}: Process exited with code {proc.returncode}")
f"{args[0]}: Process exited with code {proc.returncode}")
return proc.stdout.decode('utf-8') return proc.stdout.decode("utf-8")
def readdir(path: str) -> list[str]: def cp(src: Path, dst: Path):
logger.info(f"Reading directory {path}") _logger.info(f"Copying {src} to {dst}")
try:
return os.listdir(path)
except FileNotFoundError:
return []
def cp(src: str, dst: str):
logger.info(f"Copying {src} to {dst}")
shutil.copy(src, dst) shutil.copy(src, dst)
def mv(src: str, dst: str): def mv(src: Path, dst: Path):
logger.info(f"Moving {src} to {dst}") _logger.info(f"Moving {src} to {dst}")
shutil.move(src, dst) shutil.move(src, dst)
def cpTree(src: str, dst: str): def cpTree(src: Path, dst: Path):
logger.info(f"Copying {src} to {dst}") _logger.info(f"Copying {src} to {dst}")
shutil.copytree(src, dst, dirs_exist_ok=True) shutil.copytree(src, dst, dirs_exist_ok=True)
def cloneDir(url: str, path: str, dest: str) -> str: def cloneDir(url: str, path: Path, dest: Path) -> Path:
with tempfile.TemporaryDirectory() as tmp: with tempfile.TemporaryDirectory() as tmp:
mkdir(tmp) tmp = Path(tmp)
exec(*["git", "clone", "-n", "--depth=1", tmp.mkdir(parents=True, exist_ok=True)
"--filter=tree:0", url, tmp, "-q"], quiet=True) exec(
exec(*["git", "-C", tmp, "sparse-checkout", *["git", "clone", "-n", "--depth=1", "--filter=tree:0", url, tmp, "-q"],
"set", "--no-cone", path, "-q"], quiet=True) quiet=True,
)
exec(
*["git", "-C", tmp, "sparse-checkout", "set", "--no-cone", path, "-q"],
quiet=True,
)
exec(*["git", "-C", tmp, "checkout", "-q", "--no-progress"], quiet=True) exec(*["git", "-C", tmp, "checkout", "-q", "--no-progress"], quiet=True)
mv(os.path.join(tmp, path), dest) mv(tmp / path, dest)
return dest return dest
@ -232,7 +183,7 @@ def latest(cmd: str) -> str:
if cmd in LATEST_CACHE: if cmd in LATEST_CACHE:
return LATEST_CACHE[cmd] return LATEST_CACHE[cmd]
logger.info(f"Finding latest version of {cmd}") _logger.info(f"Finding latest version of {cmd}")
regex: re.Pattern[str] regex: re.Pattern[str]
@ -254,14 +205,14 @@ def latest(cmd: str) -> str:
versions.sort() versions.sort()
chosen = versions[-1] chosen = versions[-1]
logger.info(f"Chosen {chosen} as latest version of {cmd}") _logger.info(f"Chosen {chosen} as latest version of {cmd}")
LATEST_CACHE[cmd] = chosen LATEST_CACHE[cmd] = chosen
return chosen return chosen
def which(cmd: str) -> str | None: def which(cmd: str) -> Optional[str]:
""" """
Find the path of a command Find the path of a command
""" """

View file

@ -3,11 +3,11 @@ from typing import Any, TypeVar, cast, Optional, Union
import json import json
import hashlib import hashlib
T = TypeVar('T') T = TypeVar("T")
def uniq(l: list[str]) -> list[str]: def uniq(l: list[T]) -> list[T]:
result: list[str] = [] result: list[T] = []
for i in l: for i in l:
if i in result: if i in result:
result.remove(i) result.remove(i)
@ -15,7 +15,9 @@ def uniq(l: list[str]) -> list[str]:
return result return result
def hash(obj: Any, keys: list[str] = [], cls: Optional[type[json.JSONEncoder]] = None) -> str: def hash(
obj: Any, keys: list[str] = [], cls: Optional[type[json.JSONEncoder]] = None
) -> str:
toHash = {} toHash = {}
if len(keys) == 0: if len(keys) == 0:
toHash = obj toHash = obj
@ -28,7 +30,7 @@ def hash(obj: Any, keys: list[str] = [], cls: Optional[type[json.JSONEncoder]] =
def camelCase(s: str) -> str: def camelCase(s: str) -> str:
s = ''.join(x for x in s.title() if x != '_' and x != '-') s = "".join(x for x in s.title() if x != "_" and x != "-")
s = s[0].lower() + s[1:] s = s[0].lower() + s[1:]
return s return s
@ -60,4 +62,6 @@ def asList(i: Optional[Union[T, list[T]]]) -> list[T]:
def isNewer(path1: str, path2: str) -> bool: def isNewer(path1: str, path2: str) -> bool:
return not os.path.exists(path2) or os.path.getmtime(path1) > os.path.getmtime(path2) return not os.path.exists(path2) or os.path.getmtime(path1) > os.path.getmtime(
path2
)

View file

@ -28,10 +28,6 @@ CROSSED = "\033[9m"
RESET = "\033[0m" RESET = "\033[0m"
def title(text: str):
print(f"{BOLD}{text}{RESET}:")
def wordwrap(text: str, width: int = 60, newline: str = "\n") -> str: def wordwrap(text: str, width: int = 60, newline: str = "\n") -> str:
result = "" result = ""
curr = 0 curr = 0
@ -49,3 +45,11 @@ def wordwrap(text: str, width: int = 60, newline: str = "\n") -> str:
def indent(text: str, indent: int = 4) -> str: def indent(text: str, indent: int = 4) -> str:
return " " * indent + text.replace("\n", "\n" + " " * indent) return " " * indent + text.replace("\n", "\n" + " " * indent)
def title(text: str):
print(f"{BOLD}{text}{RESET}:")
def p(text: str):
return indent(wordwrap(text))

View file

@ -1,5 +1,4 @@
# Extending cutekit
# Extending cutekit
By writing custom Python plugins, you can extend Cutekit to do whatever you want. By writing custom Python plugins, you can extend Cutekit to do whatever you want.
@ -9,24 +8,11 @@ Then you can import cutekit and change/add whatever you want.
For example you can add a new command to the CLI: For example you can add a new command to the CLI:
```python ```python
import os from cutekit import cli
import json
import magic
import logging
from pathlib import Path
@cli.command("h", "hello", "Print hello world")
from cutekit import shell, builder, const, project def bootCmd(args: cli.Args) -> None:
from cutekit.cmds import Cmd, append
from cutekit.args import Args
from typing import Callable
def bootCmd(args: Args) -> None:
project.chdir()
print("Hello world!") print("Hello world!")
append(Cmd("h", "hello", "Print hello world", bootCmd))
``` ```
This feature is used - for example - by [SkiftOS](https://github.com/skift-org/skift/blob/main/meta/plugins/start-cmd.py) to add the `start` command, that build packages and run a virtual machine. This feature is used - for example - by [SkiftOS](https://github.com/skift-org/skift/blob/main/meta/plugins/start-cmd.py) to add the `start` command, that build packages and run a virtual machine.

View file

@ -14,7 +14,11 @@ authors = [
readme = "README.md" readme = "README.md"
requires-python = ">=3.10" requires-python = ">=3.10"
license = { text = "MIT" } license = { text = "MIT" }
dependencies = ["requests ~= 2.28.0", "graphviz ~= 0.20.1"] dependencies = [
"requests ~= 2.28.0",
"graphviz ~= 0.20.1",
"dataclasses-json ~= 0.6.2",
]
dynamic = ["version"] dynamic = ["version"]
[project.scripts] [project.scripts]

View file

@ -1,2 +0,0 @@
requests ~= 2.28.0
graphviz ~= 0.20.1

90
tests/test_resolver.py Normal file
View file

@ -0,0 +1,90 @@
from cutekit import model
def test_direct_deps():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib"))
t = model.Target("host")
res = model.Resolver(r, t)
resolved = res.resolve("myapp")
assert resolved.reason is None
assert resolved.resolved == ["myapp", "mylib"]
def test_indirect_deps():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(model.Component("myimpl", provides=["myembed"]))
t = model.Target("host")
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimpl"]
def test_deps_routing():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(model.Component("myimplA", provides=["myembed"]))
r._append(model.Component("myimplB", provides=["myembed"]))
t = model.Target("host", routing={"myembed": "myimplB"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplB"]
t = model.Target("host", routing={"myembed": "myimplA"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplA"]
t = model.Target("host", routing={"myembed": "myimplC"})
res = model.Resolver(r, t)
assert res.resolve("myapp").reason == "No provider for 'myembed'"
def test_deps_routing_with_props():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(
model.Component("myimplA", provides=["myembed"], enableIf={"myprop": ["a"]})
)
r._append(
model.Component("myimplB", provides=["myembed"], enableIf={"myprop": ["b"]})
)
t = model.Target("host", routing={"myembed": "myimplB"}, props={"myprop": "b"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplB"]
t = model.Target("host", routing={"myembed": "myimplA"}, props={"myprop": "a"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplA"]
t = model.Target("host", routing={"myembed": "myimplC"}, props={"myprop": "c"})
res = model.Resolver(r, t)
resolved = res.resolve("myapp")
assert resolved.reason == "No provider for 'myembed'"
def test_deps_routing_with_props_and_requires():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(
model.Component("myimplA", provides=["myembed"], enableIf={"myprop": ["a"]})
)
r._append(
model.Component("myimplB", provides=["myembed"], enableIf={"myprop": ["b"]})
)
t = model.Target("host", routing={"myembed": "myimplB"}, props={"myprop": "b"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplB"]
t = model.Target("host", routing={"myembed": "myimplA"}, props={"myprop": "a"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplA"]
t = model.Target("host", routing={"myembed": "myimplC"}, props={"myprop": "c"})
res = model.Resolver(r, t)
assert res.resolve("myapp").reason == "No provider for 'myembed'"