import argparse
import asyncio
import logging
import os
import signal
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from importlib import import_module
from subprocess import Popen
from clinner.builder import Builder
from clinner.cli import CLI
from clinner.command import Type, command
from clinner.exceptions import CommandArgParseError, CommandTypeError
from clinner.settings import settings
__all__ = ["MainMeta", "BaseMain"]
class MainMeta(ABCMeta):
def __new__(mcs, name, bases, namespace): # noqa
def inject(self):
"""
Add all environment variables defined in all inject methods.
"""
# Gather inject methods from bases and current class_dict
methods = {k: v for b in bases for k, v in b.__dict__.items() if k.startswith("inject_")}
methods.update({k: v for k, v in namespace.items() if k.startswith("inject_")})
for method_name, method in methods.items():
method(self)
def add_arguments(self, parser, parser_class=None):
"""
Add command line arguments to parser.
:param parser: Parser.
:param parser_class: Parser class.
"""
self._commands_arguments(parser, parser_class)
for base in reversed(bases):
if hasattr(base, "add_arguments"):
getattr(base, "add_arguments")(self, parser)
if hasattr(self, "add_arguments"):
self.add_arguments(parser)
namespace["inject"] = inject
namespace["_add_arguments"] = add_arguments
cmds = {}
for command_fqn in namespace.get("commands", []):
try:
m, c = command_fqn.rsplit(".", 1)
if c not in namespace:
getattr(import_module(m), c)
cmds[c] = command.register[c]
except ValueError:
cmds[command_fqn] = command.register[command_fqn]
except (ImportError, AttributeError):
raise ImportError("Command not found '{}'".format(command_fqn))
namespace["_commands"] = OrderedDict(sorted(cmds.items(), key=lambda t: t[0])) if cmds else None
return super(MainMeta, mcs).__new__(mcs, name, bases, namespace)
class BaseMain(metaclass=MainMeta):
commands = []
description = None
def __init__(self, args=None, parse_args=True):
self.args, self.unknown_args = argparse.Namespace(), []
self.cli = CLI()
if parse_args:
self.args, self.unknown_args = self.parse_arguments(args=args)
# Set logging verbosity
if self.args.quiet:
self.cli.disable()
elif self.args.verbose == 1:
self.cli.set_level(logging.INFO)
elif self.args.verbose >= 2:
self.cli.set_level(logging.DEBUG)
else: # Default log level
self.cli.set_level(logging.WARNING)
# Inject parameters related to current stage as environment variables
self.inject()
# Get settings from args or envvar
self.settings = self.args.settings or os.environ.get("CLINNER_SETTINGS")
# Load settings
settings.build_from_module(self.settings)
def _commands_arguments(self, parser: "argparse.ArgumentParser", parser_class=None):
"""
Add arguments for each command to parser.
:param parser: Parser
"""
# Create subparser for each command
subparsers_kwargs = {"parser_class": lambda **kwargs: parser_class(self, **kwargs)} if parser_class else {}
subparsers = parser.add_subparsers(title="Commands", dest="command", **subparsers_kwargs)
subparsers.required = True
cmds = self._commands if self._commands is not None else command.register
for cmd_name, cmd in cmds.items():
subparser_opts = cmd["parser"]
if cmd["type"] == Type.SHELL:
subparser_opts["add_help"] = False
p = subparsers.add_parser(cmd_name, **subparser_opts)
if callable(cmd["arguments"]):
cmd["arguments"](p)
else:
for argument in cmd["arguments"]:
try:
if len(argument) == 2:
args, kwargs = argument
elif len(argument) == 1:
args = argument[0]
kwargs = {}
else:
args, kwargs = None, None
assert isinstance(args, (tuple, list))
assert isinstance(kwargs, dict)
except AssertionError:
raise CommandArgParseError(str(argument))
else:
p.add_argument(*args, **kwargs)
@abstractmethod
def add_arguments(self, parser: "argparse.ArgumentParser"):
"""
Add to parser all necessary arguments for this Main.
:param parser: Argument parser.
"""
pass
def parse_arguments(self, args=None, parser=None, parser_class=None):
"""
command Line application arguments.
"""
if parser is None:
parser = argparse.ArgumentParser(description=self.description, conflict_handler="resolve")
# Call inner method that adds arguments from all classes (defined in metaclass)
self._add_arguments(parser, parser_class)
return parser.parse_known_args(args=args)
def run_python(self, cmd, *args, **kwargs):
"""
Run a python command in a different process.
:param cmd: Python command.
:param args: List of args passed to Process.
:param kwargs: Dict of kwargs passed to Process.
:return: Command return code.
"""
self.cli.logger.debug("- [python] %s.%s", str(cmd.__module__), str(cmd.__qualname__))
result = 0
if not getattr(self.args, "dry_run", False):
# Run command
if asyncio.iscoroutinefunction(cmd.func.func):
result = asyncio.get_event_loop().run_until_complete(cmd(*args, **kwargs))
else:
result = cmd(*args, **kwargs)
return result
def run_shell(self, cmd, *args, **kwargs):
"""
Run a shell command in a different process.
:param cmd: Shell command.
:param args: List of args passed to Popen.
:param kwargs: Dict of kwargs passed to Popen.
:return: Command return code.
"""
self.cli.logger.info("[shell] %s", " ".join(cmd))
result = 0
if not getattr(self.args, "dry_run", False):
# Run command
p = Popen(args=cmd, *args, **kwargs)
while p.returncode is None: # pragma: no cover
try:
p.wait()
except KeyboardInterrupt:
self.cli.logger.info("Soft quit signal received, waiting the process to stop")
p.send_signal(signal.SIGINT)
try:
p.wait()
except KeyboardInterrupt:
self.cli.logger.info("Hard quit signal received, killing the process immediately")
p.send_signal(signal.SIGKILL)
p.wait(timeout=3)
result = p.returncode
return result
def run_command(self, input_command, *args, **kwargs):
"""
Run the given command, building it with arguments.
:param input_command: Command to execute.
:param args: List of args passed to run_<type> command.
:param kwargs: Dict of kwargs passed to run_<type> command.
:return: Command return code.
"""
# Print header
self.cli.print_header(input_command, **kwargs)
# Get list of commands
commands, command_type = Builder.build_command(input_command, *args, **kwargs)
# Print command list
self.cli.print_commands_list(commands, command_type)
return_code = 0
for c in commands:
if command_type == Type.PYTHON:
return_code = self.run_python(c)
elif command_type in (Type.SHELL, Type.SHELL_WITH_HELP):
return_code = self.run_shell(c)
else: # pragma: no cover
raise CommandTypeError(command_type)
self.cli.print_return(return_code)
# Break on non-zero exit code.
if return_code != 0:
return return_code
return return_code
@abstractmethod
def run(self, *args, **kwargs):
pass