This commit is contained in:
Keyboard Slayer 2024-01-19 14:01:18 +00:00 committed by GitHub
commit cdf331cf4d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 515 additions and 225 deletions

View file

@ -28,8 +28,14 @@ def ensure(version: tuple[int, int, int]):
) )
def setupLogger(verbose: bool): class LoggerArgs:
if verbose: verbose = cli.Arg[bool]("v", "verbose", "Enable verbose logging", default=False)
class logger:
@staticmethod
def setup(args: LoggerArgs):
if args.verbose:
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
format=f"{vt100.CYAN}%(asctime)s{vt100.RESET} {vt100.YELLOW}%(levelname)s{vt100.RESET} %(name)s: %(message)s", format=f"{vt100.CYAN}%(asctime)s{vt100.RESET} {vt100.YELLOW}%(levelname)s{vt100.RESET} %(name)s: %(message)s",
@ -52,16 +58,22 @@ def setupLogger(verbose: bool):
) )
def main() -> int: class MainArgs(pods.PodArgs, plugins.PluginArgs, LoggerArgs):
try: pass
shell.mkdir(const.GLOBAL_CK_DIR)
args = cli.parse(sys.argv[1:])
setupLogger(args.consumeOpt("verbose", False) is True)
@cli.command(None, "/", const.DESCRIPTION)
def _(args: MainArgs):
shell.mkdir(const.GLOBAL_CK_DIR)
logger.setup(args)
const.setup() const.setup()
plugins.setup(args) plugins.setup(args)
pods.setup(args) pods.setup(args, sys.argv[1:])
cli.exec(args)
def main() -> int:
try:
cli.exec(sys.argv)
return 0 return 0
except RuntimeError as e: except RuntimeError as e:

View file

@ -3,7 +3,7 @@ import logging
import dataclasses as dt import dataclasses as dt
from pathlib import Path from pathlib import Path
from typing import Callable, Literal, TextIO, Union from typing import Callable, Literal, TextIO, Union, Any
from . import shell, rules, model, ninja, const, cli from . import shell, rules, model, ninja, const, cli
@ -15,7 +15,7 @@ class Scope:
registry: model.Registry registry: model.Registry
@staticmethod @staticmethod
def use(args: cli.Args, props: model.Props = {}) -> "Scope": def use(args: Any, props: model.Props = {}) -> "Scope":
registry = model.Registry.use(args, props) registry = model.Registry.use(args, props)
return Scope(registry) return Scope(registry)
@ -32,7 +32,7 @@ class TargetScope(Scope):
target: model.Target target: model.Target
@staticmethod @staticmethod
def use(args: cli.Args, props: model.Props = {}) -> "TargetScope": def use(args: Any, props: model.Props = {}) -> "TargetScope":
registry = model.Registry.use(args, props) registry = model.Registry.use(args, props)
target = model.Target.use(args, props) target = model.Target.use(args, props)
return TargetScope(registry, target) return TargetScope(registry, target)
@ -336,12 +336,12 @@ def build(
@cli.command("b", "builder", "Build/Run/Clean a component or all components") @cli.command("b", "builder", "Build/Run/Clean a component or all components")
def _(args: cli.Args): def _():
pass pass
@cli.command("b", "builder/build", "Build a component or all components") @cli.command("b", "builder/build", "Build a component or all components")
def _(args: cli.Args): def _(args: Any):
scope = TargetScope.use(args) scope = TargetScope.use(args)
componentSpec = args.consumeArg() componentSpec = args.consumeArg()
component = None component = None
@ -350,20 +350,24 @@ def _(args: cli.Args):
build(scope, component if component is not None else "all")[0] build(scope, component if component is not None else "all")[0]
class RunArgs(model.RegistryArgs, shell.DebuggerArgs):
debug = cli.Arg[bool]("d", "debug", "Enable debug mode", default=False)
profile = cli.Arg[bool]("p", "profile", "Enable profiling", default=False)
component = cli.FreeFormArg(
"component", "Component to run", default="__main__"
)
extra: cli.RawArg
@cli.command("r", "builder/run", "Run a component") @cli.command("r", "builder/run", "Run a component")
def runCmd(args: cli.Args): def runCmd(args: RunArgs):
debug = args.consumeOpt("debug", False) is True scope = TargetScope.use(args, {"debug": args.debug})
profile = args.consumeOpt("profile", False) is True
wait = args.consumeOpt("wait", False) is True
debugger = args.consumeOpt("debugger", "lldb")
componentSpec = args.consumeArg() or "__main__"
scope = TargetScope.use(args, {"debug": debug})
component = scope.registry.lookup( component = scope.registry.lookup(
componentSpec, model.Component, includeProvides=True args.component, model.Component, includeProvides=True
) )
if component is None: if component is None:
raise RuntimeError(f"Component {componentSpec} not found") raise RuntimeError(f"Component {args.component} not found")
product = build(scope, component)[0] product = build(scope, component)[0]
@ -374,39 +378,39 @@ def runCmd(args: cli.Args):
try: try:
command = [str(product.path), *args.extra] command = [str(product.path), *args.extra]
if debug: if args.debug:
shell.debug(command, debugger=debugger, wait=wait) shell.debug(command, debugger=args.debugger, wait=args.wait)
elif profile: elif args.profile:
shell.profile(command) shell.profile(command)
else: else:
shell.exec(*command) shell.exec(*command)
except Exception as e: except Exception as e:
cli.error(e) cli.error(str(e))
@cli.command("t", "builder/test", "Run all test targets") @cli.command("t", "builder/test", "Run all test targets")
def _(args: cli.Args): def _(args: RunArgs):
# This is just a wrapper around the `run` command that try # This is just a wrapper around the `run` command that try
# to run a special hook component named __tests__. # to run a special hook component named __tests__.
args.args.insert(0, "__tests__") args.component = "__tests__"
runCmd(args) runCmd(args)
@cli.command("d", "builder/debug", "Debug a component") @cli.command("d", "builder/debug", "Debug a component")
def _(args: cli.Args): def _(args: RunArgs):
# This is just a wrapper around the `run` command that # This is just a wrapper around the `run` command that
# always enable debug mode. # always enable debug mode.
args.opts["debug"] = True args.debug = True
runCmd(args) runCmd(args)
@cli.command("c", "builder/clean", "Clean build files") @cli.command("c", "builder/clean", "Clean build files")
def _(args: cli.Args): def _():
model.Project.use(args) model.Project.use()
shell.rmrf(const.BUILD_DIR) shell.rmrf(const.BUILD_DIR)
@cli.command("n", "builder/nuke", "Clean all build files and caches") @cli.command("n", "builder/nuke", "Clean all build files and caches")
def _(args: cli.Args): def _():
model.Project.use(args) model.Project.use()
shell.rmrf(const.PROJECT_CK_DIR) shell.rmrf(const.PROJECT_CK_DIR)

View file

@ -1,81 +1,214 @@
import enum
import inspect import inspect
import logging import logging
import sys import sys
import dataclasses as dt import dataclasses as dt
from functools import partial
from pathlib import Path from pathlib import Path
from typing import Optional, Union, Callable from typing import (
Any,
NewType,
Optional,
Union,
Callable,
Generic,
cast,
get_origin,
get_args,
)
from . import const, vt100 from . import const, vt100, utils
Value = Union[str, bool, int] Value = Union[str, bool, int]
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class Args: # --- Arg parsing -------------------------------------------------------------
opts: dict[str, Value]
args: list[str]
extra: list[str]
def __init__(self):
self.opts = {}
self.args = []
self.extra = []
def consumePrefix(self, prefix: str) -> dict[str, Value]:
result: dict[str, Value] = {}
copy = self.opts.copy()
for key, value in copy.items():
if key.startswith(prefix):
result[key[len(prefix) :]] = value
del self.opts[key]
return result
def consumeOpt(self, key: str, default: Value = False) -> Value:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return default
def tryConsumeOpt(self, key: str) -> Optional[Value]:
if key in self.opts:
result = self.opts[key]
del self.opts[key]
return result
return None
def consumeArg(self, default: Optional[str] = None) -> Optional[str]:
if len(self.args) == 0:
return default
first = self.args[0]
del self.args[0]
return first
def parse(args: list[str]) -> Args: @dt.dataclass
result = Args() class Arg(Generic[utils.T]):
shortName: str
longName: str
description: str
default: Optional[utils.T] = None
for i in range(len(args)): def __get__(self, instance, owner) -> utils.T:
arg = args[i] if instance is None:
if arg.startswith("--") and not arg == "--": return self # type: ignore
if "=" in arg:
key, value = arg[2:].split("=", 1) return instance.__dict__.get(self.longName, self.default)
result.opts[key] = value
def __set__(self, instance, value: utils.T):
instance.__dict__[self.longName] = value
@dt.dataclass
class FreeFormArg(Generic[utils.T]):
longName: str
description: str
default: Optional[utils.T] = None
def __get__(self, instance, owner) -> utils.T:
if instance is None:
return self # type: ignore
return instance.__dict__.get(self.longName, self.default)
def __set__(self, instance, value: utils.T):
instance.__dict__[self.longName] = value
@dt.dataclass
class RawArg:
longName: str
description: str
def __get__(self, instance, owner) -> list[str]:
if instance is None:
return self # type: ignore
return instance.__dict__.get(self.longName, [])
def __set__(self, instance, value):
instance.__dict__[self.longName] = value
class ParserState(enum.Enum):
FreeForm = enum.auto()
ShortArg = enum.auto()
def parse(argv: list[str], argType: type[utils.T]) -> utils.T:
def set_value(options: dict[str, Any], name: str, value: Any):
if name is not options:
options[name] = value
else: else:
result.opts[arg[2:]] = True raise RuntimeError(f"{name} is already set")
elif arg == "--":
result.extra += args[i + 1 :] def is_optional(t: type) -> bool:
return get_origin(t) is Union and type(None) in get_args(t)
def freeforms_get(argType: type) -> tuple[list[str], list[str]]:
freeforms = []
required_freeforms = []
found_optional = False
for arg, anno in [
arg
for arg in argType.__annotations__.items()
if get_origin(arg[1]) is FreeFormArg
]:
freeforms.append(arg)
if is_optional(get_args(anno)[0]):
found_optional = True
elif found_optional:
raise RuntimeError(
"Required arguments must come before optional arguments"
)
else:
required_freeforms.append(arg)
return (freeforms, required_freeforms)
result = argType()
options: dict[str, Any] = {}
args: dict[str, partial] = {}
freeforms: list[Any] = []
state = ParserState.FreeForm
current_arg: Optional[str] = None
for arg in dir(argType):
if isinstance(getattr(argType, arg), Arg):
args[getattr(argType, arg).shortName] = partial(set_value, options, arg)
args[getattr(argType, arg).longName] = partial(set_value, options, arg)
i = 0
while i < len(argv):
match state:
case ParserState.FreeForm:
if argv[i] == "--":
freeargs = argv[i + 1 :]
i += 1
break break
if argv[i].startswith("--"):
if "=" in argv[i]:
# --name=value
name, value = argv[i][2:].split("=", 1)
if name in args:
args[name](value)
else: else:
result.args.append(arg) # --name -> the value will be True
if argv[i][2:] in args:
args[argv[i][2:]](True)
elif argv[i].startswith("-"):
if len(argv[i][1:]) > 1:
for c in argv[i][1:]:
# -abc -> a, b, c are all True
if c in args:
args[c](True)
else:
state = ParserState.ShortArg
current_arg = argv[i][1:]
else:
freeforms.append(argv[i])
i += 1
case ParserState.ShortArg:
if argv[i].startswith("-"):
# -a -b 4 -> a is True
if current_arg in args:
args[current_arg](True)
else:
# -a 4 -> a is 4
if current_arg in args:
args[current_arg](argv[i])
i += 1
current_arg = None
state = ParserState.FreeForm
freeforms_all, required_freeforms = freeforms_get(argType)
if len(freeforms) < len(required_freeforms):
raise RuntimeError(
f"Missing arguments: {', '.join(required_freeforms[len(freeforms):])}"
)
if len(freeforms) > len(freeforms_all):
raise RuntimeError("Too many arguments")
for i, freeform in enumerate(freeforms):
setattr(result, freeforms_all[i], freeform)
# missing arguments
missing = set(
[
arg[0]
for arg in argType.__annotations__
if get_origin(arg[1]) is Arg and getattr(argType, arg[0]).default is None
]
) - set(options.keys())
if missing:
raise RuntimeError(f"Missing arguments: {', '.join(missing)}")
for key, value in options.items():
# print(getattr(argType, key).type())
field_type = get_args(argType.__annotations__[key])[0]
setattr(result, key, field_type(value))
raw_args = [arg[0] for arg in argType.__annotations__.items() if arg[1] is RawArg]
if len(raw_args) > 1:
raise RuntimeError("Only one RawArg is allowed")
elif len(raw_args) == 1:
setattr(result, raw_args[0], freeargs)
return result return result
Callback = Callable[[Args], None] Callback = Callable[[Any], None] | Callable[[], None]
@dt.dataclass @dt.dataclass
@ -85,30 +218,59 @@ class Command:
helpText: str helpText: str
isPlugin: bool isPlugin: bool
callback: Callback callback: Callback
argType: Optional[type]
subcommands: dict[str, "Command"] = dt.field(default_factory=dict) subcommands: dict[str, "Command"] = dt.field(default_factory=dict)
def help(self):
print(f"{self.longName} - {self.helpText}")
print()
self.usage()
def usage(self):
usage = f"Usage: {self.longName}"
if self.argType is not None:
usage += " [args...]"
if len(self.subcommands) > 0:
usage += " <command> [args...]"
print(usage)
commands: dict[str, Command] = {} commands: dict[str, Command] = {}
rootCommand: Optional[Command] = None
def command(shortName: Optional[str], longName: str, helpText: str): def command(shortName: Optional[str], longName: str, helpText: str):
curframe = inspect.currentframe() curframe = inspect.currentframe()
calframe = inspect.getouterframes(curframe, 2) calframe = inspect.getouterframes(curframe, 2)
def wrap(fn: Callable[[Args], None]): def wrap(fn: utils.T) -> utils.T:
_logger.debug(f"Registering command {longName}") _logger.debug(f"Registering command {longName}")
if len(fn.__annotations__) == 0:
argType = None
else:
argType = list(fn.__annotations__.values())[0]
path = longName.split("/") path = longName.split("/")
command = Command(
shortName,
path[-1] if longName != "/" else "/",
helpText,
Path(calframe[1].filename).parent != Path(__file__).parent,
cast(Callback, fn),
argType,
)
if longName == "/":
global rootCommand
rootCommand = command
else:
parent = commands parent = commands
for p in path[:-1]: for p in path[:-1]:
parent = parent[p].subcommands parent = parent[p].subcommands
parent[path[-1]] = Command( parent[path[-1]] = command
shortName,
path[-1],
helpText,
Path(calframe[1].filename).parent != Path(__file__).parent,
fn,
)
return fn return fn
@ -119,7 +281,7 @@ def command(shortName: Optional[str], longName: str, helpText: str):
@command("u", "usage", "Show usage information") @command("u", "usage", "Show usage information")
def usage(args: Optional[Args] = None): def usage():
print(f"Usage: {const.ARGV0} <command> [args...]") print(f"Usage: {const.ARGV0} <command> [args...]")
@ -150,7 +312,7 @@ def ask(msg: str, default: Optional[bool] = None) -> bool:
@command("h", "help", "Show this help message") @command("h", "help", "Show this help message")
def helpCmd(args: Args): def helpCmd():
usage() usage()
print() print()
@ -195,23 +357,53 @@ def helpCmd(args: Args):
@command("v", "version", "Show current version") @command("v", "version", "Show current version")
def versionCmd(args: Args): def versionCmd():
print(f"CuteKit v{const.VERSION_STR}") print(f"CuteKit v{const.VERSION_STR}")
def exec(args: Args, cmds=commands): def _exec(args: list[str], cmd: Command):
cmd = args.consumeArg() # let's slice the arguments for this command and the sub command
# [-a -b] [sub-cmd -c -d]
if cmd is None: selfArgs = []
raise RuntimeError("No command specified") if len(cmd.subcommands) > 0:
while len(args) > 0 and args[0].startswith("-"):
for c in cmds.values(): selfArgs.append(args.pop(0))
if c.shortName == cmd or c.longName == cmd:
if len(c.subcommands) > 0:
exec(args, c.subcommands)
return
else: else:
c.callback(args) selfArgs = args
if "-h" in selfArgs or "--help" in selfArgs:
cmd.help()
return return
raise RuntimeError(f"Unknown command {cmd}") if "-u" in selfArgs or "--usage" in selfArgs:
cmd.usage()
return
if cmd.argType is not None:
parsedArgs = parse(selfArgs, cmd.argType)
cmd.callback(parsedArgs) # type: ignore
else:
cmd.callback() # type: ignore
if cmd.subcommands:
if len(args) == 0:
error("Missing subcommand")
cmd.usage()
return
for c in cmd.subcommands.values():
if c.shortName == args[0] or c.longName == args[0]:
_exec(args, c)
return
raise RuntimeError(f"Unknown command {args[0]}")
def exec(args: list[str]):
if not rootCommand:
raise RuntimeError("No root command")
rootCommand.longName = Path(args[0]).name
rootCommand.subcommands = commands
_exec(args[1:], rootCommand)

View file

@ -83,13 +83,20 @@ def view(
g.view(filename=os.path.join(target.builddir, "graph.gv")) g.view(filename=os.path.join(target.builddir, "graph.gv"))
class GraphCmd(model.TargetArgs):
scope = cli.Arg("s", "scope", "Scope to show", default="")
onlyLibs = cli.Arg("l", "only-libs", "Only show libraries", default=False)
showDisabled = cli.Arg(
"d", "show-disabled", "Show disabled components", default=False
)
@cli.command("g", "graph", "Show the dependency graph") @cli.command("g", "graph", "Show the dependency graph")
def _(args: cli.Args): def _(args: GraphCmd):
registry = model.Registry.use(args) view(
target = model.Target.use(args) model.Registry.use(args),
model.Target.use(args),
scope = cast(Optional[str], args.tryConsumeOpt("scope")) scope=args.scope,
onlyLibs = args.consumeOpt("only-libs", False) is True showExe=not args.onlyLibs,
showDisabled = args.consumeOpt("show-disabled", False) is True showDisabled=args.showDisabled,
)
view(registry, target, scope=scope, showExe=not onlyLibs, showDisabled=showDisabled)

View file

@ -7,7 +7,7 @@ from enum import Enum
from typing import Any, Generator, Optional, Type, cast from typing import Any, Generator, Optional, Type, cast
from pathlib import Path from pathlib import Path
from dataclasses_json import DataClassJsonMixin from dataclasses_json import DataClassJsonMixin
from typing import Union
from cutekit import const, shell from cutekit import const, shell
@ -171,7 +171,7 @@ class Project(Manifest):
Project.fetchs(project.extern) Project.fetchs(project.extern)
@staticmethod @staticmethod
def use(args: cli.Args) -> "Project": def use() -> "Project":
global _project global _project
if _project is None: if _project is None:
_project = Project.ensure() _project = Project.ensure()
@ -179,29 +179,37 @@ class Project(Manifest):
@cli.command("m", "model", "Manage the model") @cli.command("m", "model", "Manage the model")
def _(args: cli.Args): def _():
pass pass
@cli.command("i", "model/install", "Install required external packages") @cli.command("i", "model/install", "Install required external packages")
def _(args: cli.Args): def _():
project = Project.use(args) project = Project.use()
Project.fetchs(project.extern) Project.fetchs(project.extern)
class InitArgs:
repo = cli.Arg(
"r",
"repo",
"Repository to use for templates",
default=const.DEFAULT_REPO_TEMPLATES,
)
list = cli.Arg("l", "list", "List available templates", default=False)
template = cli.FreeFormArg("template", "Template to use")
name = cli.FreeFormArg("name", "Name of the project")
@cli.command("I", "model/init", "Initialize a new project") @cli.command("I", "model/init", "Initialize a new project")
def _(args: cli.Args): def _(args: InitArgs):
import requests import requests
repo = args.consumeOpt("repo", const.DEFAULT_REPO_TEMPLATES)
list = args.consumeOpt("list")
template = args.consumeArg()
name = args.consumeArg()
_logger.info("Fetching registry...") _logger.info("Fetching registry...")
r = requests.get(f"https://raw.githubusercontent.com/{repo}/main/registry.json") r = requests.get(
f"https://raw.githubusercontent.com/{args.repo}/main/registry.json"
)
if r.status_code != 200: if r.status_code != 200:
_logger.error("Failed to fetch registry") _logger.error("Failed to fetch registry")
@ -209,30 +217,29 @@ def _(args: cli.Args):
registry = r.json() registry = r.json()
if list: if args.list:
print( print(
"\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry) "\n".join(f"* {entry['id']} - {entry['description']}" for entry in registry)
) )
return return
if not template:
raise RuntimeError("Template not specified")
def template_match(t: jexpr.Json) -> str: def template_match(t: jexpr.Json) -> str:
return t["id"] == template return t["id"] == args.template
if not any(filter(template_match, registry)): if not any(filter(template_match, registry)):
raise LookupError(f"Couldn't find a template named {template}") raise LookupError(f"Couldn't find a template named {args.template}")
if not name: if args.name is None:
_logger.info(f"No name was provided, defaulting to {template}") _logger.info(f"No name was provided, defaulting to {args.template}")
name = template name = args.template
else:
name = args.name
if os.path.exists(name): if os.path.exists(name):
raise RuntimeError(f"Directory {name} already exists") raise RuntimeError(f"Directory {name} already exists")
print(f"Creating project {name} from template {template}...") print(f"Creating project {name} from template {args.template}...")
shell.cloneDir(f"https://github.com/{repo}", template, name) shell.cloneDir(f"https://github.com/{args.repo}", args.template, name)
print(f"Project {name} created\n") print(f"Project {name} created\n")
print("We suggest that you begin by typing:") print("We suggest that you begin by typing:")
@ -263,6 +270,17 @@ DEFAULT_TOOLS: Tools = {
} }
class RegistryArgs:
mixins = cli.Arg[list[str]]("m", "mixins", "Mixins to apply", default=[])
# props = cli.Arg[dict[str]]("p", "props", "Properties to set", default="")
class TargetArgs(RegistryArgs):
target = cli.Arg(
"t", "target", "The target to use", default="host-" + shell.uname().machine
)
@dt.dataclass @dt.dataclass
class Target(Manifest): class Target(Manifest):
props: Props = dt.field(default_factory=dict) props: Props = dt.field(default_factory=dict)
@ -287,9 +305,9 @@ class Target(Manifest):
return os.path.join(const.BUILD_DIR, f"{self.id}{postfix}") return os.path.join(const.BUILD_DIR, f"{self.id}{postfix}")
@staticmethod @staticmethod
def use(args: cli.Args, props: Props = {}) -> "Target": def use(args: TargetArgs, props: Props = {}) -> "Target":
registry = Registry.use(args, props) registry = Registry.use(args, props)
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine)) targetSpec = args.target
return registry.ensure(targetSpec, Target) return registry.ensure(targetSpec, Target)
def route(self, componentSpec: str): def route(self, componentSpec: str):
@ -536,17 +554,25 @@ class Registry(DataClassJsonMixin):
return m return m
@staticmethod @staticmethod
def use(args: cli.Args, props: Props = {}) -> "Registry": def use(args: RegistryArgs, props: Props = {}) -> "Registry":
global _registry global _registry
if _registry is not None: if _registry is not None:
return _registry return _registry
project = Project.use(args) project = Project.use()
mixins = str(args.consumeOpt("mixins", "")).split(",")
if not hasattr(args, "mixins"):
mixins = []
else:
if not isinstance(args.mixins, str):
raise RuntimeError("Mixins attribute on provided args is not a string")
else:
mixins = args.mixins.split(",")
if mixins == [""]: if mixins == [""]:
mixins = [] mixins = []
props |= cast(dict[str, str], args.consumePrefix("prop:"))
# props |= args.props
_registry = Registry.load(project, mixins, props) _registry = Registry.load(project, mixins, props)
return _registry return _registry
@ -639,7 +665,7 @@ class Registry(DataClassJsonMixin):
@cli.command("l", "model/list", "List all components and targets") @cli.command("l", "model/list", "List all components and targets")
def _(args: cli.Args): def _(args: RegistryArgs):
registry = Registry.use(args) registry = Registry.use(args)
components = list(registry.iter(Component)) components = list(registry.iter(Component))

View file

@ -51,6 +51,10 @@ def loadAll():
load(os.path.join(pluginDir, files)) load(os.path.join(pluginDir, files))
def setup(args: cli.Args): class PluginArgs:
if not bool(args.consumeOpt("safemode", False)): safemode = cli.Arg[bool]("s", "safemode", "Skip loading plugins", default=False)
def setup(args: PluginArgs):
if not args.safemode:
loadAll() loadAll()

View file

@ -77,20 +77,22 @@ IMAGES: dict[str, Image] = {
} }
def setup(args: cli.Args): class PodArgs:
pod = cli.Arg[bool]("p", "enable-pod", "Enable pod", default=False)
podName = cli.Arg[str]("n", "pod-name", "The name of the pod", default="")
def setup(args: PodArgs, argv: list[str]):
""" """
Reincarnate cutekit within a docker container, this is Reincarnate cutekit within a docker container, this is
useful for cross-compiling useful for cross-compiling
""" """
pod = args.consumeOpt("pod", False) if not args.pod:
if not pod:
return return
if isinstance(pod, str):
pod = pod.strip() pod = args.podName.strip() or defaultPodName
pod = podPrefix + pod pod = podPrefix + args.podName
if pod is True:
pod = defaultPodName
assert isinstance(pod, str)
model.Project.ensure() model.Project.ensure()
print(f"Reincarnating into pod '{pod[len(podPrefix) :]}'...") print(f"Reincarnating into pod '{pod[len(podPrefix) :]}'...")
try: try:
@ -103,7 +105,7 @@ def setup(args: cli.Args):
pod, pod,
"/tools/cutekit/entrypoint.sh", "/tools/cutekit/entrypoint.sh",
"--reincarnated", "--reincarnated",
*args.args, *argv,
) )
sys.exit(0) sys.exit(0)
except Exception: except Exception:
@ -111,7 +113,7 @@ def setup(args: cli.Args):
@cli.command("p", "pod", "Manage pods") @cli.command("p", "pod", "Manage pods")
def _(args: cli.Args): def _():
pass pass
@ -121,18 +123,27 @@ def tryDecode(data: Optional[bytes], default: str = "") -> str:
return data.decode() return data.decode()
class PodCreateArgs:
name: cli.Arg[str] = cli.Arg(
"n", "name", "The name of the pod to use", default="default"
)
image = cli.Arg[str]("i", "image", "The image to use", default=defaultPodImage)
@cli.command("c", "pod/create", "Create a new pod") @cli.command("c", "pod/create", "Create a new pod")
def _(args: cli.Args): def _(args: PodCreateArgs):
""" """
Create a new development pod with cutekit installed and the current Create a new development pod with cutekit installed and the current
project mounted at /project project mounted at /project
""" """
project = model.Project.ensure() project = model.Project.ensure()
name = str(args.consumeOpt("name", defaultPodName)) if not args.name.startswith(podPrefix):
if not name.startswith(podPrefix): name: str = f"{podPrefix}{args.name}"
name = f"{podPrefix}{name}" else:
image = IMAGES[str(args.consumeOpt("image", defaultPodImage))] name = args.name
image = IMAGES[args.image]
client = docker.from_env() client = docker.from_env()
try: try:
@ -173,12 +184,19 @@ def _(args: cli.Args):
print(f"Created pod '{name[len(podPrefix) :]}' from image '{image.image}'") print(f"Created pod '{name[len(podPrefix) :]}' from image '{image.image}'")
class KillPodArgs:
name: cli.Arg[str] = cli.Arg(
"n", "name", "The name of the pod to kill", default=defaultPodName
)
@cli.command("k", "pod/kill", "Stop and remove a pod") @cli.command("k", "pod/kill", "Stop and remove a pod")
def _(args: cli.Args): def _(args: KillPodArgs):
client = docker.from_env() client = docker.from_env()
name = str(args.consumeOpt("name", defaultPodName)) if not args.name.startswith(podPrefix):
if not name.startswith(podPrefix): name: str = f"{podPrefix}{args.name}"
name = f"{podPrefix}{name}" else:
name = args.name
try: try:
container = client.containers.get(name) container = client.containers.get(name)
@ -189,14 +207,8 @@ def _(args: cli.Args):
raise RuntimeError(f"Pod '{name[len(podPrefix):]}' does not exist") raise RuntimeError(f"Pod '{name[len(podPrefix):]}' does not exist")
@cli.command("s", "pod/shell", "Open a shell in a pod")
def _(args: cli.Args):
args.args.insert(0, "/bin/bash")
podExecCmd(args)
@cli.command("l", "pod/list", "List all pods") @cli.command("l", "pod/list", "List all pods")
def _(args: cli.Args): def _():
client = docker.from_env() client = docker.from_env()
hasPods = False hasPods = False
for container in client.containers.list(all=True): for container in client.containers.list(all=True):
@ -209,17 +221,26 @@ def _(args: cli.Args):
print(vt100.p("(No pod found)")) print(vt100.p("(No pod found)"))
@cli.command("e", "pod/exec", "Execute a command in a pod") class PodExecArgs:
def podExecCmd(args: cli.Args): name = cli.Arg("n", "name", "The name of the pod to use", default=defaultPodName)
name = str(args.consumeOpt("name", defaultPodName)) cmd = cli.FreeFormArg("cmd", "The command to execute", default="/bin/bash")
if not name.startswith(podPrefix): args = cli.FreeFormArg("args", "The arguments to pass to the command")
name = f"{podPrefix}{name}"
cmd = args.consumeArg()
if cmd is None: @cli.command("e", "pod/exec", "Execute a command in a pod")
raise RuntimeError("Missing command to execute") def podExecCmd(args: PodExecArgs):
if not args.name.startswith(podPrefix):
name: str = f"{podPrefix}{args.name}"
else:
name = args.name
try: try:
shell.exec("docker", "exec", "-it", name, cmd, *args.extra) shell.exec("docker", "exec", "-it", name, args.cmd, *args.args)
except Exception: except Exception:
raise RuntimeError(f"Pod '{name[len(podPrefix):]}' does not exist") raise RuntimeError(f"Pod '{name[len(podPrefix):]}' does not exist")
@cli.command("s", "pod/shell", "Open a shell in a pod")
def _(args: PodExecArgs):
args.cmd = "/bin/bash"
podExecCmd(args)

View file

@ -395,27 +395,51 @@ def compress(path: str, dest: Optional[str] = None, format: str = "zstd") -> str
@cli.command("s", "scripts", "Manage scripts") @cli.command("s", "scripts", "Manage scripts")
def _(args: cli.Args): def _():
pass pass
class DebuggerArgs:
debugger = cli.Arg[str](
"d", "debugger", "Debugger to use (lldb, gdb)", default="lldb"
)
wait = cli.Arg[bool]("w", "wait", "Wait for debugger to attach")
class DebugArgs(DebuggerArgs):
cmd = cli.FreeFormArg[str]("cmd", "Command to debug", default="a.out")
args = cli.RawArg("args", "Arguments to pass to the command")
@cli.command("d", "debug", "Debug a program") @cli.command("d", "debug", "Debug a program")
def _(args: cli.Args): def _(args: DebugArgs):
wait = args.consumeOpt("wait", False) is True command = [args.cmd, *args.args]
debugger = args.consumeOpt("debugger", "lldb") debug(command, args.debugger, args.wait)
command = [str(args.consumeArg()), *args.extra]
debug(command, debugger=debugger, wait=wait)
class ProfilerArgs:
pass
class ProfileArgs:
cmd = cli.FreeFormArg[str]("cmd", "Command to profile", default="a.out")
extra: cli.RawArg
@cli.command("p", "profile", "Profile a program") @cli.command("p", "profile", "Profile a program")
def _(args: cli.Args): def _(args: ProfileArgs):
command = [str(args.consumeArg()), *args.extra] command = [args.cmd, *args.extra]
profile(command) profile(command)
class CompressArgs:
dest = cli.Arg[str]("d", "dest", "Destination file")
format = cli.Arg[str](
"f", "format", "Compression format (zip, zstd, gzip)", default="zstd"
)
path = cli.FreeFormArg[str]("path", "Path to compress")
@cli.command("c", "compress", "Compress a file or directory") @cli.command("c", "compress", "Compress a file or directory")
def _(args: cli.Args): def _(args: CompressArgs):
path = str(args.consumeArg()) compress(args.path, args.dest, args.format)
dest = args.consumeOpt("dest", None)
format = args.consumeOpt("format", "zstd")
compress(path, dest, format)