Refactored build context and dependecy resolution code

This commit is contained in:
Sleepy Monax 2023-11-13 19:21:54 +01:00
parent 244e8664e8
commit e1ff2307f5
12 changed files with 896 additions and 756 deletions

View file

@ -3,12 +3,20 @@ import os
import logging import logging
from . import ( from . import (
builder,
cli, cli,
compat,
const, const,
graph,
jexpr,
mixins,
model, model,
ninja,
plugins, plugins,
rules,
shell,
utils,
vt100, vt100,
cmds, # noqa: F401
) )

View file

@ -1,170 +1,290 @@
import os import os
import logging import logging
from typing import TextIO from pathlib import Path
from dataclasses import dataclass
from itertools import chain
from typing import Generator, TextIO, Union, cast
from . import shell, rules, model, ninja, context from . import shell, rules, model, ninja, const, utils, cli
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
def gen(out: TextIO, context: context.Context): def aggregateCincs(target: model.Target, registry: model.Registry) -> set[str]:
writer = ninja.Writer(out) res = set()
target = context.target for c in registry.iterEnabled(target):
if "cpp-root-include" in c.props:
res.add(c.dirname())
elif c.type == model.Kind.LIB:
res.add(str(Path(c.dirname()).parent))
writer.comment("File generated by the build system, do not edit") return set(map(lambda i: f"-I{i}", res))
writer.newline()
writer.variable("builddir", context.builddir())
writer.separator("Tools")
writer.variable("cincs", " ".join(map(lambda i: f"-I{i}", context.cincls()))) def aggregateCdefs(target: model.Target) -> set[str]:
res = set()
writer.variable("cdefs", " ".join(context.cdefs())) def sanatize(s: str) -> str:
return s.lower().replace(" ", "_").replace("-", "_").replace(".", "_")
writer.newline() for k, v in target.props.items():
if isinstance(v, bool):
if v:
res.add(f"-D__ck_{sanatize(k)}__")
else:
res.add(f"-D__ck_{sanatize(k)}_{sanatize(str(v))}__")
res.add(f"-D__ck_{sanatize(k)}_value={str(v)}")
writer.rule("cp", "cp $in $out") return res
writer.newline()
def buildpath(target: model.Target, component: model.Component, path) -> Path:
return Path(target.builddir) / component.id / path
# --- Compilation ------------------------------------------------------------ #
def listSrc(component: model.Component) -> list[str]:
wildcards = set(chain(*map(lambda rule: rule.fileIn, rules.rules.values())))
dirs = [component.dirname()] + list(
map(lambda d: os.path.join(component.dirname(), d), component.subdirs)
)
return shell.find(dirs, list(wildcards), recusive=False)
def compileSrc(
w: ninja.Writer, target: model.Target, component: model.Component
) -> list[str]:
res: list[str] = []
for src in listSrc(component):
rel = Path(src).relative_to(component.dirname())
r = rules.byFileIn(src)
if r is None:
raise RuntimeError(f"Unknown rule for file {src}")
dest = buildpath(target, component, "obj") / rel.with_suffix(r.fileOut[0][1:])
t = target.tools[r.id]
w.build(str(dest), r.id, inputs=src, order_only=t.files)
res.append(str(dest))
return res
# --- Ressources ------------------------------------------------------------- #
def listRes(component: model.Component) -> list[str]:
return shell.find(str(component.subpath("res")))
def compileRes(
w: ninja.Writer,
target: model.Target,
component: model.Component,
) -> list[str]:
res: list[str] = []
for r in listRes(component):
rel = Path(r).relative_to(component.subpath("res"))
dest = buildpath(target, component, "res") / rel
w.build(str(dest), "cp", r)
res.append(str(dest))
return res
# --- Linking ---------------------------------------------------------------- #
def outfile(target: model.Target, component: model.Component) -> str:
if component.type == model.Kind.LIB:
return str(buildpath(target, component, f"lib/{component.id}.a"))
else:
return str(buildpath(target, component, f"bin/{component.id}.out"))
def collectLibs(
registry: model.Registry, target: model.Target, component: model.Component
) -> list[str]:
res: list[str] = []
for r in component.resolved[target.id].resolved:
req = registry.lookup(r, model.Component)
assert req is not None # model.Resolver has already checked this
if r == component.id:
continue
if not req.type == model.Kind.LIB:
raise RuntimeError(f"Component {r} is not a library")
res.append(outfile(target, req))
return res
def link(
w: ninja.Writer,
registry: model.Registry,
target: model.Target,
component: model.Component,
) -> str:
w.newline()
out = outfile(target, component)
objs: list[str] = compileSrc(w, target, component)
res = compileRes(w, target, component)
libs = collectLibs(registry, target, component)
if component.type == model.Kind.LIB:
w.build(out, "ar", objs, implicit=res)
else:
w.build(out, "ld", objs + libs, implicit=res)
return out
# --- Phony ------------------------------------------------------------------ #
def all(w: ninja.Writer, registry: model.Registry, target: model.Target) -> list[str]:
all: list[str] = []
for c in registry.iterEnabled(target):
all.append(link(w, registry, target, c))
w.build("all", "phony", all)
w.default("all")
return all
def gen(out: TextIO, target: model.Target, registry: model.Registry):
w = ninja.Writer(out)
w.comment("File generated by the build system, do not edit")
w.newline()
w.variable("builddir", target.builddir)
w.variable("hashid", target.hashid)
w.separator("Tools")
w.variable("cincs", " ".join(aggregateCincs(target, registry)))
w.variable("cdefs", " ".join(aggregateCdefs(target)))
w.newline()
w.rule("cp", "cp $in $out")
for i in target.tools: for i in target.tools:
tool = target.tools[i] tool = target.tools[i]
rule = rules.rules[i] rule = rules.rules[i]
writer.variable(i, tool.cmd) w.variable(i, tool.cmd)
writer.variable(i + "flags", " ".join(rule.args + tool.args)) w.variable(i + "flags", " ".join(rule.args + tool.args))
writer.rule( w.rule(
i, i,
f"{tool.cmd} {rule.rule.replace('$flags',f'${i}flags')}", f"{tool.cmd} {rule.rule.replace('$flags',f'${i}flags')}",
depfile=rule.deps, depfile=rule.deps,
) )
writer.newline() w.newline()
writer.separator("Components") w.separator("Build")
all: list[str] = [] all(w, registry, target)
for instance in context.enabledInstances():
objects = instance.objsfiles()
assets = instance.resfiles()
writer.comment(f"Component: {instance.manifest.id}")
writer.comment(f"Resolved: {', '.join(instance.resolved)}")
for obj in objects: @dataclass
r = rules.byFileIn(obj[0]) class Product:
if r is None: path: Path
raise RuntimeError(f"Unknown rule for file {obj[0]}") target: model.Target
t = target.tools[r.id] component: model.Component
writer.build(obj[1], r.id, obj[0], order_only=t.files)
for asset in assets:
writer.build(asset[1], "cp", asset[0])
writer.newline()
if instance.isLib():
writer.build(
instance.outfile(),
"ar",
list(map(lambda o: o[1], objects)),
implicit=list(map(lambda o: o[1], assets)),
)
else:
libraries: list[str] = []
for req in instance.resolved:
reqInstance = context.componentByName(req)
if reqInstance is None:
raise RuntimeError(f"Component {req} not found")
if not reqInstance.isLib():
raise RuntimeError(f"Component {req} is not a library")
libraries.append(reqInstance.outfile())
writer.build(
instance.outfile(),
"ld",
list(map(lambda o: o[1], objects)) + libraries,
implicit=list(map(lambda o: o[1], assets)),
)
all.append(instance.outfile())
writer.newline()
writer.separator("Phony targets")
writer.build("all", "phony", all)
writer.default("all")
def build( def build(
componentSpec: str, targetSpec: str, props: model.Props = {} target: model.Target,
) -> context.ComponentInstance: registry: model.Registry,
ctx = context.contextFor(targetSpec, props) components: Union[list[model.Component], model.Component, None] = None,
) -> list[Product]:
shell.mkdir(ctx.builddir()) all = False
ninjaPath = os.path.join(ctx.builddir(), "build.ninja") shell.mkdir(target.builddir)
ninjaPath = os.path.join(target.builddir, "build.ninja")
with open(ninjaPath, "w") as f: with open(ninjaPath, "w") as f:
gen(f, ctx) gen(f, target, registry)
instance = ctx.componentByName(componentSpec) if components is None:
all = True
components = list(registry.iterEnabled(target))
if instance is None: if isinstance(components, model.Component):
raise RuntimeError(f"Component {componentSpec} not found") components = [components]
if not instance.enabled: products: list[Product] = []
raise RuntimeError( for c in components:
f"Component {componentSpec} is disabled: {instance.disableReason}" products.append(
Product(
path=Path(outfile(target, c)),
target=target,
component=c,
)
) )
shell.exec("ninja", "-f", ninjaPath, instance.outfile()) outs = list(map(lambda p: str(p.path), products))
if all:
return instance
class Paths:
bin: str
lib: str
obj: str
def __init__(self, bin: str, lib: str, obj: str):
self.bin = bin
self.lib = lib
self.obj = obj
def buildAll(targetSpec: str, props: model.Props = {}) -> context.Context:
ctx = context.contextFor(targetSpec, props)
shell.mkdir(ctx.builddir())
ninjaPath = os.path.join(ctx.builddir(), "build.ninja")
with open(ninjaPath, "w") as f:
gen(f, ctx)
shell.exec("ninja", "-v", "-f", ninjaPath) shell.exec("ninja", "-v", "-f", ninjaPath)
else:
return ctx shell.exec("ninja", "-v", "-f", ninjaPath, *outs)
return products
def testAll(targetSpec: str): # --- Commands --------------------------------------------------------------- #
ctx = context.contextFor(targetSpec)
shell.mkdir(ctx.builddir())
ninjaPath = os.path.join(ctx.builddir(), "build.ninja")
with open(ninjaPath, "w") as f: @cli.command("b", "build", "Build a component or all components")
gen(f, ctx) def buildCmd(args: cli.Args):
registry = model.Registry.use(args)
target = model.Target.use(args)
componentSpec = args.consumeArg()
if componentSpec is None:
raise RuntimeError("No component specified")
component = registry.lookup(componentSpec, model.Component)
build(target, registry, component)[0]
shell.exec("ninja", "-v", "-f", ninjaPath, "all")
for instance in ctx.enabledInstances(): @cli.command("p", "project", "Show project information")
if instance.isLib(): def runCmd(args: cli.Args):
continue registry = model.Registry.use(args)
target = model.Target.use(args)
debug = args.consumeOpt("debug", False) is True
if instance.id().endswith("-tests"): componentSpec = args.consumeArg()
print(f"Running {instance.id()}") if componentSpec is None:
shell.exec(instance.outfile()) raise RuntimeError("No component specified")
component = registry.lookup(componentSpec, model.Component)
if component is None:
raise RuntimeError(f"Component {componentSpec} not found")
product = build(target, registry, component)[0]
os.environ["CK_TARGET"] = target.id
os.environ["CK_COMPONENT"] = product.component.id
os.environ["CK_BUILDDIR"] = target.builddir
shell.exec(*(["lldb", "-o", "run"] if debug else []), str(product.path), *args.args)
@cli.command("t", "test", "Run all test targets")
def testCmd(args: cli.Args):
# This is just a wrapper around the `run` command that try
# to run a special hook component named __tests__.
args.args.insert(0, "__tests__")
runCmd(args)
@cli.command("d", "debug", "Debug a component")
def debugCmd(args: cli.Args):
# This is just a wrapper around the `run` command that
# always enable debug mode.
args.opts["debug"] = True
runCmd(args)
@cli.command("c", "clean", "Clean build files")
def cleanCmd(args: cli.Args):
model.Project.use(args)
shell.rmrf(const.BUILD_DIR)
@cli.command("n", "nuke", "Clean all build files and caches")
def nukeCmd(args: cli.Args):
model.Project.use(args)
shell.rmrf(const.PROJECT_CK_DIR)

View file

@ -70,7 +70,7 @@ Callback = Callable[[Args], None]
@dataclass @dataclass
class Command: class Command:
shortName: str shortName: Optional[str]
longName: str longName: str
helpText: str helpText: str
isPlugin: bool isPlugin: bool
@ -86,7 +86,7 @@ def append(command: Command):
commands.sort(key=lambda c: c.shortName or c.longName) commands.sort(key=lambda c: c.shortName or c.longName)
def command(shortName: str, longName: str, helpText: str): def command(shortName: Optional[str], longName: str, helpText: str):
curframe = inspect.currentframe() curframe = inspect.currentframe()
calframe = inspect.getouterframes(curframe, 2) calframe = inspect.getouterframes(curframe, 2)
@ -103,7 +103,7 @@ def command(shortName: str, longName: str, helpText: str):
@command("u", "usage", "Show usage information") @command("u", "usage", "Show usage information")
def usage(args: Args | None = None): def usage(args: Optional[Args] = None):
print(f"Usage: {const.ARGV0} <command> [args...]") print(f"Usage: {const.ARGV0} <command> [args...]")
@ -122,7 +122,7 @@ def helpCmd(args: Args):
print() print()
vt100.title("Commands") vt100.title("Commands")
for cmd in commands: for cmd in sorted(commands, key=lambda c: c.shortName or c.longName):
pluginText = "" pluginText = ""
if cmd.isPlugin: if cmd.isPlugin:
pluginText = f"{vt100.CYAN}(plugin){vt100.RESET}" pluginText = f"{vt100.CYAN}(plugin){vt100.RESET}"
@ -140,7 +140,7 @@ def helpCmd(args: Args):
@command("v", "version", "Show current version") @command("v", "version", "Show current version")
def versionCmd(args: Args): def versionCmd(args: Args):
print(f"CuteKit v{const.VERSION_STR}\n") print(f"CuteKit v{const.VERSION_STR}")
def exec(args: Args): def exec(args: Args):

View file

@ -1,196 +0,0 @@
import logging
import os
from . import (
context,
shell,
const,
vt100,
builder,
cli,
model,
jexpr,
)
_logger = logging.getLogger(__name__)
@cli.command("p", "project", "Show project information")
def runCmd(args: cli.Args):
model.Project.chdir()
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
props = args.consumePrefix("prop:")
componentSpec = args.consumeArg()
if componentSpec is None:
raise RuntimeError("Component not specified")
component = builder.build(componentSpec, targetSpec, props)
os.environ["CK_TARGET"] = component.context.target.id
os.environ["CK_COMPONENT"] = component.id()
os.environ["CK_BUILDDIR"] = component.context.builddir()
shell.exec(component.outfile(), *args.args)
@cli.command("t", "test", "Run all test targets")
def testCmd(args: cli.Args):
model.Project.chdir()
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
builder.testAll(targetSpec)
@cli.command("d", "debug", "Debug a component")
def debugCmd(args: cli.Args):
model.Project.chdir()
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
props = args.consumePrefix("prop:")
componentSpec = args.consumeArg()
if componentSpec is None:
raise RuntimeError("Component not specified")
component = builder.build(componentSpec, targetSpec, props)
os.environ["CK_TARGET"] = component.context.target.id
os.environ["CK_COMPONENT"] = component.id()
os.environ["CK_BUILDDIR"] = component.context.builddir()
shell.exec("lldb", "-o", "run", component.outfile(), *args.args)
@cli.command("b", "build", "Build a component or all components")
def buildCmd(args: cli.Args):
model.Project.chdir()
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
props = args.consumePrefix("prop:")
componentSpec = args.consumeArg()
if componentSpec is None:
builder.buildAll(targetSpec, props)
else:
builder.build(componentSpec, targetSpec, props)
@cli.command("l", "list", "List all components and targets")
def listCmd(args: cli.Args):
model.Project.chdir()
components = context.loadAllComponents()
targets = context.loadAllTargets()
vt100.title("Components")
if len(components) == 0:
print(" (No components available)")
else:
print(vt100.indent(vt100.wordwrap(", ".join(map(lambda m: m.id, components)))))
print()
vt100.title("Targets")
if len(targets) == 0:
print(" (No targets available)")
else:
print(vt100.indent(vt100.wordwrap(", ".join(map(lambda m: m.id, targets)))))
print()
@cli.command("c", "clean", "Clean build files")
def cleanCmd(args: cli.Args):
model.Project.chdir()
shell.rmrf(const.BUILD_DIR)
@cli.command("n", "nuke", "Clean all build files and caches")
def nukeCmd(args: cli.Args):
model.Project.chdir()
shell.rmrf(const.PROJECT_CK_DIR)
def grabExtern(extern: dict[str, model.Extern]):
for extSpec, ext in extern.items():
extPath = os.path.join(const.EXTERN_DIR, extSpec)
if os.path.exists(extPath):
print(f"Skipping {extSpec}, already installed")
continue
print(f"Installing {extSpec}-{ext.tag} from {ext.git}...")
shell.popen(
"git", "clone", "--depth", "1", "--branch", ext.tag, ext.git, extPath
)
if os.path.exists(os.path.join(extPath, "project.json")):
grabExtern(context.loadProject(extPath).extern)
@cli.command("i", "install", "Install required external packages")
def installCmd(args: cli.Args):
model.Project.chdir()
pj = context.loadProject(".")
grabExtern(pj.extern)
@cli.command("I", "init", "Initialize a new project")
def initCmd(args: cli.Args):
import requests
repo = args.consumeOpt("repo", const.DEFAULT_REPO_TEMPLATES)
list = args.consumeOpt("list")
template = args.consumeArg()
name = args.consumeArg()
_logger.info("Fetching registry...")
r = requests.get(f"https://raw.githubusercontent.com/{repo}/main/registry.json")
if r.status_code != 200:
_logger.error("Failed to fetch registry")
exit(1)
registry = r.json()
if list:
print(
"\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry)
)
return
if not template:
raise RuntimeError("Template not specified")
def template_match(t: jexpr.Json) -> str:
return t["id"] == template
if not any(filter(template_match, registry)):
raise LookupError(f"Couldn't find a template named {template}")
if not name:
_logger.info(f"No name was provided, defaulting to {template}")
name = template
if os.path.exists(name):
raise RuntimeError(f"Directory {name} already exists")
print(f"Creating project {name} from template {template}...")
shell.cloneDir(f"https://github.com/{repo}", template, name)
print(f"Project {name} created\n")
print("We suggest that you begin by typing:")
print(f" {vt100.GREEN}cd {name}{vt100.RESET}")
print(
f" {vt100.GREEN}cutekit install{vt100.BRIGHT_BLACK} # Install external packages{vt100.RESET}"
)
print(
f" {vt100.GREEN}cutekit build{vt100.BRIGHT_BLACK} # Build the project{vt100.RESET}"
)

View file

@ -1,351 +0,0 @@
from typing import cast, Optional, Protocol, Iterable
from itertools import chain
from pathlib import Path
import os
import logging
from . import const, shell, jexpr, utils, rules, mixins, model
_logger = logging.getLogger(__name__)
class IContext(Protocol):
target: model.Target
def builddir(self) -> str:
...
class ComponentInstance:
enabled: bool = True
disableReason = ""
manifest: model.Component
sources: list[str] = []
res: list[str] = []
resolved: list[str] = []
context: IContext
def __init__(
self,
enabled: bool,
disableReason: str,
manifest: model.Component,
sources: list[str],
res: list[str],
resolved: list[str],
):
self.enabled = enabled
self.disableReason = disableReason
self.manifest = manifest
self.sources = sources
self.res = res
self.resolved = resolved
def id(self) -> str:
return self.manifest.id
def isLib(self):
return self.manifest.type == model.Kind.LIB
def objdir(self) -> str:
return os.path.join(self.context.builddir(), f"{self.manifest.id}/obj")
def resdir(self) -> str:
return os.path.join(self.context.builddir(), f"{self.manifest.id}/res")
def objsfiles(self) -> list[tuple[str, str]]:
def toOFile(s: str) -> str:
return os.path.join(
self.objdir(),
s.replace(os.path.join(self.manifest.dirname(), ""), "") + ".o",
)
return list(map(lambda s: (s, toOFile(s)), self.sources))
def resfiles(self) -> list[tuple[str, str, str]]:
def toAssetFile(s: str) -> str:
return os.path.join(
self.resdir(),
s.replace(os.path.join(self.manifest.dirname(), "res/"), ""),
)
def toAssetId(s: str) -> str:
return s.replace(os.path.join(self.manifest.dirname(), "res/"), "")
return list(map(lambda s: (s, toAssetFile(s), toAssetId(s)), self.res))
def outfile(self) -> str:
if self.isLib():
return os.path.join(
self.context.builddir(), self.manifest.id, f"lib/{self.manifest.id}.a"
)
else:
return os.path.join(
self.context.builddir(), self.manifest.id, f"bin/{self.manifest.id}.out"
)
def cinclude(self) -> str:
if "cpp-root-include" in self.manifest.props:
return self.manifest.dirname()
elif self.manifest.type == model.Kind.LIB:
return str(Path(self.manifest.dirname()).parent)
else:
return ""
class Context(IContext):
target: model.Target
instances: list[ComponentInstance]
tools: model.Tools
def enabledInstances(self) -> Iterable[ComponentInstance]:
return filter(lambda x: x.enabled, self.instances)
def __init__(
self,
target: model.Target,
instances: list[ComponentInstance],
tools: model.Tools,
):
self.target = target
self.instances = instances
self.tools = tools
def componentByName(self, name: str) -> Optional[ComponentInstance]:
result = list(filter(lambda x: x.manifest.id == name, self.instances))
if len(result) == 0:
return None
return result[0]
def cincls(self) -> list[str]:
includes = list(
filter(
lambda x: x != "", map(lambda x: x.cinclude(), self.enabledInstances())
)
)
return utils.uniq(includes)
def cdefs(self) -> list[str]:
return self.target.cdefs()
def hashid(self) -> str:
return utils.hash(
(self.target.props, [self.tools[t].to_dict() for t in self.tools])
)[0:8]
def builddir(self) -> str:
return os.path.join(const.BUILD_DIR, f"{self.target.id}-{self.hashid()[:8]}")
def loadAllTargets() -> list[model.Target]:
projectRoot = model.Project.root()
if projectRoot is None:
return []
pj = loadProject(projectRoot)
paths = list(
map(
lambda e: os.path.join(const.EXTERN_DIR, e, const.TARGETS_DIR),
pj.extern.keys(),
)
) + [const.TARGETS_DIR]
ret = []
for entry in paths:
files = shell.find(entry, ["*.json"])
ret += list(
map(
lambda path: model.Manifest.load(Path(path)).ensureType(model.Target),
files,
)
)
return ret
def loadProject(path: str) -> model.Project:
path = os.path.join(path, "project.json")
return model.Manifest.load(Path(path)).ensureType(model.Project)
def loadTarget(id: str) -> model.Target:
try:
return next(filter(lambda t: t.id == id, loadAllTargets()))
except StopIteration:
raise RuntimeError(f"Target '{id}' not found")
def loadAllComponents() -> list[model.Component]:
files = shell.find(const.SRC_DIR, ["manifest.json"])
files += shell.find(const.EXTERN_DIR, ["manifest.json"])
return list(
map(
lambda path: model.Manifest.load(Path(path)).ensureType(model.Component),
files,
)
)
def filterDisabled(
components: list[model.Component], target: model.Target
) -> tuple[list[model.Component], list[model.Component]]:
return list(filter(lambda c: c.isEnabled(target)[0], components)), list(
filter(lambda c: not c.isEnabled(target)[0], components)
)
def providerFor(
what: str, components: list[model.Component]
) -> tuple[Optional[str], str]:
result: list[model.Component] = list(filter(lambda c: c.id == what, components))
if len(result) == 0:
# Try to find a provider
result = list(filter(lambda x: (what in x.provides), components))
if len(result) == 0:
_logger.error(f"No provider for '{what}'")
return (None, f"No provider for '{what}'")
if len(result) > 1:
ids = list(map(lambda x: x.id, result))
_logger.error(f"Multiple providers for '{what}': {result}")
return (None, f"Multiple providers for '{what}': {','.join(ids)}")
return (result[0].id, "")
def resolveDeps(
componentSpec: str, components: list[model.Component], target: model.Target
) -> tuple[bool, str, list[str]]:
mapping = dict(map(lambda c: (c.id, c), components))
def resolveInner(what: str, stack: list[str] = []) -> tuple[bool, str, list[str]]:
result: list[str] = []
what = target.route(what)
resolved, unresolvedReason = providerFor(what, components)
if resolved is None:
return False, unresolvedReason, []
if resolved in stack:
raise RuntimeError(f"Dependency loop: {stack} -> {resolved}")
stack.append(resolved)
for req in mapping[resolved].requires:
keep, unresolvedReason, reqs = resolveInner(req, stack)
if not keep:
stack.pop()
_logger.error(f"Dependency '{req}' not met for '{resolved}'")
return False, unresolvedReason, []
result.extend(reqs)
stack.pop()
result.insert(0, resolved)
return True, "", result
enabled, unresolvedReason, resolved = resolveInner(componentSpec)
return enabled, unresolvedReason, utils.uniq(resolved)
def instanciate(
componentSpec: str, components: list[model.Component], target: model.Target
) -> Optional[ComponentInstance]:
manifest = next(filter(lambda c: c.id == componentSpec, components))
wildcards = set(chain(*map(lambda rule: rule.fileIn, rules.rules.values())))
dirs = [manifest.dirname()] + list(
map(lambda d: os.path.join(manifest.dirname(), d), manifest.subdirs)
)
sources = shell.find(dirs, list(wildcards), recusive=False)
res = shell.find(os.path.join(manifest.dirname(), "res"))
enabled, unresolvedReason, resolved = resolveDeps(componentSpec, components, target)
return ComponentInstance(
enabled, unresolvedReason, manifest, sources, res, resolved[1:]
)
def instanciateDisabled(
component: model.Component, target: model.Target
) -> ComponentInstance:
return ComponentInstance(
enabled=False,
disableReason=component.isEnabled(target)[1],
manifest=component,
sources=[],
res=[],
resolved=[],
)
context: dict[str, Context] = {}
def contextFor(targetSpec: str, props: model.Props = {}) -> Context:
if targetSpec in context:
return context[targetSpec]
_logger.info(f"Loading context for '{targetSpec}'")
targetEls = targetSpec.split(":")
if targetEls[0] == "":
targetEls[0] = "host-" + shell.uname().machine
target = loadTarget(targetEls[0])
target.props |= props
components = loadAllComponents()
components, disabled = filterDisabled(components, target)
tools: model.Tools = {}
for toolSpec in target.tools:
tool = target.tools[toolSpec]
tools[toolSpec] = model.Tool(cmd=tool.cmd, args=tool.args, files=tool.files)
tools[toolSpec].args += rules.rules[toolSpec].args
for m in targetEls[1:]:
mixin = mixins.byId(m)
tools = mixin(target, tools)
for component in components:
for toolSpec in component.tools:
tool = component.tools[toolSpec]
tools[toolSpec].args += tool.args
instances: list[ComponentInstance] = list(
map(lambda c: instanciateDisabled(c, target), disabled)
)
instances += cast(
list[ComponentInstance],
list(
filter(
lambda e: e is not None,
map(lambda c: instanciate(c.id, components, target), components),
)
),
)
context[targetSpec] = Context(
target,
instances,
tools,
)
for instance in instances:
instance.context = context[targetSpec]
return context[targetSpec]

View file

@ -1,96 +1,95 @@
import os import os
from typing import cast from typing import Optional, cast
from . import vt100, context, cli, shell, model
from . import vt100, cli, model
def view( def view(
context: context.Context, registry: model.Registry,
scope: str | None = None, target: model.Target,
scope: Optional[str] = None,
showExe: bool = True, showExe: bool = True,
showDisabled: bool = False, showDisabled: bool = False,
): ):
from graphviz import Digraph # type: ignore from graphviz import Digraph # type: ignore
g = Digraph(context.target.id, filename="graph.gv") g = Digraph(target.id, filename="graph.gv")
g.attr("graph", splines="ortho", rankdir="BT", ranksep="1.5") g.attr("graph", splines="ortho", rankdir="BT", ranksep="1.5")
g.attr("node", shape="ellipse") g.attr("node", shape="ellipse")
g.attr( g.attr(
"graph", "graph",
label=f"<<B>{scope or 'Full Dependency Graph'}</B><BR/>{context.target.id}>", label=f"<<B>{scope or 'Full Dependency Graph'}</B><BR/>{target.id}>",
labelloc="t", labelloc="t",
) )
scopeInstance = None scopeInstance = None
if scope is not None: if scope is not None:
scopeInstance = context.componentByName(scope) scopeInstance = registry.lookup(scope, model.Component)
for instance in context.instances: for component in registry.iterEnabled(target):
if not instance.isLib() and not showExe: if not component.type == model.Kind.LIB and not showExe:
continue continue
if ( if (
scopeInstance is not None scopeInstance is not None
and instance.manifest.id != scope and component.id != scope
and instance.manifest.id not in scopeInstance.resolved and component.id not in scopeInstance.resolved[target.id].resolved
): ):
continue continue
if instance.enabled: if component.resolved[target.id].enabled:
fillcolor = "lightgrey" if instance.isLib() else "lightblue" fillcolor = "lightgrey" if component.type == model.Kind.LIB else "lightblue"
shape = "plaintext" if not scope == instance.manifest.id else "box" shape = "plaintext" if not scope == component.id else "box"
g.node( g.node(
instance.manifest.id, component.id,
f"<<B>{instance.manifest.id}</B><BR/>{vt100.wordwrap(instance.manifest.decription, 40,newline='<BR/>')}>", f"<<B>{component.id}</B><BR/>{vt100.wordwrap(component.decription, 40,newline='<BR/>')}>",
shape=shape, shape=shape,
style="filled", style="filled",
fillcolor=fillcolor, fillcolor=fillcolor,
) )
for req in instance.manifest.requires: for req in component.requires:
g.edge(instance.manifest.id, req) g.edge(component.id, req)
for req in instance.manifest.provides: for req in component.provides:
isChosen = context.target.routing.get(req, None) == instance.manifest.id isChosen = target.routing.get(req, None) == component.id
g.edge( g.edge(
req, req,
instance.manifest.id, component.id,
arrowhead="none", arrowhead="none",
color=("blue" if isChosen else "black"), color=("blue" if isChosen else "black"),
) )
elif showDisabled: elif showDisabled:
g.node( g.node(
instance.manifest.id, component.id,
f"<<B>{instance.manifest.id}</B><BR/>{vt100.wordwrap(instance.manifest.decription, 40,newline='<BR/>')}<BR/><BR/><I>{vt100.wordwrap(instance.disableReason, 40,newline='<BR/>')}</I>>", f"<<B>{component.id}</B><BR/>{vt100.wordwrap(component.decription, 40,newline='<BR/>')}<BR/><BR/><I>{vt100.wordwrap(str(component.resolved[target.id].reason), 40,newline='<BR/>')}</I>>",
shape="plaintext", shape="plaintext",
style="filled", style="filled",
fontcolor="#999999", fontcolor="#999999",
fillcolor="#eeeeee", fillcolor="#eeeeee",
) )
for req in instance.manifest.requires: for req in component.requires:
g.edge(instance.manifest.id, req, color="#aaaaaa") g.edge(component.id, req, color="#aaaaaa")
for req in instance.manifest.provides: for req in component.provides:
g.edge(req, instance.manifest.id, arrowhead="none", color="#aaaaaa") g.edge(req, component.id, arrowhead="none", color="#aaaaaa")
g.view(filename=os.path.join(context.builddir(), "graph.gv")) g.view(filename=os.path.join(target.builddir, "graph.gv"))
@cli.command("g", "graph", "Show the dependency graph") @cli.command("g", "graph", "Show the dependency graph")
def graphCmd(args: cli.Args): def graphCmd(args: cli.Args):
model.Project.chdir() registry = model.Registry.use(args)
target = model.Target.use(args)
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine)) scope = cast(Optional[str], args.tryConsumeOpt("scope"))
onlyLibs = args.consumeOpt("only-libs", False) is True
showDisabled = args.consumeOpt("show-disabled", False) is True
scope: str | None = cast(str | None, args.tryConsumeOpt("scope")) view(registry, target, scope=scope, showExe=not onlyLibs, showDisabled=showDisabled)
onlyLibs: bool = args.consumeOpt("only-libs", False) is True
showDisabled: bool = args.consumeOpt("show-disabled", False) is True
ctx = context.contextFor(targetSpec)
view(ctx, scope=scope, showExe=not onlyLibs, showDisabled=showDisabled)

View file

@ -3,12 +3,14 @@ import logging
from enum import Enum from enum import Enum
from typing import Any, Type, cast from typing import Any, Generator, Optional, Type, cast
from pathlib import Path from pathlib import Path
from dataclasses_json import DataClassJsonMixin, config from dataclasses_json import DataClassJsonMixin
from dataclasses import dataclass, field from dataclasses import dataclass, field
from . import jexpr, compat, utils from cutekit import const, shell
from . import jexpr, compat, utils, cli, vt100
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@ -23,6 +25,9 @@ class Kind(Enum):
EXE = "exe" EXE = "exe"
# --- Manifest --------------------------------------------------------------- #
@dataclass @dataclass
class Manifest(DataClassJsonMixin): class Manifest(DataClassJsonMixin):
id: str id: str
@ -31,6 +36,9 @@ class Manifest(DataClassJsonMixin):
@staticmethod @staticmethod
def parse(path: Path, data: dict[str, Any]) -> "Manifest": def parse(path: Path, data: dict[str, Any]) -> "Manifest":
"""
Parse a manifest from a given path and data
"""
compat.ensureSupportedManifest(data, path) compat.ensureSupportedManifest(data, path)
kind = Kind(data["type"]) kind = Kind(data["type"])
del data["$schema"] del data["$schema"]
@ -40,12 +48,24 @@ class Manifest(DataClassJsonMixin):
@staticmethod @staticmethod
def load(path: Path) -> "Manifest": def load(path: Path) -> "Manifest":
"""
Load a manifest from a given path
"""
return Manifest.parse(path, jexpr.evalRead(path)) return Manifest.parse(path, jexpr.evalRead(path))
def dirname(self) -> str: def dirname(self) -> str:
"""
Return the directory of the manifest
"""
return os.path.dirname(self.path) return os.path.dirname(self.path)
def subpath(self, path) -> Path:
return Path(self.dirname()) / path
def ensureType(self, t: Type[utils.T]) -> utils.T: def ensureType(self, t: Type[utils.T]) -> utils.T:
"""
Ensure that the manifest is of a given type
"""
if not isinstance(self, t): if not isinstance(self, t):
raise RuntimeError( raise RuntimeError(
f"{self.path} should be a {type.__name__} manifest but is a {self.__class__.__name__} manifest" f"{self.path} should be a {type.__name__} manifest but is a {self.__class__.__name__} manifest"
@ -53,6 +73,11 @@ class Manifest(DataClassJsonMixin):
return cast(utils.T, self) return cast(utils.T, self)
# --- Project ---------------------------------------------------------------- #
_project: Optional["Project"] = None
@dataclass @dataclass
class Extern(DataClassJsonMixin): class Extern(DataClassJsonMixin):
git: str git: str
@ -64,8 +89,16 @@ class Project(Manifest):
description: str = field(default="(No description)") description: str = field(default="(No description)")
extern: dict[str, Extern] = field(default_factory=dict) extern: dict[str, Extern] = field(default_factory=dict)
@property
def externDirs(self) -> list[str]:
res = map(lambda e: os.path.join(const.EXTERN_DIR, e), self.extern.keys())
return list(res)
@staticmethod @staticmethod
def root() -> str | None: def root() -> Optional[str]:
"""
Find the root of the project by looking for a project.json
"""
cwd = Path.cwd() cwd = Path.cwd()
while str(cwd) != cwd.root: while str(cwd) != cwd.root:
if (cwd / "project.json").is_file(): if (cwd / "project.json").is_file():
@ -75,6 +108,9 @@ class Project(Manifest):
@staticmethod @staticmethod
def chdir() -> None: def chdir() -> None:
"""
Change the current working directory to the root of the project
"""
path = Project.root() path = Project.root()
if path is None: if path is None:
raise RuntimeError( raise RuntimeError(
@ -82,6 +118,121 @@ class Project(Manifest):
) )
os.chdir(path) os.chdir(path)
@staticmethod
def at(path: str) -> Optional["Project"]:
path = os.path.join(path, "project.json")
if not os.path.exists(path):
return None
return Manifest.load(Path(path)).ensureType(Project)
@staticmethod
def ensure() -> "Project":
root = Project.root()
if root is None:
raise RuntimeError(
"No project.json found in this directory or any parent directory"
)
os.chdir(root)
return Manifest.load(Path(os.path.join(root, "project.json"))).ensureType(
Project
)
@staticmethod
def fetchs(extern: dict[str, Extern]):
for extSpec, ext in extern.items():
extPath = os.path.join(const.EXTERN_DIR, extSpec)
if os.path.exists(extPath):
print(f"Skipping {extSpec}, already installed")
continue
print(f"Installing {extSpec}-{ext.tag} from {ext.git}...")
shell.popen(
"git",
"clone",
"--quiet",
"--depth",
"1",
"--branch",
ext.tag,
ext.git,
extPath,
)
project = Project.at(extPath)
if project is not None:
Project.fetchs(project.extern)
@staticmethod
def use(args: cli.Args) -> "Project":
global _project
if _project is None:
_project = Project.ensure()
return _project
@cli.command("i", "install", "Install required external packages")
def installCmd(args: cli.Args):
project = Project.use(args)
Project.fetchs(project.extern)
@cli.command("I", "init", "Initialize a new project")
def initCmd(args: cli.Args):
import requests
repo = args.consumeOpt("repo", const.DEFAULT_REPO_TEMPLATES)
list = args.consumeOpt("list")
template = args.consumeArg()
name = args.consumeArg()
_logger.info("Fetching registry...")
r = requests.get(f"https://raw.githubusercontent.com/{repo}/main/registry.json")
if r.status_code != 200:
_logger.error("Failed to fetch registry")
exit(1)
registry = r.json()
if list:
print(
"\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry)
)
return
if not template:
raise RuntimeError("Template not specified")
def template_match(t: jexpr.Json) -> str:
return t["id"] == template
if not any(filter(template_match, registry)):
raise LookupError(f"Couldn't find a template named {template}")
if not name:
_logger.info(f"No name was provided, defaulting to {template}")
name = template
if os.path.exists(name):
raise RuntimeError(f"Directory {name} already exists")
print(f"Creating project {name} from template {template}...")
shell.cloneDir(f"https://github.com/{repo}", template, name)
print(f"Project {name} created\n")
print("We suggest that you begin by typing:")
print(f" {vt100.GREEN}cd {name}{vt100.RESET}")
print(
f" {vt100.GREEN}cutekit install{vt100.BRIGHT_BLACK} # Install external packages{vt100.RESET}"
)
print(
f" {vt100.GREEN}cutekit build{vt100.BRIGHT_BLACK} # Build the project{vt100.RESET}"
)
# --- Target ----------------------------------------------------------------- #
@dataclass @dataclass
class Tool(DataClassJsonMixin): class Tool(DataClassJsonMixin):
@ -99,30 +250,42 @@ class Target(Manifest):
tools: Tools = field(default_factory=dict) tools: Tools = field(default_factory=dict)
routing: dict[str, str] = field(default_factory=dict) routing: dict[str, str] = field(default_factory=dict)
@property
def hashid(self) -> str:
return utils.hash((self.props, [v.to_dict() for k, v in self.tools.items()]))
@property
def builddir(self) -> str:
return os.path.join(const.BUILD_DIR, f"{self.id}-{self.hashid[:8]}")
@staticmethod
def use(args: cli.Args) -> "Target":
registry = Registry.use(args)
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
return registry.ensure(targetSpec, Target)
def route(self, componentSpec: str): def route(self, componentSpec: str):
"""
Route a component spec to a target specific component spec
"""
return ( return (
self.routing[componentSpec] self.routing[componentSpec]
if componentSpec in self.routing if componentSpec in self.routing
else componentSpec else componentSpec
) )
def cdefs(self) -> list[str]:
defines: list[str] = []
def sanatize(s: str) -> str: # --- Component -------------------------------------------------------------- #
return s.lower().replace(" ", "_").replace("-", "_").replace(".", "_")
for key in self.props:
prop = self.props[key]
propStr = str(prop)
if isinstance(prop, bool):
if prop:
defines += [f"-D__ck_{sanatize(key)}__"]
else:
defines += [f"-D__ck_{sanatize(key)}_{sanatize(propStr)}__"]
defines += [f"-D__ck_{sanatize(key)}_value={propStr}"]
return defines @dataclass
class Resolved:
reason: Optional[str] = None
resolved: list[str] = field(default_factory=list)
@property
def enabled(self) -> bool:
return self.reason is None
@dataclass @dataclass
@ -134,6 +297,8 @@ class Component(Manifest):
requires: list[str] = field(default_factory=list) requires: list[str] = field(default_factory=list)
provides: list[str] = field(default_factory=list) provides: list[str] = field(default_factory=list)
subdirs: list[str] = field(default_factory=list) subdirs: list[str] = field(default_factory=list)
injects: list[str] = field(default_factory=list)
resolved: dict[str, Resolved] = field(default_factory=dict)
def isEnabled(self, target: Target) -> tuple[bool, str]: def isEnabled(self, target: Target) -> tuple[bool, str]:
for k, v in self.enableIf.items(): for k, v in self.enableIf.items():
@ -160,3 +325,290 @@ KINDS: dict[Kind, Type[Manifest]] = {
Kind.LIB: Component, Kind.LIB: Component,
Kind.EXE: Component, Kind.EXE: Component,
} }
# --- Dependency resolution -------------------------------------------------- #
@dataclass
class Resolver:
_registry: "Registry"
_target: Target
_mappings: dict[str, list[Component]] = field(default_factory=dict)
_cache: dict[str, Resolved] = field(default_factory=dict)
_baked = False
def _bake(self):
"""
Bake the resolver by building a mapping of all
components that provide a given spec.
"""
if self._baked:
return
for c in self._registry.iter(Component):
for p in c.provides + [c.id]:
if p not in self._mappings and [0]:
self._mappings[p] = []
self._mappings[p].append(c)
# Overide with target routing since it has priority
# over component provides and id
for k, v in self._target.routing.items():
component = self._registry.lookup(v, Component)
self._mappings[k] = [component] if component else []
self._baked = True
def _provider(self, spec: str) -> tuple[Optional[str], str]:
"""
Returns the provider for a given spec.
"""
result = self._mappings.get(spec, [])
if len(result) == 1:
enabled, reason = result[0].isEnabled(self._target)
if not enabled:
return (None, reason)
def checkIsEnabled(c: Component) -> bool:
enabled, reason = c.isEnabled(self._target)
if not enabled:
_logger.info(f"Component {c.id} cannot provide '{spec}': {reason}")
return enabled
result = list(filter(checkIsEnabled, result))
if result == []:
return (None, f"No provider for '{spec}'")
if len(result) > 1:
ids = list(map(lambda x: x.id, result))
return (None, f"Multiple providers for '{spec}': {','.join(ids)}")
return (result[0].id, "")
def resolve(self, what: str, stack: list[str] = []) -> Resolved:
"""
Resolve a given spec to a list of components.
"""
self._bake()
if what in self._cache:
return self._cache[what]
keep, unresolvedReason = self._provider(what)
if not keep:
_logger.error(f"Dependency '{what}' not found: {unresolvedReason}")
self._cache[what] = Resolved(reason=unresolvedReason)
return self._cache[what]
if keep in self._cache:
return self._cache[keep]
if keep in stack:
raise RuntimeError(
f"Dependency loop while resolving '{what}': {stack} -> {keep}"
)
stack.append(keep)
component = self._registry.lookup(keep, Component)
if not component:
return Resolved(reason="No provider for 'myembed'")
result: list[str] = []
for req in component.requires:
reqResolved = self.resolve(req, stack)
if reqResolved.reason:
stack.pop()
self._cache[keep] = Resolved(reason=reqResolved.reason)
return self._cache[keep]
result.extend(reqResolved.resolved)
stack.pop()
result.insert(0, keep)
self._cache[keep] = Resolved(resolved=utils.uniq(result))
return self._cache[keep]
# --- Registry --------------------------------------------------------------- #
_registry: Optional["Registry"] = None
@dataclass
class Registry(DataClassJsonMixin):
project: Project
manifests: dict[str, Manifest] = field(default_factory=dict)
def _append(self, m: Manifest):
"""
Append a manifest to the model
"""
if m.id in self.manifests:
raise RuntimeError(
f"Duplicated manifest '{m.id}' at '{m.path}' already loaded from '{self.manifests[m.id].path}'"
)
self.manifests[m.id] = m
def iter(self, type: Type[utils.T]) -> Generator[utils.T, None, None]:
"""
Iterate over all manifests of a given type
"""
for m in self.manifests.values():
if isinstance(m, type):
yield m
def iterEnabled(self, target: Target) -> Generator[Component, None, None]:
for c in self.iter(Component):
resolve = c.resolved[target.id]
if resolve.enabled:
yield c
def lookup(self, name: str, type: Type[utils.T]) -> Optional[utils.T]:
"""
Lookup a manifest of a given type by name
"""
if name in self.manifests:
m = self.manifests[name]
if isinstance(m, type):
return m
return None
def ensure(self, name: str, type: Type[utils.T]) -> utils.T:
"""
Ensure that a manifest of a given type exists
and return it.
"""
m = self.lookup(name, type)
if not m:
raise RuntimeError(f"Could not find {type.__name__} '{name}'")
return m
@staticmethod
def use(args: cli.Args) -> "Registry":
global _registry
if _registry is not None:
return _registry
project = Project.use(args)
mixins = str(args.consumeOpt("mixins", "")).split(",")
if mixins == [""]:
mixins = []
props = cast(dict[str, str], args.consumePrefix("prop:"))
_registry = Registry.load(project, mixins, props)
return _registry
@staticmethod
def load(project: Project, mixins: list[str], props: Props) -> "Registry":
registry = Registry(project)
registry._append(project)
# Lookup and load all extern projects
for externDir in project.externDirs:
projectPath = os.path.join(externDir, "project.json")
manifestPath = os.path.join(externDir, "manifest.json")
if os.path.exists(projectPath):
registry._append(Manifest.load(Path(projectPath)).ensureType(Project))
elif os.path.exists(manifestPath):
# For simple library allow to have a manifest.json instead of a project.json
registry._append(
Manifest.load(Path(manifestPath)).ensureType(Component)
)
else:
_logger.warn(
"Extern project does not have a project.json or manifest.json"
)
# Load all manifests from projects
for project in list(registry.iter(Project)):
targetDir = os.path.join(project.dirname(), const.TARGETS_DIR)
targetFiles = shell.find(targetDir, ["*.json"])
for targetFile in targetFiles:
registry._append(Manifest.load(Path(targetFile)).ensureType(Target))
componentDir = os.path.join(project.dirname(), const.SRC_DIR)
rootComponent = os.path.join(project.dirname(), "manifest.json")
componentFiles = shell.find(componentDir, ["manifest.json"])
if os.path.exists(rootComponent):
componentFiles += [rootComponent]
for componentFile in componentFiles:
registry._append(
Manifest.load(Path(componentFile)).ensureType(Component)
)
# Resolve all dependencies for all targets
for target in registry.iter(Target):
target.props |= props
resolver = Resolver(registry, target)
# Apply injects
for c in registry.iter(Component):
if c.isEnabled(target)[0]:
for inject in c.injects:
victim = registry.lookup(inject, Component)
if not victim:
raise RuntimeError(f"Cannot find component '{inject}'")
victim.requires += [c.id]
# Resolve all components
for c in registry.iter(Component):
resolved = resolver.resolve(c.id)
if resolved.reason:
_logger.info(f"Component '{c.id}' disabled: {resolved.reason}")
c.resolved[target.id] = resolved
# Resolve tooling
tools: Tools = target.tools
from . import mixins as mxs
for mix in mixins:
mixin = mxs.byId(mix)
tools = mixin(target, tools)
# Apply tooling from components
for c in registry.iter(Component):
if c.resolved[target.id].enabled:
for k, v in c.tools.items():
tools[k].args += v.args
return registry
@cli.command("l", "list", "List all components and targets")
def listCmd(args: cli.Args):
registry = Registry.use(args)
components = list(registry.iter(Component))
targets = list(registry.iter(Target))
vt100.title("Components")
if len(components) == 0:
print(vt100.p("(No components available)"))
else:
print(vt100.p(", ".join(map(lambda m: m.id, components))))
print()
vt100.title("Targets")
if len(targets) == 0:
print(vt100.p("(No targets available)"))
else:
print(vt100.p(", ".join(map(lambda m: m.id, targets))))
print()

View file

@ -1,7 +1,7 @@
import os import os
import logging import logging
from . import shell, model, const, context from . import shell, model, const
import importlib.util as importlib import importlib.util as importlib
@ -23,19 +23,19 @@ def load(path: str):
def loadAll(): def loadAll():
_logger.info("Loading plugins...") _logger.info("Loading plugins...")
projectRoot = model.Project.root() root = model.Project.root()
if projectRoot is None: if root is None:
_logger.info("Not in project, skipping plugin loading") _logger.info("Not in project, skipping plugin loading")
return return
pj = context.loadProject(projectRoot) project = model.Project.at(root)
paths = list(map(lambda e: os.path.join(const.EXTERN_DIR, e), pj.extern.keys())) + [ paths = list(
"." map(lambda e: os.path.join(const.EXTERN_DIR, e), project.extern.keys())
] ) + ["."]
for dirname in paths: for dirname in paths:
pluginDir = os.path.join(projectRoot, dirname, const.META_DIR, "plugins") pluginDir = os.path.join(root, dirname, const.META_DIR, "plugins")
for files in shell.readdir(pluginDir): for files in shell.readdir(pluginDir):
if files.endswith(".py"): if files.endswith(".py"):

View file

@ -9,7 +9,15 @@ class Rule:
args: list[str] args: list[str]
deps: Optional[str] = None deps: Optional[str] = None
def __init__(self, id: str, fileIn: list[str], fileOut: list[str], rule: str, args: list[str] = [], deps: Optional[str] = None): def __init__(
self,
id: str,
fileIn: list[str],
fileOut: list[str],
rule: str,
args: list[str] = [],
deps: Optional[str] = None,
):
self.id = id self.id = id
self.fileIn = fileIn self.fileIn = fileIn
self.fileOut = fileOut self.fileOut = fileOut
@ -19,16 +27,22 @@ class Rule:
rules: dict[str, Rule] = { rules: dict[str, Rule] = {
"cc": Rule("cc", ["*.c"], ["*.o"], "-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs", ["-std=gnu2x", "cc": Rule(
"-Wall", "cc",
"-Wextra", ["*.c"],
"-Werror"], "$out.d"), ["*.o"],
"cxx": Rule("cxx", ["*.cpp", "*.cc", "*.cxx"], ["*.o"], "-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs", ["-std=gnu++2b", "-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs",
"-Wall", ["-std=gnu2x", "-Wall", "-Wextra", "-Werror"],
"-Wextra", "$out.d",
"-Werror", ),
"-fno-exceptions", "cxx": Rule(
"-fno-rtti"], "$out.d"), "cxx",
["*.cpp", "*.cc", "*.cxx"],
["*.o"],
"-c -o $out $in -MD -MF $out.d $flags $cincs $cdefs",
["-std=gnu++2b", "-Wall", "-Wextra", "-Werror", "-fno-exceptions", "-fno-rtti"],
"$out.d",
),
"as": Rule("as", ["*.s", "*.asm", "*.S"], ["*.o"], "-o $out $in $flags"), "as": Rule("as", ["*.s", "*.asm", "*.S"], ["*.o"], "-o $out $in $flags"),
"ar": Rule("ar", ["*.o"], ["*.a"], "$flags $out $in"), "ar": Rule("ar", ["*.o"], ["*.a"], "$flags $out $in"),
"ld": Rule("ld", ["*.o", "*.a"], ["*.out"], "-o $out $in $flags"), "ld": Rule("ld", ["*.o", "*.a"], ["*.out"], "-o $out $in $flags"),

View file

@ -270,7 +270,7 @@ def latest(cmd: str) -> str:
return chosen return chosen
def which(cmd: str) -> str | None: def which(cmd: str) -> Optional[str]:
""" """
Find the path of a command Find the path of a command
""" """

View file

@ -28,10 +28,6 @@ CROSSED = "\033[9m"
RESET = "\033[0m" RESET = "\033[0m"
def title(text: str):
print(f"{BOLD}{text}{RESET}:")
def wordwrap(text: str, width: int = 60, newline: str = "\n") -> str: def wordwrap(text: str, width: int = 60, newline: str = "\n") -> str:
result = "" result = ""
curr = 0 curr = 0
@ -49,3 +45,11 @@ def wordwrap(text: str, width: int = 60, newline: str = "\n") -> str:
def indent(text: str, indent: int = 4) -> str: def indent(text: str, indent: int = 4) -> str:
return " " * indent + text.replace("\n", "\n" + " " * indent) return " " * indent + text.replace("\n", "\n" + " " * indent)
def title(text: str):
print(f"{BOLD}{text}{RESET}:")
def p(text: str):
return indent(wordwrap(text))

90
tests/test_resolver.py Normal file
View file

@ -0,0 +1,90 @@
from cutekit import model
def test_direct_deps():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib"))
t = model.Target("host")
res = model.Resolver(r, t)
resolved = res.resolve("myapp")
assert resolved.reason is None
assert resolved.resolved == ["myapp", "mylib"]
def test_indirect_deps():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(model.Component("myimpl", provides=["myembed"]))
t = model.Target("host")
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimpl"]
def test_deps_routing():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(model.Component("myimplA", provides=["myembed"]))
r._append(model.Component("myimplB", provides=["myembed"]))
t = model.Target("host", routing={"myembed": "myimplB"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplB"]
t = model.Target("host", routing={"myembed": "myimplA"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplA"]
t = model.Target("host", routing={"myembed": "myimplC"})
res = model.Resolver(r, t)
assert res.resolve("myapp").reason == "No provider for 'myembed'"
def test_deps_routing_with_props():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(
model.Component("myimplA", provides=["myembed"], enableIf={"myprop": ["a"]})
)
r._append(
model.Component("myimplB", provides=["myembed"], enableIf={"myprop": ["b"]})
)
t = model.Target("host", routing={"myembed": "myimplB"}, props={"myprop": "b"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplB"]
t = model.Target("host", routing={"myembed": "myimplA"}, props={"myprop": "a"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplA"]
t = model.Target("host", routing={"myembed": "myimplC"}, props={"myprop": "c"})
res = model.Resolver(r, t)
resolved = res.resolve("myapp")
assert resolved.reason == "No provider for 'myembed'"
def test_deps_routing_with_props_and_requires():
r = model.Registry("")
r._append(model.Component("myapp", requires=["mylib"]))
r._append(model.Component("mylib", requires=["myembed"]))
r._append(
model.Component("myimplA", provides=["myembed"], enableIf={"myprop": ["a"]})
)
r._append(
model.Component("myimplB", provides=["myembed"], enableIf={"myprop": ["b"]})
)
t = model.Target("host", routing={"myembed": "myimplB"}, props={"myprop": "b"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplB"]
t = model.Target("host", routing={"myembed": "myimplA"}, props={"myprop": "a"})
res = model.Resolver(r, t)
assert res.resolve("myapp").resolved == ["myapp", "mylib", "myimplA"]
t = model.Target("host", routing={"myembed": "myimplC"}, props={"myprop": "c"})
res = model.Resolver(r, t)
assert res.resolve("myapp").reason == "No provider for 'myembed'"