# coding=utf-8 """ Bridges calls made inside of pyscript with the Cmd2 host app while maintaining a reasonable degree of isolation between the two Copyright 2018 Eric Lin Released under MIT license, see LICENSE file """ import argparse import sys from typing import List, Optional from .argparse_completer import _RangeAction, is_potential_flag from .utils import namedtuple_with_defaults, StdSim, quote_string_if_needed # Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout if sys.version_info < (3, 5): from contextlib2 import redirect_stdout, redirect_stderr else: from contextlib import redirect_stdout, redirect_stderr class CommandResult(namedtuple_with_defaults('CommandResult', ['stdout', 'stderr', 'data'])): """Encapsulates the results from a command. Named tuple attributes ---------------------- stdout: str - Output captured from stdout while this command is executing stderr: str - Output captured from stderr while this command is executing. None if no error captured data - Data returned by the command. NOTE: Named tuples are immutable. So the contents are there for access, not for modification. """ def __bool__(self) -> bool: """Returns True if the command succeeded, otherwise False""" # If data has a __bool__ method, then call it to determine success of command if self.data is not None and callable(getattr(self.data, '__bool__', None)): return bool(self.data) # Otherwise check if stderr was filled out else: return not self.stderr def _exec_cmd(cmd2_app, command: str, echo: bool) -> CommandResult: """ Helper to encapsulate executing a command and capturing the results :param cmd2_app: cmd2 app that will run the command :param command: command line being run :param echo: if True, output will be echoed to stdout/stderr while the command runs :return: result of the command """ copy_stdout = StdSim(sys.stdout, echo) copy_stderr = StdSim(sys.stderr, echo) copy_cmd_stdout = StdSim(cmd2_app.stdout, echo) cmd2_app._last_result = None try: cmd2_app.stdout = copy_cmd_stdout with redirect_stdout(copy_stdout): with redirect_stderr(copy_stderr): # Include a newline in case it's a multiline command cmd2_app.onecmd_plus_hooks(command + '\n') finally: cmd2_app.stdout = copy_cmd_stdout.inner_stream # if stderr is empty, set it to None stderr = copy_stderr.getvalue() if copy_stderr.getvalue() else None outbuf = copy_cmd_stdout.getvalue() if copy_cmd_stdout.getvalue() else copy_stdout.getvalue() result = CommandResult(stdout=outbuf, stderr=stderr, data=cmd2_app._last_result) return result class ArgparseFunctor: """ Encapsulates translating Python object traversal """ def __init__(self, echo: bool, cmd2_app, command_name: str, parser: argparse.ArgumentParser): self._echo = echo self._cmd2_app = cmd2_app self._command_name = command_name self._parser = parser # Dictionary mapping command argument name to value self._args = {} # tag the argument that's a remainder type self._remainder_arg = None # separately track flag arguments so they will be printed before positionals self._flag_args = [] # argparse object for the current command layer self.__current_subcommand_parser = parser def __dir__(self): """Returns a custom list of attribute names to match the sub-commands""" commands = [] for action in self.__current_subcommand_parser._actions: if not action.option_strings and isinstance(action, argparse._SubParsersAction): commands.extend(action.choices) return commands def __getattr__(self, item: str): """Search for a sub-command matching this item and update internal state to track the traversal""" # look for sub-command under the current command/sub-command layer for action in self.__current_subcommand_parser._actions: if not action.option_strings and isinstance(action, argparse._SubParsersAction): if item in action.choices: # item matches the a sub-command, save our position in argparse, # save the sub-command, return self to allow next level of traversal self.__current_subcommand_parser = action.choices[item] self._args[action.dest] = item return self raise AttributeError(item) def __call__(self, *args, **kwargs): """ Process the arguments at this layer of the argparse command tree. If there are more sub-commands, return self to accept the next sub-command name. If there are no more sub-commands, execute the sub-command with the given parameters. """ next_pos_index = 0 has_subcommand = False # Iterate through the current sub-command's arguments in order for action in self.__current_subcommand_parser._actions: # is this a flag option? if action.option_strings: # this is a flag argument, search for the argument by name in the parameters if action.dest in kwargs: self._args[action.dest] = kwargs[action.dest] self._flag_args.append(action.dest) else: # This is a positional argument, search the positional arguments passed in. if not isinstance(action, argparse._SubParsersAction): if action.dest in kwargs: # if this positional argument happens to be passed in as a keyword argument # go ahead and consume the matching keyword argument self._args[action.dest] = kwargs[action.dest] elif next_pos_index < len(args): # Make sure we actually have positional arguments to consume pos_remain = len(args) - next_pos_index # Check if this argument consumes a range of values if isinstance(action, _RangeAction) and action.nargs_min is not None \ and action.nargs_max is not None: # this is a cmd2 ranged action. if pos_remain >= action.nargs_min: # Do we meet the minimum count? if pos_remain > action.nargs_max: # Do we exceed the maximum count? self._args[action.dest] = args[next_pos_index:next_pos_index + action.nargs_max] next_pos_index += action.nargs_max else: self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] next_pos_index += pos_remain else: raise ValueError('Expected at least {} values for {}'.format(action.nargs_min, action.dest)) elif action.nargs is not None: if action.nargs == '+': if pos_remain > 0: self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] next_pos_index += pos_remain else: raise ValueError('Expected at least 1 value for {}'.format(action.dest)) elif action.nargs == '*': self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] next_pos_index += pos_remain elif action.nargs == argparse.REMAINDER: self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] next_pos_index += pos_remain self._remainder_arg = action.dest elif action.nargs == '?': self._args[action.dest] = args[next_pos_index] next_pos_index += 1 else: self._args[action.dest] = args[next_pos_index] next_pos_index += 1 else: has_subcommand = True # Check if there are any extra arguments we don't know how to handle for kw in kwargs: if kw not in self._args: raise TypeError("{}() got an unexpected keyword argument '{}'".format( self.__current_subcommand_parser.prog, kw)) if has_subcommand: return self else: return self._run() def _run(self): # look up command function func = self._cmd2_app.cmd_func(self._command_name) if func is None: raise AttributeError("'{}' object has no command called '{}'".format(self._cmd2_app.__class__.__name__, self._command_name)) # reconstruct the cmd2 command from the python call command = self._command_name def process_argument(action, value): nonlocal command if isinstance(action, argparse._CountAction): if isinstance(value, int): for _ in range(value): command += ' {}'.format(action.option_strings[0]) return else: raise TypeError('Expected int for ' + action.dest) if isinstance(action, argparse._StoreConstAction) or isinstance(action, argparse._AppendConstAction): if value: # Nothing else to append to the command string, just the flag is enough. command += ' {}'.format(action.option_strings[0]) return else: # value is not True so we default to false, which means don't include the flag return # was the argument a flag? if action.option_strings: command += ' {}'.format(action.option_strings[0]) is_remainder_arg = action.dest == self._remainder_arg if isinstance(value, List) or isinstance(value, tuple): for item in value: item = str(item).strip() if not is_remainder_arg and is_potential_flag(item, self._parser): raise ValueError('{} appears to be a flag and should be supplied as a keyword argument ' 'to the function.'.format(item)) item = quote_string_if_needed(item) command += ' {}'.format(item) # If this is a flag parameter that can accept a variable number of arguments and we have not # reached the max number, add 2 prefix chars (ex: --) to tell argparse to stop processing the # parameter. This also means the remaining arguments will be treated as positionals by argparse. if action.option_strings and isinstance(action, _RangeAction) and action.nargs_max is not None and \ action.nargs_max > len(value): command += ' {0}{0}'.format(self._parser.prefix_chars[0]) else: value = str(value).strip() if not is_remainder_arg and is_potential_flag(value, self._parser): raise ValueError('{} appears to be a flag and should be supplied as a keyword argument ' 'to the function.'.format(value)) value = quote_string_if_needed(value) command += ' {}'.format(value) # If this is a flag parameter that can accept a variable number of arguments and we have not # reached the max number, add 2 prefix chars (ex: --) to tell argparse to stop processing the # parameter. This also means the remaining arguments will be treated as positionals by argparse. if action.option_strings and isinstance(action, _RangeAction) and action.nargs_max is not None and \ action.nargs_max > 1: command += ' {0}{0}'.format(self._parser.prefix_chars[0]) def process_action(action): nonlocal command if isinstance(action, argparse._SubParsersAction): command += ' {}'.format(self._args[action.dest]) traverse_parser(action.choices[self._args[action.dest]]) elif isinstance(action, argparse._AppendAction): if isinstance(self._args[action.dest], list) or isinstance(self._args[action.dest], tuple): for values in self._args[action.dest]: process_argument(action, values) else: process_argument(action, self._args[action.dest]) else: process_argument(action, self._args[action.dest]) def traverse_parser(parser): # first process optional flag arguments for action in parser._actions: if action.dest in self._args and action.dest in self._flag_args and action.dest != self._remainder_arg: process_action(action) # next process positional arguments for action in parser._actions: if action.dest in self._args and action.dest not in self._flag_args and \ action.dest != self._remainder_arg: process_action(action) # Keep remainder argument last for action in parser._actions: if action.dest in self._args and action.dest == self._remainder_arg: process_action(action) traverse_parser(self._parser) return _exec_cmd(self._cmd2_app, command, self._echo) class PyscriptBridge(object): """Preserves the legacy 'cmd' interface for pyscript while also providing a new python API wrapper for application commands.""" def __init__(self, cmd2_app): self._cmd2_app = cmd2_app self._last_result = None self.cmd_echo = False def __getattr__(self, item: str): """ Provide functionality to call application commands as a method of PyscriptBridge ex: app.help() """ func = self._cmd2_app.cmd_func(item) if func: if hasattr(func, 'argparser'): # Command uses argparse, return an object that can traverse the argparse subcommands and arguments return ArgparseFunctor(self.cmd_echo, self._cmd2_app, item, getattr(func, 'argparser')) else: # Command doesn't use argparse, we will accept parameters in the form of a command string def wrap_func(args=''): command = item if args: command += ' ' + args return _exec_cmd(self._cmd2_app, command, self.cmd_echo) return wrap_func else: # item does not refer to a command raise AttributeError("'{}' object has no attribute '{}'".format(self._cmd2_app.pyscript_name, item)) def __dir__(self): """Return a custom set of attribute names""" attributes = self._cmd2_app.get_all_commands() attributes.insert(0, 'cmd_echo') return attributes def __call__(self, command: str, echo: Optional[bool]=None) -> CommandResult: """ Provide functionality to call application commands by calling PyscriptBridge ex: app('help') :param command: command line being run :param echo: if True, output will be echoed to stdout/stderr while the command runs this temporarily overrides the value of self.cmd_echo """ if echo is None: echo = self.cmd_echo return _exec_cmd(self._cmd2_app, command, echo)