feat: new argument parser

This commit is contained in:
Jordan ⌨️ 2024-01-19 10:50:37 +01:00
parent 16225f50d4
commit d0da609ba9
7 changed files with 315 additions and 173 deletions

View file

@ -55,13 +55,13 @@ def setupLogger(verbose: bool):
def main() -> int:
try:
shell.mkdir(const.GLOBAL_CK_DIR)
args = cli.parse(sys.argv[1:])
setupLogger(args.consumeOpt("verbose", False) is True)
args = cli.parse(sys.argv[1:], cli.CutekitArgs)
setupLogger(args.verbose)
const.setup()
plugins.setup(args)
pods.setup(args)
cli.exec(args)
pods.setup(args, sys.argv[1:])
cli.exec(args.cmd, sys.argv[1:])
return 0
except RuntimeError as e:

View file

@ -3,7 +3,7 @@ import logging
import dataclasses as dt
from pathlib import Path
from typing import Callable, Literal, TextIO, Union
from typing import Callable, Literal, TextIO, Union, Any
from . import shell, rules, model, ninja, const, cli
@ -15,7 +15,7 @@ class Scope:
registry: model.Registry
@staticmethod
def use(args: cli.Args, props: model.Props = {}) -> "Scope":
def use(args: Any, props: model.Props = {}) -> "Scope":
registry = model.Registry.use(args, props)
return Scope(registry)
@ -32,7 +32,7 @@ class TargetScope(Scope):
target: model.Target
@staticmethod
def use(args: cli.Args, props: model.Props = {}) -> "TargetScope":
def use(args: Any, props: model.Props = {}) -> "TargetScope":
registry = model.Registry.use(args, props)
target = model.Target.use(args, props)
return TargetScope(registry, target)
@ -336,12 +336,12 @@ def build(
@cli.command("b", "builder", "Build/Run/Clean a component or all components")
def _(args: cli.Args):
def _():
pass
@cli.command("b", "builder/build", "Build a component or all components")
def _(args: cli.Args):
def _(args: Any):
scope = TargetScope.use(args)
componentSpec = args.consumeArg()
component = None
@ -350,20 +350,25 @@ def _(args: cli.Args):
build(scope, component if component is not None else "all")[0]
class RunArgs:
debug: cli.Arg[bool] = cli.Arg("d", "debug", "Enable debug mode", default=False)
profile: cli.Arg[bool] = cli.Arg("p", "profile", "Enable profiling", default=False)
wait: cli.Arg[bool] = cli.Arg("w", "wait", "Wait for debugger to attach", default=False)
debugger: cli.Arg[str] = cli.Arg("g", "debugger", "Debugger to use", default="lldb")
mixins: cli.Arg[str] = cli.Arg("m", "mixins", "Mixins to apply", default="")
componentSpec: cli.FreeFormArg[str] = cli.FreeFormArg("Component to run", default="__main__")
extra: cli.RawArg
@cli.command("r", "builder/run", "Run a component")
def runCmd(args: cli.Args):
debug = args.consumeOpt("debug", False) is True
profile = args.consumeOpt("profile", False) is True
wait = args.consumeOpt("wait", False) is True
debugger = args.consumeOpt("debugger", "lldb")
componentSpec = args.consumeArg() or "__main__"
scope = TargetScope.use(args, {"debug": debug})
def runCmd(args: RunArgs):
scope = TargetScope.use(args, {"debug": args.debug})
component = scope.registry.lookup(
componentSpec, model.Component, includeProvides=True
args.componentSpec, model.Component, includeProvides=True
)
if component is None:
raise RuntimeError(f"Component {componentSpec} not found")
raise RuntimeError(f"Component {args.componentSpec} not found")
product = build(scope, component)[0]
@ -374,39 +379,40 @@ def runCmd(args: cli.Args):
try:
command = [str(product.path), *args.extra]
if debug:
shell.debug(command, debugger=debugger, wait=wait)
elif profile:
if args.debug:
shell.debug(command, debugger=args.debugger, wait=args.wait)
elif args.profile:
shell.profile(command)
else:
shell.exec(*command)
except Exception as e:
cli.error(e)
cli.error(str(e))
@cli.command("t", "builder/test", "Run all test targets")
def _(args: cli.Args):
def _(args: RunArgs):
# This is just a wrapper around the `run` command that try
# to run a special hook component named __tests__.
args.args.insert(0, "__tests__")
args.componentSpec = "__tests__"
runCmd(args)
@cli.command("d", "builder/debug", "Debug a component")
def _(args: cli.Args):
def _(args: RunArgs):
# This is just a wrapper around the `run` command that
# always enable debug mode.
args.opts["debug"] = True
args.debug = True
runCmd(args)
@cli.command("c", "builder/clean", "Clean build files")
def _(args: cli.Args):
model.Project.use(args)
def _():
model.Project.use()
shell.rmrf(const.BUILD_DIR)
@cli.command("n", "builder/nuke", "Clean all build files and caches")
def _(args: cli.Args):
model.Project.use(args)
def _():
model.Project.use()
shell.rmrf(const.PROJECT_CK_DIR)

View file

@ -1,81 +1,189 @@
import enum
import inspect
import logging
import sys
import dataclasses as dt
from functools import partial
from pathlib import Path
from typing import Optional, Union, Callable
from typing import Any, NewType, Optional, Union, Callable, Generic, get_origin, get_args
from . import const, vt100
from . import const, vt100, utils
Value = Union[str, bool, int]
_logger = logging.getLogger(__name__)
class Args:
opts: dict[str, Value]
args: list[str]
extra: list[str]
def __init__(self):
self.opts = {}
self.args = []
self.extra = []
def consumePrefix(self, prefix: str) -> dict[str, Value]:
result: dict[str, Value] = {}
copy = self.opts.copy()
for key, value in copy.items():
if key.startswith(prefix):
result[key[len(prefix) :]] = value
del self.opts[key]
return result
def consumeOpt(self, key: str, default: Value = False) -> Value:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return default
def tryConsumeOpt(self, key: str) -> Optional[Value]:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return None
def consumeArg(self, default: Optional[str] = None) -> Optional[str]:
if len(self.args) == 0:
return default
first = self.args[0]
del self.args[0]
return first
# --- Arg parsing -------------------------------------------------------------
def parse(args: list[str]) -> Args:
result = Args()
@dt.dataclass
class Arg(Generic[utils.T]):
shortName: str
longName: str
description: str
default: Optional[utils.T] = None
for i in range(len(args)):
arg = args[i]
if arg.startswith("--") and not arg == "--":
if "=" in arg:
key, value = arg[2:].split("=", 1)
result.opts[key] = value
else:
result.opts[arg[2:]] = True
elif arg == "--":
result.extra += args[i + 1 :]
break
def __get__(self, instance, owner):
if instance is None:
return self
return instance.__dict__.get(self.longName, self.default)
@dt.dataclass
class FreeFormArg(Generic[utils.T]):
description: str
default: Optional[utils.T] = None
def __get__(self, instance, owner):
if instance is None:
return self
return self.default
class ParserState(enum.Enum):
FreeForm = enum.auto()
ShortArg = enum.auto()
RawArg = NewType("RawArg", str)
class CutekitArgs:
cmd: FreeFormArg[str] = FreeFormArg("Command to execute")
verbose: Arg[bool] = Arg("v", "verbose", "Enable verbose logging")
safemode: Arg[bool] = Arg("s", "safe", "Enable safe mode")
pod: Arg[bool] = Arg("p", "enable-pod", "Enable pod", default=False)
podName: Arg[str] = Arg("n", "pod-name", "The name of the pod", default="")
def parse(argv: list[str], argType: type) -> Any:
def set_value(options: dict[str, Any], name: str, value: Any):
if name is not options:
options[name] = value
else:
result.args.append(arg)
raise RuntimeError(f"{name} is already set")
def is_optional(t: type) -> bool:
return get_origin(t) is Union and type(None) in get_args(t)
def freeforms_get(argType: type) -> tuple[list[str], list[str]]:
freeforms = []
required_freeforms = []
found_optional = False
for arg, anno in [
arg
for arg in argType.__annotations__.items()
if get_origin(arg[1]) is FreeFormArg
]:
freeforms.append(arg)
if is_optional(get_args(anno)[0]):
found_optional = True
elif found_optional:
raise RuntimeError(
f"Required arguments must come before optional arguments"
)
else:
required_freeforms.append(arg)
return (freeforms, required_freeforms)
result = argType()
options: dict[str, Any] = {}
args: dict[str, partial] = {}
freeforms: list[Any] = []
state = ParserState.FreeForm
current_arg: Optional[str] = None
for arg in dir(argType):
if isinstance(getattr(argType, arg), Arg):
args[getattr(argType, arg).shortName] = partial(set_value, options, arg)
args[getattr(argType, arg).longName] = partial(set_value, options, arg)
i = 0
while i < len(argv):
match state:
case ParserState.FreeForm:
if argv[i] == "--":
freeargs = argv[i + 1:]
i += 1
break
if argv[i].startswith("--"):
if "=" in argv[i]:
# --name=value
name, value = argv[i][2:].split("=", 1)
if name in args:
args[name](value)
else:
# --name -> the value will be True
if argv[i][2:] in args:
args[argv[i][2:]](True)
elif argv[i].startswith("-"):
if len(argv[i][1:]) > 1:
for c in argv[i][1:]:
# -abc -> a, b, c are all True
if c in args:
args[c](True)
else:
state = ParserState.ShortArg
current_arg = argv[i][1:]
else:
freeforms.append(argv[i])
i += 1
case ParserState.ShortArg:
if argv[i].startswith("-"):
# -a -b 4 -> a is True
if current_arg in args:
args[current_arg](True)
else:
# -a 4 -> a is 4
if current_arg in args:
args[current_arg](argv[i])
i += 1
current_arg = None
state = ParserState.FreeForm
freeforms_all, required_freeforms = freeforms_get(argType)
if len(freeforms) < len(required_freeforms):
raise RuntimeError(
f"Missing arguments: {', '.join(required_freeforms[len(freeforms):])}"
)
if len(freeforms) > len(freeforms_all):
raise RuntimeError(f"Too many arguments")
for i, freeform in enumerate(freeforms):
setattr(result, freeforms_all[i], freeform)
# missing arguments
missing = set(
[
arg[0]
for arg in argType.__annotations__
if get_origin(arg[1]) is Arg and getattr(argType, arg[0]).default is None
]
) - set(options.keys())
if missing:
raise RuntimeError(f"Missing arguments: {', '.join(missing)}")
for key, value in options.items():
field_type = get_args(argType.__annotations__[key])[0]
setattr(result, key, field_type(value))
raw_args = [arg[0] for arg in argType.__annotations__.items() if arg[1] is RawArg]
if len(raw_args) > 1:
raise RuntimeError(f"Only one RawArg is allowed")
elif len(raw_args) == 1:
setattr(result, raw_args[0], freeargs)
return result
Callback = Callable[[Args], None]
Callback = Callable[[Any], None] | Callable[[], None]
@dt.dataclass
@ -85,6 +193,7 @@ class Command:
helpText: str
isPlugin: bool
callback: Callback
argType: Optional[type]
subcommands: dict[str, "Command"] = dt.field(default_factory=dict)
@ -96,8 +205,13 @@ def command(shortName: Optional[str], longName: str, helpText: str):
curframe = inspect.currentframe()
calframe = inspect.getouterframes(curframe, 2)
def wrap(fn: Callable[[Args], None]):
def wrap(fn: Callback):
_logger.debug(f"Registering command {longName}")
if len(fn.__annotations__) == 0:
argType = None
else:
argType = list(fn.__annotations__.values())[0]
path = longName.split("/")
parent = commands
for p in path[:-1]:
@ -108,6 +222,7 @@ def command(shortName: Optional[str], longName: str, helpText: str):
helpText,
Path(calframe[1].filename).parent != Path(__file__).parent,
fn,
argType
)
return fn
@ -119,7 +234,7 @@ def command(shortName: Optional[str], longName: str, helpText: str):
@command("u", "usage", "Show usage information")
def usage(args: Optional[Args] = None):
def usage():
print(f"Usage: {const.ARGV0} <command> [args...]")
@ -150,7 +265,7 @@ def ask(msg: str, default: Optional[bool] = None) -> bool:
@command("h", "help", "Show this help message")
def helpCmd(args: Args):
def helpCmd():
usage()
print()
@ -195,23 +310,21 @@ def helpCmd(args: Args):
@command("v", "version", "Show current version")
def versionCmd(args: Args):
def versionCmd():
print(f"CuteKit v{const.VERSION_STR}")
def exec(args: Args, cmds=commands):
cmd = args.consumeArg()
if cmd is None:
raise RuntimeError("No command specified")
def exec(cmd: str, args: list[str], cmds: dict[str, Command]=commands):
for c in cmds.values():
if c.shortName == cmd or c.longName == cmd:
if len(c.subcommands) > 0:
exec(args, c.subcommands)
exec(args[0], args[1:], c.subcommands)
return
else:
c.callback(args)
if c.argType is not None:
c.callback(parse(args[1:], c.argType)) # type: ignore
else:
c.callback() # type: ignore
return
raise RuntimeError(f"Unknown command {cmd}")

View file

@ -83,13 +83,15 @@ def view(
g.view(filename=os.path.join(target.builddir, "graph.gv"))
class GraphCmd:
mixins: cli.Arg[str] = cli.Arg("m", "mixins", "Mixins to apply", default="")
scope: cli.Arg[str] = cli.Arg("s", "scope", "Scope to show", default="")
onlyLibs: cli.Arg[bool] = cli.Arg("l", "only-libs", "Only show libraries", default=False)
showDisabled: cli.Arg[bool] = cli.Arg("d", "show-disabled", "Show disabled components", default=False)
@cli.command("g", "graph", "Show the dependency graph")
def _(args: cli.Args):
def _(args: GraphCmd):
registry = model.Registry.use(args)
target = model.Target.use(args)
scope = cast(Optional[str], args.tryConsumeOpt("scope"))
onlyLibs = args.consumeOpt("only-libs", False) is True
showDisabled = args.consumeOpt("show-disabled", False) is True
view(registry, target, scope=scope, showExe=not onlyLibs, showDisabled=showDisabled)
view(registry, target, scope=args.scope, showExe=not args.onlyLibs, showDisabled=args.showDisabled)

View file

@ -171,7 +171,7 @@ class Project(Manifest):
Project.fetchs(project.extern)
@staticmethod
def use(args: cli.Args) -> "Project":
def use() -> "Project":
global _project
if _project is None:
_project = Project.ensure()
@ -179,29 +179,31 @@ class Project(Manifest):
@cli.command("m", "model", "Manage the model")
def _(args: cli.Args):
def _():
pass
@cli.command("i", "model/install", "Install required external packages")
def _(args: cli.Args):
project = Project.use(args)
def _(args: Any):
project = Project.use()
Project.fetchs(project.extern)
class InitArgs:
repo: cli.Arg[str] = cli.Arg("r", "repo", "Repository to use for templates", default=const.DEFAULT_REPO_TEMPLATES)
list: cli.Arg[bool] = cli.Arg("l", "list", "List available templates", default=False)
template: cli.FreeFormArg[str] = cli.FreeFormArg("Template to use")
name: cli.FreeFormArg[Optional[str]] = cli.FreeFormArg("Name of the project")
@cli.command("I", "model/init", "Initialize a new project")
def _(args: cli.Args):
def _(args: InitArgs):
import requests
repo = args.consumeOpt("repo", const.DEFAULT_REPO_TEMPLATES)
list = args.consumeOpt("list")
template = args.consumeArg()
name = args.consumeArg()
_logger.info("Fetching registry...")
r = requests.get(f"https://raw.githubusercontent.com/{repo}/main/registry.json")
r = requests.get(f"https://raw.githubusercontent.com/{args.repo}/main/registry.json")
if r.status_code != 200:
_logger.error("Failed to fetch registry")
@ -209,30 +211,29 @@ def _(args: cli.Args):
registry = r.json()
if list:
if args.list:
print(
"\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry)
)
return
if not template:
raise RuntimeError("Template not specified")
def template_match(t: jexpr.Json) -> str:
return t["id"] == template
return t["id"] == args.template
if not any(filter(template_match, registry)):
raise LookupError(f"Couldn't find a template named {template}")
raise LookupError(f"Couldn't find a template named {args.template}")
if not name:
_logger.info(f"No name was provided, defaulting to {template}")
name = template
if args.name is None:
_logger.info(f"No name was provided, defaulting to {args.template}")
name = args.template
else:
name = args.name
if os.path.exists(name):
raise RuntimeError(f"Directory {name} already exists")
print(f"Creating project {name} from template {template}...")
shell.cloneDir(f"https://github.com/{repo}", template, name)
print(f"Creating project {name} from template {args.template}...")
shell.cloneDir(f"https://github.com/{args.repo}", args.template, name)
print(f"Project {name} created\n")
print("We suggest that you begin by typing:")
@ -287,7 +288,7 @@ class Target(Manifest):
return os.path.join(const.BUILD_DIR, f"{self.id}{postfix}")
@staticmethod
def use(args: cli.Args, props: Props = {}) -> "Target":
def use(args: Any, props: Props = {}) -> "Target":
registry = Registry.use(args, props)
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
return registry.ensure(targetSpec, Target)
@ -536,17 +537,25 @@ class Registry(DataClassJsonMixin):
return m
@staticmethod
def use(args: cli.Args, props: Props = {}) -> "Registry":
def use(args: Any, props: Props = {}) -> "Registry":
global _registry
if _registry is not None:
return _registry
project = Project.use(args)
mixins = str(args.consumeOpt("mixins", "")).split(",")
if mixins == [""]:
project = Project.use()
if not hasattr(args, "mixins"):
mixins = []
props |= cast(dict[str, str], args.consumePrefix("prop:"))
else:
if not isinstance(args.mixins, str):
raise RuntimeError("Mixins attribute on provided args is not a string")
else:
mixins = args.mixins.split(",")
if mixins == [""]:
mixins = []
#props |= cast(dict[str, str], args.consumePrefix("prop:"))
_registry = Registry.load(project, mixins, props)
return _registry
@ -638,8 +647,11 @@ class Registry(DataClassJsonMixin):
return r
class ListArgs:
mixins: cli.Arg[str] = cli.Arg("m", "mixins", "Mixins to apply", default="")
@cli.command("l", "model/list", "List all components and targets")
def _(args: cli.Args):
def _(args: ListArgs):
registry = Registry.use(args)
components = list(registry.iter(Component))

View file

@ -51,6 +51,6 @@ def loadAll():
load(os.path.join(pluginDir, files))
def setup(args: cli.Args):
if not bool(args.consumeOpt("safemode", False)):
def setup(args: cli.CutekitArgs):
if not args.safemode:
loadAll()

View file

@ -77,20 +77,17 @@ IMAGES: dict[str, Image] = {
}
def setup(args: cli.Args):
def setup(args: cli.CutekitArgs, argv: list[str]):
"""
Reincarnate cutekit within a docker container, this is
useful for cross-compiling
"""
pod = args.consumeOpt("pod", False)
if not pod:
if not args.pod:
return
if isinstance(pod, str):
pod = pod.strip()
pod = podPrefix + pod
if pod is True:
pod = defaultPodName
assert isinstance(pod, str)
pod = args.podName.strip() or defaultPodName
pod = podPrefix + args.podName
model.Project.ensure()
print(f"Reincarnating into pod '{pod[len(podPrefix) :]}'...")
try:
@ -103,7 +100,7 @@ def setup(args: cli.Args):
pod,
"/tools/cutekit/entrypoint.sh",
"--reincarnated",
*args.args,
*argv,
)
sys.exit(0)
except Exception:
@ -111,7 +108,7 @@ def setup(args: cli.Args):
@cli.command("p", "pod", "Manage pods")
def _(args: cli.Args):
def _():
pass
@ -121,18 +118,24 @@ def tryDecode(data: Optional[bytes], default: str = "") -> str:
return data.decode()
class PodArgs:
name: cli.Arg[str] = cli.Arg("n", "name", "The name of the pod to use", default=defaultPodName)
image = cli.Arg("i", "image", "The image to use", default=defaultPodImage)
@cli.command("c", "pod/create", "Create a new pod")
def _(args: cli.Args):
def _(args: PodArgs):
"""
Create a new development pod with cutekit installed and the current
project mounted at /project
"""
project = model.Project.ensure()
name = str(args.consumeOpt("name", defaultPodName))
if not name.startswith(podPrefix):
name = f"{podPrefix}{name}"
image = IMAGES[str(args.consumeOpt("image", defaultPodImage))]
if not args.name.startswith(podPrefix):
name: str = f"{podPrefix}{name}"
else:
name = args.name
image = IMAGES[args.image]
client = docker.from_env()
try:
@ -173,12 +176,16 @@ def _(args: cli.Args):
print(f"Created pod '{name[len(podPrefix) :]}' from image '{image.image}'")
class KillPodArgs:
name: cli.Arg[str] = cli.Arg("n", "name", "The name of the pod to kill", default=defaultPodName)
@cli.command("k", "pod/kill", "Stop and remove a pod")
def _(args: cli.Args):
def _(args: KillPodArgs):
client = docker.from_env()
name = str(args.consumeOpt("name", defaultPodName))
if not name.startswith(podPrefix):
name = f"{podPrefix}{name}"
if not args.name.startswith(podPrefix):
name: str = f"{podPrefix}{name}"
else:
name = args.name
try:
container = client.containers.get(name)
@ -189,14 +196,8 @@ def _(args: cli.Args):
raise RuntimeError(f"Pod '{name[len(podPrefix):]}' does not exist")
@cli.command("s", "pod/shell", "Open a shell in a pod")
def _(args: cli.Args):
args.args.insert(0, "/bin/bash")
podExecCmd(args)
@cli.command("l", "pod/list", "List all pods")
def _(args: cli.Args):
def _():
client = docker.from_env()
hasPods = False
for container in client.containers.list(all=True):
@ -209,17 +210,25 @@ def _(args: cli.Args):
print(vt100.p("(No pod found)"))
@cli.command("e", "pod/exec", "Execute a command in a pod")
def podExecCmd(args: cli.Args):
name = str(args.consumeOpt("name", defaultPodName))
if not name.startswith(podPrefix):
name = f"{podPrefix}{name}"
class PodExecArgs:
name: cli.Arg[str] = cli.Arg("n", "name", "The name of the pod to use", default=defaultPodName)
cmd: cli.FreeFormArg[str] = cli.FreeFormArg("The command to execute")
extra: cli.RawArg
cmd = args.consumeArg()
if cmd is None:
raise RuntimeError("Missing command to execute")
@cli.command("e", "pod/exec", "Execute a command in a pod")
def podExecCmd(args: PodExecArgs):
if not args.name.startswith(podPrefix):
name: str = f"{podPrefix}{name}"
else:
name = args.name
try:
shell.exec("docker", "exec", "-it", name, cmd, *args.extra)
shell.exec("docker", "exec", "-it", name, args.cmd, *args.extra)
except Exception:
raise RuntimeError(f"Pod '{name[len(podPrefix):]}' does not exist")
@cli.command("s", "pod/shell", "Open a shell in a pod")
def _(args: PodExecArgs):
args.cmd = "/bin/bash"
podExecCmd(args)