diff options
Diffstat (limited to 'cmd2')
-rw-r--r-- | cmd2/__init__.py | 3 | ||||
-rw-r--r-- | cmd2/argcomplete_bridge.py | 246 | ||||
-rwxr-xr-x | cmd2/argparse_completer.py | 272 | ||||
-rwxr-xr-x | cmd2/cmd2.py | 36 | ||||
-rw-r--r-- | cmd2/pyscript_bridge.py | 273 | ||||
-rw-r--r-- | cmd2/utils.py | 32 |
6 files changed, 846 insertions, 16 deletions
diff --git a/cmd2/__init__.py b/cmd2/__init__.py index 2dd08977..bf6c047f 100644 --- a/cmd2/__init__.py +++ b/cmd2/__init__.py @@ -1,5 +1,2 @@ # # -*- coding: utf-8 -*- -# -from .cmd2 import __version__, Cmd, AddSubmenu, CmdResult, categorize -from .cmd2 import with_argument_list, with_argparser, with_argparser_and_unknown_args, with_category diff --git a/cmd2/argcomplete_bridge.py b/cmd2/argcomplete_bridge.py new file mode 100644 index 00000000..583f3345 --- /dev/null +++ b/cmd2/argcomplete_bridge.py @@ -0,0 +1,246 @@ +# coding=utf-8 +"""Hijack the ArgComplete's bash completion handler to return AutoCompleter results""" + +try: + # check if argcomplete is installed + import argcomplete +except ImportError: + # not installed, skip the rest of the file + pass + +else: + # argcomplete is installed + + from contextlib import redirect_stdout + import copy + from io import StringIO + import os + import shlex + import sys + + from . import constants + from . import utils + + + def tokens_for_completion(line, endidx): + """ + Used by tab completion functions to get all tokens through the one being completed + :param line: str - the current input line with leading whitespace removed + :param endidx: int - the ending index of the prefix text + :return: A 4 item tuple where the items are + On Success + tokens: list of unquoted tokens + this is generally the list needed for tab completion functions + raw_tokens: list of tokens with any quotes preserved + this can be used to know if a token was quoted or is missing a closing quote + begidx: beginning of last token + endidx: cursor position + + Both lists are guaranteed to have at least 1 item + The last item in both lists is the token being tab completed + + On Failure + Both items are None + """ + unclosed_quote = '' + quotes_to_try = copy.copy(constants.QUOTES) + + tmp_line = line[:endidx] + tmp_endidx = endidx + + # Parse the line into tokens + while True: + try: + # Use non-POSIX parsing to keep the quotes around the tokens + initial_tokens = shlex.split(tmp_line[:tmp_endidx], posix=False) + + # calculate begidx + if unclosed_quote: + begidx = tmp_line[:tmp_endidx].rfind(initial_tokens[-1]) + 1 + else: + if tmp_endidx > 0 and tmp_line[tmp_endidx - 1] == ' ': + begidx = endidx + else: + begidx = tmp_line[:tmp_endidx].rfind(initial_tokens[-1]) + + # If the cursor is at an empty token outside of a quoted string, + # then that is the token being completed. Add it to the list. + if not unclosed_quote and begidx == tmp_endidx: + initial_tokens.append('') + break + except ValueError: + # ValueError can be caused by missing closing quote + if not quotes_to_try: + # Since we have no more quotes to try, something else + # is causing the parsing error. Return None since + # this means the line is malformed. + return None, None, None, None + + # Add a closing quote and try to parse again + unclosed_quote = quotes_to_try[0] + quotes_to_try = quotes_to_try[1:] + + tmp_line = line[:endidx] + tmp_line += unclosed_quote + tmp_endidx = endidx + 1 + + raw_tokens = initial_tokens + + # Save the unquoted tokens + tokens = [utils.strip_quotes(cur_token) for cur_token in raw_tokens] + + # If the token being completed had an unclosed quote, we need + # to remove the closing quote that was added in order for it + # to match what was on the command line. + if unclosed_quote: + raw_tokens[-1] = raw_tokens[-1][:-1] + + return tokens, raw_tokens, begidx, endidx + + class CompletionFinder(argcomplete.CompletionFinder): + """Hijack the functor from argcomplete to call AutoCompleter""" + + def __call__(self, argument_parser, completer=None, always_complete_options=True, exit_method=os._exit, output_stream=None, + exclude=None, validator=None, print_suppressed=False, append_space=None, + default_completer=argcomplete.FilesCompleter()): + """ + :param argument_parser: The argument parser to autocomplete on + :type argument_parser: :class:`argparse.ArgumentParser` + :param always_complete_options: + Controls the autocompletion of option strings if an option string opening character (normally ``-``) has not + been entered. If ``True`` (default), both short (``-x``) and long (``--x``) option strings will be + suggested. If ``False``, no option strings will be suggested. If ``long``, long options and short options + with no long variant will be suggested. If ``short``, short options and long options with no short variant + will be suggested. + :type always_complete_options: boolean or string + :param exit_method: + Method used to stop the program after printing completions. Defaults to :meth:`os._exit`. If you want to + perform a normal exit that calls exit handlers, use :meth:`sys.exit`. + :type exit_method: callable + :param exclude: List of strings representing options to be omitted from autocompletion + :type exclude: iterable + :param validator: + Function to filter all completions through before returning (called with two string arguments, completion + and prefix; return value is evaluated as a boolean) + :type validator: callable + :param print_suppressed: + Whether or not to autocomplete options that have the ``help=argparse.SUPPRESS`` keyword argument set. + :type print_suppressed: boolean + :param append_space: + Whether to append a space to unique matches. The default is ``True``. + :type append_space: boolean + + .. note:: + If you are not subclassing CompletionFinder to override its behaviors, + use ``argcomplete.autocomplete()`` directly. It has the same signature as this method. + + Produces tab completions for ``argument_parser``. See module docs for more info. + + Argcomplete only executes actions if their class is known not to have side effects. Custom action classes can be + added to argcomplete.safe_actions, if their values are wanted in the ``parsed_args`` completer argument, or + their execution is otherwise desirable. + """ + self.__init__(argument_parser, always_complete_options=always_complete_options, exclude=exclude, + validator=validator, print_suppressed=print_suppressed, append_space=append_space, + default_completer=default_completer) + + if "_ARGCOMPLETE" not in os.environ: + # not an argument completion invocation + return + + try: + argcomplete.debug_stream = os.fdopen(9, "w") + except IOError: + argcomplete.debug_stream = sys.stderr + + if output_stream is None: + try: + output_stream = os.fdopen(8, "wb") + except IOError: + argcomplete.debug("Unable to open fd 8 for writing, quitting") + exit_method(1) + + # print("", stream=debug_stream) + # for v in "COMP_CWORD COMP_LINE COMP_POINT COMP_TYPE COMP_KEY _ARGCOMPLETE_COMP_WORDBREAKS COMP_WORDS".split(): + # print(v, os.environ[v], stream=debug_stream) + + ifs = os.environ.get("_ARGCOMPLETE_IFS", "\013") + if len(ifs) != 1: + argcomplete.debug("Invalid value for IFS, quitting [{v}]".format(v=ifs)) + exit_method(1) + + comp_line = os.environ["COMP_LINE"] + comp_point = int(os.environ["COMP_POINT"]) + + comp_line = argcomplete.ensure_str(comp_line) + + ############################## + # SWAPPED FOR AUTOCOMPLETER + # + # Replaced with our own tokenizer function + ############################## + + # cword_prequote, cword_prefix, cword_suffix, comp_words, last_wordbreak_pos = split_line(comp_line, comp_point) + tokens, _, begidx, endidx = tokens_for_completion(comp_line, comp_point) + + # _ARGCOMPLETE is set by the shell script to tell us where comp_words + # should start, based on what we're completing. + # 1: <script> [args] + # 2: python <script> [args] + # 3: python -m <module> [args] + start = int(os.environ["_ARGCOMPLETE"]) - 1 + ############################## + # SWAPPED FOR AUTOCOMPLETER + # + # Applying the same token dropping to our tokens + ############################## + # comp_words = comp_words[start:] + tokens = tokens[start:] + + # debug("\nLINE: {!r}".format(comp_line), + # "\nPOINT: {!r}".format(comp_point), + # "\nPREQUOTE: {!r}".format(cword_prequote), + # "\nPREFIX: {!r}".format(cword_prefix), + # "\nSUFFIX: {!r}".format(cword_suffix), + # "\nWORDS:", comp_words) + + ############################## + # SWAPPED FOR AUTOCOMPLETER + # + # Replaced with our own completion function and customizing the returned values + ############################## + # completions = self._get_completions(comp_words, cword_prefix, cword_prequote, last_wordbreak_pos) + + # capture stdout from the autocompleter + result = StringIO() + with redirect_stdout(result): + completions = completer.complete_command(tokens, tokens[-1], comp_line, begidx, endidx) + outstr = result.getvalue() + + if completions: + # If any completion has a space in it, then quote all completions + # this improves the user experience so they don't nede to go back and add a quote + if ' ' in ''.join(completions): + completions = ['"{}"'.format(entry) for entry in completions] + + argcomplete.debug("\nReturning completions:", completions) + + output_stream.write(ifs.join(completions).encode(argcomplete.sys_encoding)) + elif outstr: + # if there are no completions, but we got something from stdout, try to print help + + # trick the bash completion into thinking there are 2 completions that are unlikely + # to ever match. + outstr = outstr.replace('\n', ' ').replace('\t', ' ').replace(' ', ' ').strip() + # generate a filler entry that should always sort first + filler = ' {0:><{width}}'.format('', width=len(outstr)/2) + outstr = ifs.join([filler, outstr]) + + output_stream.write(outstr.encode(argcomplete.sys_encoding)) + else: + # if completions is None we assume we don't know how to handle it so let bash + # go forward with normal filesystem completion + output_stream.write(ifs.join([]).encode(argcomplete.sys_encoding)) + output_stream.flush() + argcomplete.debug_stream.flush() + exit_method(0) diff --git a/cmd2/argparse_completer.py b/cmd2/argparse_completer.py index 03f2d965..4964b1ec 100755 --- a/cmd2/argparse_completer.py +++ b/cmd2/argparse_completer.py @@ -64,7 +64,8 @@ from typing import List, Dict, Tuple, Callable, Union # imports copied from argparse to support our customized argparse functions -from argparse import ZERO_OR_MORE, ONE_OR_MORE, ArgumentError, _ +from argparse import ZERO_OR_MORE, ONE_OR_MORE, ArgumentError, _, _get_action_name, SUPPRESS + import re as _re @@ -75,6 +76,7 @@ from .rl_utils import rl_force_redisplay # define the completion choices for the argument. You may provide a Collection or a Function. ACTION_ARG_CHOICES = 'arg_choices' + class _RangeAction(object): def __init__(self, nargs: Union[int, str, Tuple[int, int], None]): self.nargs_min = None @@ -103,6 +105,7 @@ class _RangeAction(object): self.nargs_adjusted = nargs +# noinspection PyShadowingBuiltins,PyShadowingBuiltins class _StoreRangeAction(argparse._StoreAction, _RangeAction): def __init__(self, option_strings, @@ -131,6 +134,7 @@ class _StoreRangeAction(argparse._StoreAction, _RangeAction): metavar=metavar) +# noinspection PyShadowingBuiltins,PyShadowingBuiltins class _AppendRangeAction(argparse._AppendAction, _RangeAction): def __init__(self, option_strings, @@ -433,7 +437,6 @@ class AutoCompleter(object): return self.basic_complete(text, line, begidx, endidx, completers.keys()) return [] - @staticmethod def _process_action_nargs(action: argparse.Action, arg_state: _ArgumentState) -> None: if isinstance(action, _RangeAction): @@ -536,7 +539,10 @@ class AutoCompleter(object): prefix = ' {0: <{width}} '.format(prefix, width=20) pref_len = len(prefix) - help_lines = action.help.splitlines() + if action.help is not None: + help_lines = action.help.splitlines() + else: + help_lines = [''] if len(help_lines) == 1: print('\nHint:\n{}{}\n'.format(prefix, help_lines[0])) else: @@ -571,6 +577,7 @@ class AutoCompleter(object): ############################################################################### +# noinspection PyCompatibility,PyShadowingBuiltins,PyShadowingBuiltins class ACHelpFormatter(argparse.HelpFormatter): """Custom help formatter to configure ordering of help text""" @@ -631,6 +638,7 @@ class ACHelpFormatter(argparse.HelpFormatter): # End cmd2 customization # helper for wrapping lines + # noinspection PyMissingOrEmptyDocstring,PyShadowingNames def get_lines(parts, indent, prefix=None): lines = [] line = [] @@ -722,6 +730,7 @@ class ACHelpFormatter(argparse.HelpFormatter): else: result = default_metavar + # noinspection PyMissingOrEmptyDocstring def format(tuple_size): if isinstance(result, tuple): return result @@ -748,6 +757,7 @@ class ACHelpFormatter(argparse.HelpFormatter): return text.splitlines() +# noinspection PyCompatibility class ACArgumentParser(argparse.ArgumentParser): """Custom argparse class to override error method to change default help text.""" @@ -866,3 +876,259 @@ class ACArgumentParser(argparse.ArgumentParser): 'Expected between {} and {} arguments'.format(action.nargs_min, action.nargs_max)) return super(ACArgumentParser, self)._match_argument(action, arg_strings_pattern) + + def _parse_known_args(self, arg_strings, namespace): + # replace arg strings that are file references + if self.fromfile_prefix_chars is not None: + arg_strings = self._read_args_from_files(arg_strings) + + # map all mutually exclusive arguments to the other arguments + # they can't occur with + action_conflicts = {} + for mutex_group in self._mutually_exclusive_groups: + group_actions = mutex_group._group_actions + for i, mutex_action in enumerate(mutex_group._group_actions): + conflicts = action_conflicts.setdefault(mutex_action, []) + conflicts.extend(group_actions[:i]) + conflicts.extend(group_actions[i + 1:]) + + # find all option indices, and determine the arg_string_pattern + # which has an 'O' if there is an option at an index, + # an 'A' if there is an argument, or a '-' if there is a '--' + option_string_indices = {} + arg_string_pattern_parts = [] + arg_strings_iter = iter(arg_strings) + for i, arg_string in enumerate(arg_strings_iter): + + # all args after -- are non-options + if arg_string == '--': + arg_string_pattern_parts.append('-') + for arg_string in arg_strings_iter: + arg_string_pattern_parts.append('A') + + # otherwise, add the arg to the arg strings + # and note the index if it was an option + else: + option_tuple = self._parse_optional(arg_string) + if option_tuple is None: + pattern = 'A' + else: + option_string_indices[i] = option_tuple + pattern = 'O' + arg_string_pattern_parts.append(pattern) + + # join the pieces together to form the pattern + arg_strings_pattern = ''.join(arg_string_pattern_parts) + + # converts arg strings to the appropriate and then takes the action + seen_actions = set() + seen_non_default_actions = set() + + def take_action(action, argument_strings, option_string=None): + seen_actions.add(action) + argument_values = self._get_values(action, argument_strings) + + # error if this argument is not allowed with other previously + # seen arguments, assuming that actions that use the default + # value don't really count as "present" + if argument_values is not action.default: + seen_non_default_actions.add(action) + for conflict_action in action_conflicts.get(action, []): + if conflict_action in seen_non_default_actions: + msg = _('not allowed with argument %s') + action_name = _get_action_name(conflict_action) + raise ArgumentError(action, msg % action_name) + + # take the action if we didn't receive a SUPPRESS value + # (e.g. from a default) + if argument_values is not SUPPRESS: + action(self, namespace, argument_values, option_string) + + # function to convert arg_strings into an optional action + def consume_optional(start_index): + + # get the optional identified at this index + option_tuple = option_string_indices[start_index] + action, option_string, explicit_arg = option_tuple + + # identify additional optionals in the same arg string + # (e.g. -xyz is the same as -x -y -z if no args are required) + match_argument = self._match_argument + action_tuples = [] + while True: + + # if we found no optional action, skip it + if action is None: + extras.append(arg_strings[start_index]) + return start_index + 1 + + # if there is an explicit argument, try to match the + # optional's string arguments to only this + if explicit_arg is not None: + arg_count = match_argument(action, 'A') + + # if the action is a single-dash option and takes no + # arguments, try to parse more single-dash options out + # of the tail of the option string + chars = self.prefix_chars + if arg_count == 0 and option_string[1] not in chars: + action_tuples.append((action, [], option_string)) + char = option_string[0] + option_string = char + explicit_arg[0] + new_explicit_arg = explicit_arg[1:] or None + optionals_map = self._option_string_actions + if option_string in optionals_map: + action = optionals_map[option_string] + explicit_arg = new_explicit_arg + else: + msg = _('ignored explicit argument %r') + raise ArgumentError(action, msg % explicit_arg) + + # if the action expect exactly one argument, we've + # successfully matched the option; exit the loop + elif arg_count == 1: + stop = start_index + 1 + args = [explicit_arg] + action_tuples.append((action, args, option_string)) + break + + # error if a double-dash option did not use the + # explicit argument + else: + msg = _('ignored explicit argument %r') + raise ArgumentError(action, msg % explicit_arg) + + # if there is no explicit argument, try to match the + # optional's string arguments with the following strings + # if successful, exit the loop + else: + start = start_index + 1 + selected_patterns = arg_strings_pattern[start:] + arg_count = match_argument(action, selected_patterns) + stop = start + arg_count + args = arg_strings[start:stop] + action_tuples.append((action, args, option_string)) + break + + # add the Optional to the list and return the index at which + # the Optional's string args stopped + assert action_tuples + for action, args, option_string in action_tuples: + take_action(action, args, option_string) + return stop + + # the list of Positionals left to be parsed; this is modified + # by consume_positionals() + positionals = self._get_positional_actions() + + # function to convert arg_strings into positional actions + def consume_positionals(start_index): + # match as many Positionals as possible + match_partial = self._match_arguments_partial + selected_pattern = arg_strings_pattern[start_index:] + arg_counts = match_partial(positionals, selected_pattern) + + #################################################################### + # Applied mixed.patch from https://bugs.python.org/issue15112 + if 'O' in arg_strings_pattern[start_index:]: + # if there is an optional after this, remove + # 'empty' positionals from the current match + + while len(arg_counts) > 1 and arg_counts[-1] == 0: + arg_counts = arg_counts[:-1] + #################################################################### + + # slice off the appropriate arg strings for each Positional + # and add the Positional and its args to the list + for action, arg_count in zip(positionals, arg_counts): + args = arg_strings[start_index: start_index + arg_count] + start_index += arg_count + take_action(action, args) + + # slice off the Positionals that we just parsed and return the + # index at which the Positionals' string args stopped + positionals[:] = positionals[len(arg_counts):] + return start_index + + # consume Positionals and Optionals alternately, until we have + # passed the last option string + extras = [] + start_index = 0 + if option_string_indices: + max_option_string_index = max(option_string_indices) + else: + max_option_string_index = -1 + while start_index <= max_option_string_index: + + # consume any Positionals preceding the next option + next_option_string_index = min([ + index + for index in option_string_indices + if index >= start_index]) + if start_index != next_option_string_index: + positionals_end_index = consume_positionals(start_index) + + # only try to parse the next optional if we didn't consume + # the option string during the positionals parsing + if positionals_end_index > start_index: + start_index = positionals_end_index + continue + else: + start_index = positionals_end_index + + # if we consumed all the positionals we could and we're not + # at the index of an option string, there were extra arguments + if start_index not in option_string_indices: + strings = arg_strings[start_index:next_option_string_index] + extras.extend(strings) + start_index = next_option_string_index + + # consume the next optional and any arguments for it + start_index = consume_optional(start_index) + + # consume any positionals following the last Optional + stop_index = consume_positionals(start_index) + + # if we didn't consume all the argument strings, there were extras + extras.extend(arg_strings[stop_index:]) + + # make sure all required actions were present and also convert + # action defaults which were not given as arguments + required_actions = [] + for action in self._actions: + if action not in seen_actions: + if action.required: + required_actions.append(_get_action_name(action)) + else: + # Convert action default now instead of doing it before + # parsing arguments to avoid calling convert functions + # twice (which may fail) if the argument was given, but + # only if it was defined already in the namespace + if (action.default is not None and + isinstance(action.default, str) and + hasattr(namespace, action.dest) and + action.default is getattr(namespace, action.dest)): + setattr(namespace, action.dest, + self._get_value(action, action.default)) + + if required_actions: + self.error(_('the following arguments are required: %s') % + ', '.join(required_actions)) + + # make sure all required groups had one option present + for group in self._mutually_exclusive_groups: + if group.required: + for action in group._group_actions: + if action in seen_non_default_actions: + break + + # if no actions were used, report the error + else: + names = [_get_action_name(action) + for action in group._group_actions + if action.help is not SUPPRESS] + msg = _('one of the arguments %s is required') + self.error(msg % ' '.join(names)) + + # return the updated namespace and the extra arguments + return namespace, extras diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index e99e4659..f4f30bd4 100755 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -197,8 +197,12 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla @functools.wraps(func) def cmd_wrapper(instance, cmdline): lexed_arglist = parse_quoted_string(cmdline) - args, unknown = argparser.parse_known_args(lexed_arglist) - return func(instance, args, unknown) + try: + args, unknown = argparser.parse_known_args(lexed_arglist) + except SystemExit: + return + else: + return func(instance, args, unknown) # argparser defaults the program name to sys.argv[0] # we want it to be the name of our command @@ -234,8 +238,12 @@ def with_argparser(argparser: argparse.ArgumentParser) -> Callable: @functools.wraps(func) def cmd_wrapper(instance, cmdline): lexed_arglist = parse_quoted_string(cmdline) - args = argparser.parse_args(lexed_arglist) - return func(instance, args) + try: + args = argparser.parse_args(lexed_arglist) + except SystemExit: + return + else: + return func(instance, args) # argparser defaults the program name to sys.argv[0] # we want it to be the name of our command @@ -608,7 +616,7 @@ class Cmd(cmd.Cmd): if _which(editor): break feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing) - locals_in_py = True + locals_in_py = False quiet = False # Do not suppress nonessential output timing = False # Prints elapsed time for each command @@ -670,6 +678,7 @@ class Cmd(cmd.Cmd): self.initial_stdout = sys.stdout self.history = History() self.pystate = {} + self.pyscript_name = 'app' self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')] self.statement_parser = StatementParser( allow_redirection=self.allow_redirection, @@ -1993,6 +2002,8 @@ class Cmd(cmd.Cmd): if self.allow_redirection: self._redirect_output(statement) timestart = datetime.datetime.now() + if self._in_py: + self._last_result = None statement = self.precmd(statement) stop = self.onecmd(statement) stop = self.postcmd(stop, statement) @@ -2797,16 +2808,16 @@ Usage: Usage: unalias [-a] name [name ...] py <command>: Executes a Python command. py: Enters interactive Python mode. End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - Non-python commands can be issued with ``cmd("your command")``. + Non-python commands can be issued with ``pyscript_name("your command")``. Run python code from external script files with ``run("script.py")`` """ + from .pyscript_bridge import PyscriptBridge if self._in_py: self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False) return self._in_py = True try: - self.pystate['self'] = self arg = arg.strip() # Support the run command even if called prior to invoking an interactive interpreter @@ -2829,10 +2840,14 @@ Usage: Usage: unalias [-a] name [name ...] """ return self.onecmd_plus_hooks(cmd_plus_args + '\n') + bridge = PyscriptBridge(self) self.pystate['run'] = run - self.pystate['cmd'] = onecmd_plus_hooks + self.pystate[self.pyscript_name] = bridge + + if self.locals_in_py: + self.pystate['self'] = self - localvars = (self.locals_in_py and self.pystate) or {} + localvars = self.pystate interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') @@ -2853,9 +2868,10 @@ Usage: Usage: unalias [-a] name [name ...] keepstate = Statekeeper(sys, ('stdin', 'stdout')) sys.stdout = self.stdout sys.stdin = self.stdin + docstr = self.do_py.__doc__.replace('pyscript_name', self.pyscript_name) interp.interact(banner="Python %s on %s\n%s\n(%s)\n%s" % (sys.version, sys.platform, cprt, self.__class__.__name__, - self.do_py.__doc__)) + docstr)) except EmbeddedConsoleExit: pass if keepstate is not None: diff --git a/cmd2/pyscript_bridge.py b/cmd2/pyscript_bridge.py new file mode 100644 index 00000000..055ae4ae --- /dev/null +++ b/cmd2/pyscript_bridge.py @@ -0,0 +1,273 @@ +# coding=utf-8 +""" +Bridges calls made inside of pyscript with the Cmd2 host app while maintaining a reasonable +degree of isolation between the two + +Copyright 2018 Eric Lin <anselor@gmail.com> +Released under MIT license, see LICENSE file +""" + +import argparse +from collections import namedtuple +import functools +import sys +from typing import List, Tuple + +# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout +if sys.version_info < (3, 5): + from contextlib2 import redirect_stdout, redirect_stderr +else: + from contextlib import redirect_stdout, redirect_stderr + +from .argparse_completer import _RangeAction +from .utils import namedtuple_with_defaults + + +class CommandResult(namedtuple_with_defaults('CmdResult', ['stdout', 'stderr', 'data'])): + """Encapsulates the results from a command. + + Named tuple attributes + ---------------------- + stdout: str - Output captured from stdout while this command is executing + stderr: str - Output captured from stderr while this command is executing. None if no error captured + data - Data returned by the command. + + NOTE: Named tuples are immutable. So the contents are there for access, not for modification. + """ + def __bool__(self): + """If stderr is None and data is not None the command is considered a success""" + return not self.stderr and self.data is not None + + +class CopyStream(object): + """Copies all data written to a stream""" + def __init__(self, inner_stream): + self.buffer = '' + self.inner_stream = inner_stream + + def write(self, s): + self.buffer += s + self.inner_stream.write(s) + + def read(self): + raise NotImplementedError + + def clear(self): + self.buffer = '' + + +def _exec_cmd(cmd2_app, func): + """Helper to encapsulate executing a command and capturing the results""" + copy_stdout = CopyStream(sys.stdout) + copy_stderr = CopyStream(sys.stderr) + + cmd2_app._last_result = None + + with redirect_stdout(copy_stdout): + with redirect_stderr(copy_stderr): + func() + + # if stderr is empty, set it to None + stderr = copy_stderr if copy_stderr.buffer else None + + result = CommandResult(stdout=copy_stdout.buffer, stderr=stderr, data=cmd2_app._last_result) + return result + + +class ArgparseFunctor: + """ + Encapsulates translating python object traversal + """ + def __init__(self, cmd2_app, item, parser): + self._cmd2_app = cmd2_app + self._item = item + self._parser = parser + + # Dictionary mapping command argument name to value + self._args = {} + # argparse object for the current command layer + self.__current_subcommand_parser = parser + + def __getattr__(self, item): + """Search for a subcommand matching this item and update internal state to track the traversal""" + # look for sub-command under the current command/sub-command layer + for action in self.__current_subcommand_parser._actions: + if not action.option_strings and isinstance(action, argparse._SubParsersAction): + if item in action.choices: + # item matches the a sub-command, save our position in argparse, + # save the sub-command, return self to allow next level of traversal + self.__current_subcommand_parser = action.choices[item] + self._args[action.dest] = item + return self + + raise AttributeError(item) + # return super().__getattr__(item) + + def __call__(self, *args, **kwargs): + """ + Process the arguments at this layer of the argparse command tree. If there are more sub-commands, + return self to accept the next sub-command name. If there are no more sub-commands, execute the + sub-command with the given parameters. + """ + next_pos_index = 0 + + has_subcommand = False + consumed_kw = [] + + # Iterate through the current sub-command's arguments in order + for action in self.__current_subcommand_parser._actions: + # is this a flag option? + if action.option_strings: + # this is a flag argument, search for the argument by name in the parameters + if action.dest in kwargs: + self._args[action.dest] = kwargs[action.dest] + consumed_kw.append(action.dest) + else: + # This is a positional argument, search the positional arguments passed in. + if not isinstance(action, argparse._SubParsersAction): + if action.dest in kwargs: + # if this positional argument happens to be passed in as a keyword argument + # go ahead and consume the matching keyword argument + self._args[action.dest] = kwargs[action.dest] + elif next_pos_index < len(args): + # Make sure we actually have positional arguments to consume + pos_remain = len(args) - next_pos_index + + # Check if this argument consumes a range of values + if isinstance(action, _RangeAction) and action.nargs_min is not None \ + and action.nargs_max is not None: + # this is a cmd2 ranged action. + + if pos_remain >= action.nargs_min: + # Do we meet the minimum count? + if pos_remain > action.nargs_max: + # Do we exceed the maximum count? + self._args[action.dest] = args[next_pos_index:next_pos_index + action.nargs_max] + next_pos_index += action.nargs_max + else: + self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] + next_pos_index += pos_remain + else: + raise ValueError('Expected at least {} values for {}'.format(action.nargs_min, + action.dest)) + elif action.nargs is not None: + if action.nargs == '+': + if pos_remain > 0: + self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] + next_pos_index += pos_remain + else: + raise ValueError('Expected at least 1 value for {}'.format(action.dest)) + elif action.nargs == '*': + self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain] + next_pos_index += pos_remain + elif action.nargs == '?': + self._args[action.dest] = args[next_pos_index] + next_pos_index += 1 + else: + self._args[action.dest] = args[next_pos_index] + next_pos_index += 1 + else: + has_subcommand = True + + # Check if there are any extra arguments we don't know how to handle + for kw in kwargs: + if kw not in self._args: # consumed_kw: + raise TypeError('{}() got an unexpected keyword argument \'{}\''.format( + self.__current_subcommand_parser.prog, kw)) + + if has_subcommand: + return self + else: + return self._run() + + def _run(self): + # look up command function + func = getattr(self._cmd2_app, 'do_' + self._item) + + # reconstruct the cmd2 command from the python call + cmd_str = [''] + + def process_flag(action, value): + if isinstance(action, argparse._CountAction): + if isinstance(value, int): + for c in range(value): + cmd_str[0] += '{} '.format(action.option_strings[0]) + return + else: + raise TypeError('Expected int for ' + action.dest) + if isinstance(action, argparse._StoreConstAction) or isinstance(action, argparse._AppendConstAction): + if value: + # Nothing else to append to the command string, just the flag is enough. + cmd_str[0] += '{} '.format(action.option_strings[0]) + return + else: + # value is not True so we default to false, which means don't include the flag + return + + # was the argument a flag? + if action.option_strings: + cmd_str[0] += '{} '.format(action.option_strings[0]) + + if isinstance(value, List) or isinstance(value, Tuple): + for item in value: + item = str(item).strip() + if ' ' in item: + item = '"{}"'.format(item) + cmd_str[0] += '{} '.format(item) + else: + value = str(value).strip() + if ' ' in value: + value = '"{}"'.format(value) + cmd_str[0] += '{} '.format(value) + + def traverse_parser(parser): + for action in parser._actions: + # was something provided for the argument + if action.dest in self._args: + if isinstance(action, argparse._SubParsersAction): + cmd_str[0] += '{} '.format(self._args[action.dest]) + traverse_parser(action.choices[self._args[action.dest]]) + elif isinstance(action, argparse._AppendAction): + if isinstance(self._args[action.dest], List) or isinstance(self._args[action.dest], Tuple): + for values in self._args[action.dest]: + process_flag(action, values) + else: + process_flag(action, self._args[action.dest]) + else: + process_flag(action, self._args[action.dest]) + + traverse_parser(self._parser) + + # print('Command: {}'.format(cmd_str[0])) + + return _exec_cmd(self._cmd2_app, functools.partial(func, cmd_str[0])) + +class PyscriptBridge(object): + """Preserves the legacy 'cmd' interface for pyscript while also providing a new python API wrapper for + application commands.""" + def __init__(self, cmd2_app): + self._cmd2_app = cmd2_app + self._last_result = None + + def __getattr__(self, item: str): + """Check if the attribute is a command. If so, return a callable.""" + commands = self._cmd2_app.get_all_commands() + if item in commands: + func = getattr(self._cmd2_app, 'do_' + item) + + try: + # See if the command uses argparse + parser = getattr(func, 'argparser') + except AttributeError: + # Command doesn't, we will accept parameters in the form of a command string + def wrap_func(args=''): + return _exec_cmd(self._cmd2_app, functools.partial(func, args)) + return wrap_func + else: + # Command does use argparse, return an object that can traverse the argparse subcommands and arguments + return ArgparseFunctor(self._cmd2_app, item, parser) + + raise AttributeError(item) + + def __call__(self, args): + return _exec_cmd(self._cmd2_app, functools.partial(self._cmd2_app.onecmd_plus_hooks, args + '\n')) diff --git a/cmd2/utils.py b/cmd2/utils.py index 6abab94c..dbe39213 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -2,6 +2,7 @@ # coding=utf-8 """Shared utility functions""" +import collections from . import constants def strip_ansi(text: str) -> str: @@ -12,6 +13,7 @@ def strip_ansi(text: str) -> str: """ return constants.ANSI_ESCAPE_RE.sub('', text) + def strip_quotes(arg: str) -> str: """ Strip outer quotes from a string. @@ -23,3 +25,33 @@ def strip_quotes(arg: str) -> str: if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in constants.QUOTES: arg = arg[1:-1] return arg + + +def namedtuple_with_defaults(typename, field_names, default_values=()): + """ + Convenience function for defining a namedtuple with default values + + From: https://stackoverflow.com/questions/11351032/namedtuple-and-default-values-for-optional-keyword-arguments + + Examples: + >>> Node = namedtuple_with_defaults('Node', 'val left right') + >>> Node() + Node(val=None, left=None, right=None) + >>> Node = namedtuple_with_defaults('Node', 'val left right', [1, 2, 3]) + >>> Node() + Node(val=1, left=2, right=3) + >>> Node = namedtuple_with_defaults('Node', 'val left right', {'right':7}) + >>> Node() + Node(val=None, left=None, right=7) + >>> Node(4) + Node(val=4, left=None, right=7) + """ + T = collections.namedtuple(typename, field_names) + T.__new__.__defaults__ = (None,) * len(T._fields) + if isinstance(default_values, collections.Mapping): + prototype = T(**default_values) + else: + prototype = T(*default_values) + T.__new__.__defaults__ = tuple(prototype) + return T + |