diff options
author | Eric Lin <anselor@gmail.com> | 2018-05-02 15:22:14 -0400 |
---|---|---|
committer | Eric Lin <anselor@gmail.com> | 2018-05-02 15:22:14 -0400 |
commit | a55f0b6ed559d03f2d8b596898d638b288c11a68 (patch) | |
tree | 54d84c8d1d7c14247ac3a22186ed106190a8b494 /cmd2/pyscript_bridge.py | |
parent | dbf4846e8bc0e6ca38c928d8fe4752f9b6173803 (diff) | |
parent | a95c8a065abeac286c196783393ecc49e4356f54 (diff) | |
download | cmd2-git-a55f0b6ed559d03f2d8b596898d638b288c11a68.tar.gz |
Merge branch 'test_merge' into test_ply_merge
Diffstat (limited to 'cmd2/pyscript_bridge.py')
-rw-r--r-- | cmd2/pyscript_bridge.py | 273 |
1 files changed, 273 insertions, 0 deletions
diff --git a/cmd2/pyscript_bridge.py b/cmd2/pyscript_bridge.py new file mode 100644 index 00000000..055ae4ae --- /dev/null +++ b/cmd2/pyscript_bridge.py @@ -0,0 +1,273 @@ +# coding=utf-8 +""" +Bridges calls made inside of pyscript with the Cmd2 host app while maintaining a reasonable +degree of isolation between the two + +Copyright 2018 Eric Lin <anselor@gmail.com> +Released under MIT license, see LICENSE file +""" + +import argparse +from collections import namedtuple +import functools +import sys +from typing import List, Tuple + +# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout +if sys.version_info < (3, 5): + from contextlib2 import redirect_stdout, redirect_stderr +else: + from contextlib import redirect_stdout, redirect_stderr + +from .argparse_completer import _RangeAction +from .utils import namedtuple_with_defaults + + +class CommandResult(namedtuple_with_defaults('CmdResult', ['stdout', 'stderr', 'data'])): + """Encapsulates the results from a command. + + Named tuple attributes + ---------------------- + stdout: str - Output captured from stdout while this command is executing + stderr: str - Output captured from stderr while this command is executing. None if no error captured + data - Data returned by the command. + + NOTE: Named tuples are immutable. So the contents are there for access, not for modification. + """ + def __bool__(self): + """If stderr is None and data is not None the command is considered a success""" + return not self.stderr and self.data is not None + + +class CopyStream(object): + """Copies all data written to a stream""" + def __init__(self, inner_stream): + self.buffer = '' + self.inner_stream = inner_stream + + def write(self, s): + self.buffer += s + self.inner_stream.write(s) + + def read(self): + raise NotImplementedError + + def clear(self): + self.buffer = '' + + +def _exec_cmd(cmd2_app, func): + """Helper to encapsulate executing a command and capturing the results""" + copy_stdout = CopyStream(sys.stdout) + copy_stderr = CopyStream(sys.stderr) + + cmd2_app._last_result = None + + with redirect_stdout(copy_stdout): + with redirect_stderr(copy_stderr): + func() + + # if stderr is empty, set it to None + stderr = copy_stderr if copy_stderr.buffer else None + + result = CommandResult(stdout=copy_stdout.buffer, stderr=stderr, data=cmd2_app._last_result) + return result + + +class ArgparseFunctor: + """ + Encapsulates translating python object traversal + """ + def __init__(self, cmd2_app, item, parser): + self._cmd2_app = cmd2_app + self._item = item + self._parser = parser + + # Dictionary mapping command argument name to value + self._args = {} + # argparse object for the current command layer + self.__current_subcommand_parser = parser + + def __getattr__(self, item): + """Search for a subcommand matching this item and update internal state to track the traversal""" + # look for sub-command under the current command/sub-command layer + for action in self.__current_subcommand_parser._actions: + if not action.option_strings and isinstance(action, argparse._SubParsersAction): + if item in action.choices: + # item matches the a sub-command, save our position in argparse, + # save the sub-command, return self to allow next level of traversal + self.__current_subcommand_parser = action.choices[item] + self._args[action.dest] = item + return self + + raise AttributeError(item) + # return super().__getattr__(item) + + def __call__(self, *args, **kwargs): + """ + Process the arguments at this layer of the argparse command tree. If there are more sub-commands, + return self to accept the next sub-command name. If there are no more sub-commands, execute the + sub-command with the given parameters. + """ + next_pos_index = 0 + + has_subcommand = False + consumed_kw = [] + + # Iterate through the current sub-command's arguments in order + for action in self.__current_subcommand_parser._actions: + # is this a flag option? + if action.option_strings: + # this is a flag argument, search for the argument by name in the parameters + if action.dest in kwargs: + self._args[action.dest] = kwargs[action.dest] + consumed_kw.append(action.dest) + else: + # This is a positional argument, search the positional arguments passed in. + if not isinstance(action, argparse._SubParsersAction): + if action.dest in kwargs: + # if this positional argument happens to be passed in as a keyword argument + # go ahead and consume the matching keyword argument + self._args[action.dest] = kwargs[action.dest] + elif next_pos_index < len(args): + # Make sure we actually have positional arguments to consume + pos_remain = len(args) - next_pos_index + + # Check if this argument consumes a range of values + if isinstance(action, _RangeAction) and action.nargs_min is not None \ + and action.nargs_max is not None: + # this is a cmd2 ranged action. + + if pos_remain >= action.nargs_min: + # Do we meet the minimum count? + if pos_remain > action.nargs_max: + # Do we exceed the maximum count? + self._args[action.dest] = args[next_pos_index:next_pos_index + action.nargs_max] + next_pos_index += action.nargs_max + else: + self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] + next_pos_index += pos_remain + else: + raise ValueError('Expected at least {} values for {}'.format(action.nargs_min, + action.dest)) + elif action.nargs is not None: + if action.nargs == '+': + if pos_remain > 0: + self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] + next_pos_index += pos_remain + else: + raise ValueError('Expected at least 1 value for {}'.format(action.dest)) + elif action.nargs == '*': + self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] + next_pos_index += pos_remain + elif action.nargs == '?': + self._args[action.dest] = args[next_pos_index] + next_pos_index += 1 + else: + self._args[action.dest] = args[next_pos_index] + next_pos_index += 1 + else: + has_subcommand = True + + # Check if there are any extra arguments we don't know how to handle + for kw in kwargs: + if kw not in self._args: # consumed_kw: + raise TypeError('{}() got an unexpected keyword argument \'{}\''.format( + self.__current_subcommand_parser.prog, kw)) + + if has_subcommand: + return self + else: + return self._run() + + def _run(self): + # look up command function + func = getattr(self._cmd2_app, 'do_' + self._item) + + # reconstruct the cmd2 command from the python call + cmd_str = [''] + + def process_flag(action, value): + if isinstance(action, argparse._CountAction): + if isinstance(value, int): + for c in range(value): + cmd_str[0] += '{} '.format(action.option_strings[0]) + return + else: + raise TypeError('Expected int for ' + action.dest) + if isinstance(action, argparse._StoreConstAction) or isinstance(action, argparse._AppendConstAction): + if value: + # Nothing else to append to the command string, just the flag is enough. + cmd_str[0] += '{} '.format(action.option_strings[0]) + return + else: + # value is not True so we default to false, which means don't include the flag + return + + # was the argument a flag? + if action.option_strings: + cmd_str[0] += '{} '.format(action.option_strings[0]) + + if isinstance(value, List) or isinstance(value, Tuple): + for item in value: + item = str(item).strip() + if ' ' in item: + item = '"{}"'.format(item) + cmd_str[0] += '{} '.format(item) + else: + value = str(value).strip() + if ' ' in value: + value = '"{}"'.format(value) + cmd_str[0] += '{} '.format(value) + + def traverse_parser(parser): + for action in parser._actions: + # was something provided for the argument + if action.dest in self._args: + if isinstance(action, argparse._SubParsersAction): + cmd_str[0] += '{} '.format(self._args[action.dest]) + traverse_parser(action.choices[self._args[action.dest]]) + elif isinstance(action, argparse._AppendAction): + if isinstance(self._args[action.dest], List) or isinstance(self._args[action.dest], Tuple): + for values in self._args[action.dest]: + process_flag(action, values) + else: + process_flag(action, self._args[action.dest]) + else: + process_flag(action, self._args[action.dest]) + + traverse_parser(self._parser) + + # print('Command: {}'.format(cmd_str[0])) + + return _exec_cmd(self._cmd2_app, functools.partial(func, cmd_str[0])) + +class PyscriptBridge(object): + """Preserves the legacy 'cmd' interface for pyscript while also providing a new python API wrapper for + application commands.""" + def __init__(self, cmd2_app): + self._cmd2_app = cmd2_app + self._last_result = None + + def __getattr__(self, item: str): + """Check if the attribute is a command. If so, return a callable.""" + commands = self._cmd2_app.get_all_commands() + if item in commands: + func = getattr(self._cmd2_app, 'do_' + item) + + try: + # See if the command uses argparse + parser = getattr(func, 'argparser') + except AttributeError: + # Command doesn't, we will accept parameters in the form of a command string + def wrap_func(args=''): + return _exec_cmd(self._cmd2_app, functools.partial(func, args)) + return wrap_func + else: + # Command does use argparse, return an object that can traverse the argparse subcommands and arguments + return ArgparseFunctor(self._cmd2_app, item, parser) + + raise AttributeError(item) + + def __call__(self, args): + return _exec_cmd(self._cmd2_app, functools.partial(self._cmd2_app.onecmd_plus_hooks, args + '\n')) |