Adjusted typing
This commit is contained in:
parent
d0da609ba9
commit
6649488b0b
|
@ -6,7 +6,16 @@ import dataclasses as dt
|
|||
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Any, NewType, Optional, Union, Callable, Generic, get_origin, get_args
|
||||
from typing import (
|
||||
Any,
|
||||
NewType,
|
||||
Optional,
|
||||
Union,
|
||||
Callable,
|
||||
Generic,
|
||||
get_origin,
|
||||
get_args,
|
||||
)
|
||||
|
||||
from . import const, vt100, utils
|
||||
|
||||
|
@ -25,30 +34,34 @@ class Arg(Generic[utils.T]):
|
|||
description: str
|
||||
default: Optional[utils.T] = None
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
def __get__(self, instance, owner) -> utils.T:
|
||||
if instance is None:
|
||||
return self
|
||||
return self # type: ignore
|
||||
|
||||
return instance.__dict__.get(self.longName, self.default)
|
||||
|
||||
|
||||
@dt.dataclass
|
||||
class FreeFormArg(Generic[utils.T]):
|
||||
longName: str
|
||||
description: str
|
||||
default: Optional[utils.T] = None
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
def __get__(self, instance, owner) -> utils.T:
|
||||
if instance is None:
|
||||
return self
|
||||
return self # type: ignore
|
||||
|
||||
return instance.__dict__.get(self.longName, self.default)
|
||||
|
||||
return self.default
|
||||
|
||||
class ParserState(enum.Enum):
|
||||
FreeForm = enum.auto()
|
||||
ShortArg = enum.auto()
|
||||
|
||||
|
||||
RawArg = NewType("RawArg", str)
|
||||
|
||||
|
||||
class CutekitArgs:
|
||||
cmd: FreeFormArg[str] = FreeFormArg("Command to execute")
|
||||
verbose: Arg[bool] = Arg("v", "verbose", "Enable verbose logging")
|
||||
|
@ -57,7 +70,7 @@ class CutekitArgs:
|
|||
podName: Arg[str] = Arg("n", "pod-name", "The name of the pod", default="")
|
||||
|
||||
|
||||
def parse(argv: list[str], argType: type) -> Any:
|
||||
def parse(argv: list[str], argType: type[utils.T]) -> utils.T:
|
||||
def set_value(options: dict[str, Any], name: str, value: Any):
|
||||
if name is not options:
|
||||
options[name] = value
|
||||
|
@ -82,7 +95,7 @@ def parse(argv: list[str], argType: type) -> Any:
|
|||
found_optional = True
|
||||
elif found_optional:
|
||||
raise RuntimeError(
|
||||
f"Required arguments must come before optional arguments"
|
||||
"Required arguments must come before optional arguments"
|
||||
)
|
||||
else:
|
||||
required_freeforms.append(arg)
|
||||
|
@ -107,7 +120,7 @@ def parse(argv: list[str], argType: type) -> Any:
|
|||
match state:
|
||||
case ParserState.FreeForm:
|
||||
if argv[i] == "--":
|
||||
freeargs = argv[i + 1:]
|
||||
freeargs = argv[i + 1 :]
|
||||
i += 1
|
||||
break
|
||||
if argv[i].startswith("--"):
|
||||
|
@ -153,7 +166,7 @@ def parse(argv: list[str], argType: type) -> Any:
|
|||
f"Missing arguments: {', '.join(required_freeforms[len(freeforms):])}"
|
||||
)
|
||||
if len(freeforms) > len(freeforms_all):
|
||||
raise RuntimeError(f"Too many arguments")
|
||||
raise RuntimeError("Too many arguments")
|
||||
|
||||
for i, freeform in enumerate(freeforms):
|
||||
setattr(result, freeforms_all[i], freeform)
|
||||
|
@ -174,9 +187,9 @@ def parse(argv: list[str], argType: type) -> Any:
|
|||
setattr(result, key, field_type(value))
|
||||
|
||||
raw_args = [arg[0] for arg in argType.__annotations__.items() if arg[1] is RawArg]
|
||||
|
||||
|
||||
if len(raw_args) > 1:
|
||||
raise RuntimeError(f"Only one RawArg is allowed")
|
||||
raise RuntimeError("Only one RawArg is allowed")
|
||||
elif len(raw_args) == 1:
|
||||
setattr(result, raw_args[0], freeargs)
|
||||
|
||||
|
@ -222,7 +235,7 @@ def command(shortName: Optional[str], longName: str, helpText: str):
|
|||
helpText,
|
||||
Path(calframe[1].filename).parent != Path(__file__).parent,
|
||||
fn,
|
||||
argType
|
||||
argType,
|
||||
)
|
||||
|
||||
return fn
|
||||
|
@ -314,7 +327,7 @@ def versionCmd():
|
|||
print(f"CuteKit v{const.VERSION_STR}")
|
||||
|
||||
|
||||
def exec(cmd: str, args: list[str], cmds: dict[str, Command]=commands):
|
||||
def exec(cmd: str, args: list[str], cmds: dict[str, Command] = commands):
|
||||
for c in cmds.values():
|
||||
if c.shortName == cmd or c.longName == cmd:
|
||||
if len(c.subcommands) > 0:
|
||||
|
@ -322,9 +335,9 @@ def exec(cmd: str, args: list[str], cmds: dict[str, Command]=commands):
|
|||
return
|
||||
else:
|
||||
if c.argType is not None:
|
||||
c.callback(parse(args[1:], c.argType)) # type: ignore
|
||||
c.callback(parse(args[1:], c.argType)) # type: ignore
|
||||
else:
|
||||
c.callback() # type: ignore
|
||||
c.callback() # type: ignore
|
||||
return
|
||||
|
||||
raise RuntimeError(f"Unknown command {cmd}")
|
||||
|
|
|
@ -7,7 +7,7 @@ from enum import Enum
|
|||
from typing import Any, Generator, Optional, Type, cast
|
||||
from pathlib import Path
|
||||
from dataclasses_json import DataClassJsonMixin
|
||||
from typing import Union
|
||||
|
||||
|
||||
from cutekit import const, shell
|
||||
|
||||
|
@ -184,14 +184,21 @@ def _():
|
|||
|
||||
|
||||
@cli.command("i", "model/install", "Install required external packages")
|
||||
def _(args: Any):
|
||||
def _():
|
||||
project = Project.use()
|
||||
Project.fetchs(project.extern)
|
||||
|
||||
|
||||
class InitArgs:
|
||||
repo: cli.Arg[str] = cli.Arg("r", "repo", "Repository to use for templates", default=const.DEFAULT_REPO_TEMPLATES)
|
||||
list: cli.Arg[bool] = cli.Arg("l", "list", "List available templates", default=False)
|
||||
repo: cli.Arg[str] = cli.Arg(
|
||||
"r",
|
||||
"repo",
|
||||
"Repository to use for templates",
|
||||
default=const.DEFAULT_REPO_TEMPLATES,
|
||||
)
|
||||
list: cli.Arg[bool] = cli.Arg(
|
||||
"l", "list", "List available templates", default=False
|
||||
)
|
||||
|
||||
template: cli.FreeFormArg[str] = cli.FreeFormArg("Template to use")
|
||||
name: cli.FreeFormArg[Optional[str]] = cli.FreeFormArg("Name of the project")
|
||||
|
@ -203,7 +210,9 @@ def _(args: InitArgs):
|
|||
|
||||
_logger.info("Fetching registry...")
|
||||
|
||||
r = requests.get(f"https://raw.githubusercontent.com/{args.repo}/main/registry.json")
|
||||
r = requests.get(
|
||||
f"https://raw.githubusercontent.com/{args.repo}/main/registry.json"
|
||||
)
|
||||
|
||||
if r.status_code != 200:
|
||||
_logger.error("Failed to fetch registry")
|
||||
|
@ -264,6 +273,17 @@ DEFAULT_TOOLS: Tools = {
|
|||
}
|
||||
|
||||
|
||||
class RegistryArgs:
|
||||
mixins: cli.Arg[str] = cli.Arg("m", "mixins", "Mixins to apply", default="")
|
||||
# props: cli.Arg[dict[str]] = cli.Arg("p", "props", "Properties to set", default="")
|
||||
|
||||
|
||||
class TargetArgs(RegistryArgs):
|
||||
target = cli.Arg(
|
||||
"t", "target", "The target to use", default="host-" + shell.uname().machine
|
||||
)
|
||||
|
||||
|
||||
@dt.dataclass
|
||||
class Target(Manifest):
|
||||
props: Props = dt.field(default_factory=dict)
|
||||
|
@ -288,9 +308,9 @@ class Target(Manifest):
|
|||
return os.path.join(const.BUILD_DIR, f"{self.id}{postfix}")
|
||||
|
||||
@staticmethod
|
||||
def use(args: Any, props: Props = {}) -> "Target":
|
||||
def use(args: TargetArgs, props: Props = {}) -> "Target":
|
||||
registry = Registry.use(args, props)
|
||||
targetSpec = str(args.consumeOpt("target", "host-" + shell.uname().machine))
|
||||
targetSpec = args.target
|
||||
return registry.ensure(targetSpec, Target)
|
||||
|
||||
def route(self, componentSpec: str):
|
||||
|
@ -537,7 +557,7 @@ class Registry(DataClassJsonMixin):
|
|||
return m
|
||||
|
||||
@staticmethod
|
||||
def use(args: Any, props: Props = {}) -> "Registry":
|
||||
def use(args: RegistryArgs, props: Props = {}) -> "Registry":
|
||||
global _registry
|
||||
|
||||
if _registry is not None:
|
||||
|
@ -554,8 +574,8 @@ class Registry(DataClassJsonMixin):
|
|||
mixins = args.mixins.split(",")
|
||||
if mixins == [""]:
|
||||
mixins = []
|
||||
|
||||
#props |= cast(dict[str, str], args.consumePrefix("prop:"))
|
||||
|
||||
# props |= args.props
|
||||
_registry = Registry.load(project, mixins, props)
|
||||
return _registry
|
||||
|
||||
|
@ -647,11 +667,8 @@ class Registry(DataClassJsonMixin):
|
|||
return r
|
||||
|
||||
|
||||
class ListArgs:
|
||||
mixins: cli.Arg[str] = cli.Arg("m", "mixins", "Mixins to apply", default="")
|
||||
|
||||
@cli.command("l", "model/list", "List all components and targets")
|
||||
def _(args: ListArgs):
|
||||
def _(args: RegistryArgs):
|
||||
registry = Registry.use(args)
|
||||
|
||||
components = list(registry.iter(Component))
|
||||
|
|
|
@ -131,7 +131,7 @@ def _(args: PodArgs):
|
|||
project = model.Project.ensure()
|
||||
|
||||
if not args.name.startswith(podPrefix):
|
||||
name: str = f"{podPrefix}{name}"
|
||||
name: str = f"{podPrefix}{args.name}"
|
||||
else:
|
||||
name = args.name
|
||||
|
||||
|
@ -183,7 +183,7 @@ class KillPodArgs:
|
|||
def _(args: KillPodArgs):
|
||||
client = docker.from_env()
|
||||
if not args.name.startswith(podPrefix):
|
||||
name: str = f"{podPrefix}{name}"
|
||||
name: str = f"{podPrefix}{args.name}"
|
||||
else:
|
||||
name = args.name
|
||||
|
||||
|
@ -218,7 +218,7 @@ class PodExecArgs:
|
|||
@cli.command("e", "pod/exec", "Execute a command in a pod")
|
||||
def podExecCmd(args: PodExecArgs):
|
||||
if not args.name.startswith(podPrefix):
|
||||
name: str = f"{podPrefix}{name}"
|
||||
name: str = f"{podPrefix}{args.name}"
|
||||
else:
|
||||
name = args.name
|
||||
|
||||
|
@ -231,4 +231,4 @@ def podExecCmd(args: PodExecArgs):
|
|||
@cli.command("s", "pod/shell", "Open a shell in a pod")
|
||||
def _(args: PodExecArgs):
|
||||
args.cmd = "/bin/bash"
|
||||
podExecCmd(args)
|
||||
podExecCmd(args)
|
||||
|
|
|
@ -399,8 +399,16 @@ def _(args: cli.Args):
|
|||
pass
|
||||
|
||||
|
||||
class DebugArgs:
|
||||
debugger = cli.Arg[str](
|
||||
"d", "debugger", "Debugger to use (lldb, gdb)", default="lldb"
|
||||
)
|
||||
wait = cli.Arg[bool]("w", "wait", "Wait for debugger to attach")
|
||||
extra = cli.RawArg
|
||||
|
||||
|
||||
@cli.command("d", "debug", "Debug a program")
|
||||
def _(args: cli.Args):
|
||||
def _(args: DebugArgs):
|
||||
wait = args.consumeOpt("wait", False) is True
|
||||
debugger = args.consumeOpt("debugger", "lldb")
|
||||
command = [str(args.consumeArg()), *args.extra]
|
||||
|
|
Loading…
Reference in a new issue