summaryrefslogtreecommitdiff
path: root/cmd2
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2')
-rw-r--r--cmd2/__init__.py3
-rw-r--r--cmd2/argcomplete_bridge.py246
-rwxr-xr-xcmd2/argparse_completer.py272
-rwxr-xr-xcmd2/cmd2.py36
-rw-r--r--cmd2/pyscript_bridge.py273
-rw-r--r--cmd2/utils.py32
6 files changed, 846 insertions, 16 deletions
diff --git a/cmd2/__init__.py b/cmd2/__init__.py
index 2dd08977..bf6c047f 100644
--- a/cmd2/__init__.py
+++ b/cmd2/__init__.py
@@ -1,5 +1,2 @@
#
# -*- coding: utf-8 -*-
-#
-from .cmd2 import __version__, Cmd, AddSubmenu, CmdResult, categorize
-from .cmd2 import with_argument_list, with_argparser, with_argparser_and_unknown_args, with_category
diff --git a/cmd2/argcomplete_bridge.py b/cmd2/argcomplete_bridge.py
new file mode 100644
index 00000000..583f3345
--- /dev/null
+++ b/cmd2/argcomplete_bridge.py
@@ -0,0 +1,246 @@
+# coding=utf-8
+"""Hijack the ArgComplete's bash completion handler to return AutoCompleter results"""
+
+try:
+ # check if argcomplete is installed
+ import argcomplete
+except ImportError:
+ # not installed, skip the rest of the file
+ pass
+
+else:
+ # argcomplete is installed
+
+ from contextlib import redirect_stdout
+ import copy
+ from io import StringIO
+ import os
+ import shlex
+ import sys
+
+ from . import constants
+ from . import utils
+
+
+ def tokens_for_completion(line, endidx):
+ """
+ Used by tab completion functions to get all tokens through the one being completed
+ :param line: str - the current input line with leading whitespace removed
+ :param endidx: int - the ending index of the prefix text
+ :return: A 4 item tuple where the items are
+ On Success
+ tokens: list of unquoted tokens
+ this is generally the list needed for tab completion functions
+ raw_tokens: list of tokens with any quotes preserved
+ this can be used to know if a token was quoted or is missing a closing quote
+ begidx: beginning of last token
+ endidx: cursor position
+
+ Both lists are guaranteed to have at least 1 item
+ The last item in both lists is the token being tab completed
+
+ On Failure
+ Both items are None
+ """
+ unclosed_quote = ''
+ quotes_to_try = copy.copy(constants.QUOTES)
+
+ tmp_line = line[:endidx]
+ tmp_endidx = endidx
+
+ # Parse the line into tokens
+ while True:
+ try:
+ # Use non-POSIX parsing to keep the quotes around the tokens
+ initial_tokens = shlex.split(tmp_line[:tmp_endidx], posix=False)
+
+ # calculate begidx
+ if unclosed_quote:
+ begidx = tmp_line[:tmp_endidx].rfind(initial_tokens[-1]) + 1
+ else:
+ if tmp_endidx > 0 and tmp_line[tmp_endidx - 1] == ' ':
+ begidx = endidx
+ else:
+ begidx = tmp_line[:tmp_endidx].rfind(initial_tokens[-1])
+
+ # If the cursor is at an empty token outside of a quoted string,
+ # then that is the token being completed. Add it to the list.
+ if not unclosed_quote and begidx == tmp_endidx:
+ initial_tokens.append('')
+ break
+ except ValueError:
+ # ValueError can be caused by missing closing quote
+ if not quotes_to_try:
+ # Since we have no more quotes to try, something else
+ # is causing the parsing error. Return None since
+ # this means the line is malformed.
+ return None, None, None, None
+
+ # Add a closing quote and try to parse again
+ unclosed_quote = quotes_to_try[0]
+ quotes_to_try = quotes_to_try[1:]
+
+ tmp_line = line[:endidx]
+ tmp_line += unclosed_quote
+ tmp_endidx = endidx + 1
+
+ raw_tokens = initial_tokens
+
+ # Save the unquoted tokens
+ tokens = [utils.strip_quotes(cur_token) for cur_token in raw_tokens]
+
+ # If the token being completed had an unclosed quote, we need
+ # to remove the closing quote that was added in order for it
+ # to match what was on the command line.
+ if unclosed_quote:
+ raw_tokens[-1] = raw_tokens[-1][:-1]
+
+ return tokens, raw_tokens, begidx, endidx
+
+ class CompletionFinder(argcomplete.CompletionFinder):
+ """Hijack the functor from argcomplete to call AutoCompleter"""
+
+ def __call__(self, argument_parser, completer=None, always_complete_options=True, exit_method=os._exit, output_stream=None,
+ exclude=None, validator=None, print_suppressed=False, append_space=None,
+ default_completer=argcomplete.FilesCompleter()):
+ """
+ :param argument_parser: The argument parser to autocomplete on
+ :type argument_parser: :class:`argparse.ArgumentParser`
+ :param always_complete_options:
+ Controls the autocompletion of option strings if an option string opening character (normally ``-``) has not
+ been entered. If ``True`` (default), both short (``-x``) and long (``--x``) option strings will be
+ suggested. If ``False``, no option strings will be suggested. If ``long``, long options and short options
+ with no long variant will be suggested. If ``short``, short options and long options with no short variant
+ will be suggested.
+ :type always_complete_options: boolean or string
+ :param exit_method:
+ Method used to stop the program after printing completions. Defaults to :meth:`os._exit`. If you want to
+ perform a normal exit that calls exit handlers, use :meth:`sys.exit`.
+ :type exit_method: callable
+ :param exclude: List of strings representing options to be omitted from autocompletion
+ :type exclude: iterable
+ :param validator:
+ Function to filter all completions through before returning (called with two string arguments, completion
+ and prefix; return value is evaluated as a boolean)
+ :type validator: callable
+ :param print_suppressed:
+ Whether or not to autocomplete options that have the ``help=argparse.SUPPRESS`` keyword argument set.
+ :type print_suppressed: boolean
+ :param append_space:
+ Whether to append a space to unique matches. The default is ``True``.
+ :type append_space: boolean
+
+ .. note::
+ If you are not subclassing CompletionFinder to override its behaviors,
+ use ``argcomplete.autocomplete()`` directly. It has the same signature as this method.
+
+ Produces tab completions for ``argument_parser``. See module docs for more info.
+
+ Argcomplete only executes actions if their class is known not to have side effects. Custom action classes can be
+ added to argcomplete.safe_actions, if their values are wanted in the ``parsed_args`` completer argument, or
+ their execution is otherwise desirable.
+ """
+ self.__init__(argument_parser, always_complete_options=always_complete_options, exclude=exclude,
+ validator=validator, print_suppressed=print_suppressed, append_space=append_space,
+ default_completer=default_completer)
+
+ if "_ARGCOMPLETE" not in os.environ:
+ # not an argument completion invocation
+ return
+
+ try:
+ argcomplete.debug_stream = os.fdopen(9, "w")
+ except IOError:
+ argcomplete.debug_stream = sys.stderr
+
+ if output_stream is None:
+ try:
+ output_stream = os.fdopen(8, "wb")
+ except IOError:
+ argcomplete.debug("Unable to open fd 8 for writing, quitting")
+ exit_method(1)
+
+ # print("", stream=debug_stream)
+ # for v in "COMP_CWORD COMP_LINE COMP_POINT COMP_TYPE COMP_KEY _ARGCOMPLETE_COMP_WORDBREAKS COMP_WORDS".split():
+ # print(v, os.environ[v], stream=debug_stream)
+
+ ifs = os.environ.get("_ARGCOMPLETE_IFS", "\013")
+ if len(ifs) != 1:
+ argcomplete.debug("Invalid value for IFS, quitting [{v}]".format(v=ifs))
+ exit_method(1)
+
+ comp_line = os.environ["COMP_LINE"]
+ comp_point = int(os.environ["COMP_POINT"])
+
+ comp_line = argcomplete.ensure_str(comp_line)
+
+ ##############################
+ # SWAPPED FOR AUTOCOMPLETER
+ #
+ # Replaced with our own tokenizer function
+ ##############################
+
+ # cword_prequote, cword_prefix, cword_suffix, comp_words, last_wordbreak_pos = split_line(comp_line, comp_point)
+ tokens, _, begidx, endidx = tokens_for_completion(comp_line, comp_point)
+
+ # _ARGCOMPLETE is set by the shell script to tell us where comp_words
+ # should start, based on what we're completing.
+ # 1: <script> [args]
+ # 2: python <script> [args]
+ # 3: python -m <module> [args]
+ start = int(os.environ["_ARGCOMPLETE"]) - 1
+ ##############################
+ # SWAPPED FOR AUTOCOMPLETER
+ #
+ # Applying the same token dropping to our tokens
+ ##############################
+ # comp_words = comp_words[start:]
+ tokens = tokens[start:]
+
+ # debug("\nLINE: {!r}".format(comp_line),
+ # "\nPOINT: {!r}".format(comp_point),
+ # "\nPREQUOTE: {!r}".format(cword_prequote),
+ # "\nPREFIX: {!r}".format(cword_prefix),
+ # "\nSUFFIX: {!r}".format(cword_suffix),
+ # "\nWORDS:", comp_words)
+
+ ##############################
+ # SWAPPED FOR AUTOCOMPLETER
+ #
+ # Replaced with our own completion function and customizing the returned values
+ ##############################
+ # completions = self._get_completions(comp_words, cword_prefix, cword_prequote, last_wordbreak_pos)
+
+ # capture stdout from the autocompleter
+ result = StringIO()
+ with redirect_stdout(result):
+ completions = completer.complete_command(tokens, tokens[-1], comp_line, begidx, endidx)
+ outstr = result.getvalue()
+
+ if completions:
+ # If any completion has a space in it, then quote all completions
+ # this improves the user experience so they don't nede to go back and add a quote
+ if ' ' in ''.join(completions):
+ completions = ['"{}"'.format(entry) for entry in completions]
+
+ argcomplete.debug("\nReturning completions:", completions)
+
+ output_stream.write(ifs.join(completions).encode(argcomplete.sys_encoding))
+ elif outstr:
+ # if there are no completions, but we got something from stdout, try to print help
+
+ # trick the bash completion into thinking there are 2 completions that are unlikely
+ # to ever match.
+ outstr = outstr.replace('\n', ' ').replace('\t', ' ').replace(' ', ' ').strip()
+ # generate a filler entry that should always sort first
+ filler = ' {0:><{width}}'.format('', width=len(outstr)/2)
+ outstr = ifs.join([filler, outstr])
+
+ output_stream.write(outstr.encode(argcomplete.sys_encoding))
+ else:
+ # if completions is None we assume we don't know how to handle it so let bash
+ # go forward with normal filesystem completion
+ output_stream.write(ifs.join([]).encode(argcomplete.sys_encoding))
+ output_stream.flush()
+ argcomplete.debug_stream.flush()
+ exit_method(0)
diff --git a/cmd2/argparse_completer.py b/cmd2/argparse_completer.py
index 03f2d965..4964b1ec 100755
--- a/cmd2/argparse_completer.py
+++ b/cmd2/argparse_completer.py
@@ -64,7 +64,8 @@ from typing import List, Dict, Tuple, Callable, Union
# imports copied from argparse to support our customized argparse functions
-from argparse import ZERO_OR_MORE, ONE_OR_MORE, ArgumentError, _
+from argparse import ZERO_OR_MORE, ONE_OR_MORE, ArgumentError, _, _get_action_name, SUPPRESS
+
import re as _re
@@ -75,6 +76,7 @@ from .rl_utils import rl_force_redisplay
# define the completion choices for the argument. You may provide a Collection or a Function.
ACTION_ARG_CHOICES = 'arg_choices'
+
class _RangeAction(object):
def __init__(self, nargs: Union[int, str, Tuple[int, int], None]):
self.nargs_min = None
@@ -103,6 +105,7 @@ class _RangeAction(object):
self.nargs_adjusted = nargs
+# noinspection PyShadowingBuiltins,PyShadowingBuiltins
class _StoreRangeAction(argparse._StoreAction, _RangeAction):
def __init__(self,
option_strings,
@@ -131,6 +134,7 @@ class _StoreRangeAction(argparse._StoreAction, _RangeAction):
metavar=metavar)
+# noinspection PyShadowingBuiltins,PyShadowingBuiltins
class _AppendRangeAction(argparse._AppendAction, _RangeAction):
def __init__(self,
option_strings,
@@ -433,7 +437,6 @@ class AutoCompleter(object):
return self.basic_complete(text, line, begidx, endidx, completers.keys())
return []
-
@staticmethod
def _process_action_nargs(action: argparse.Action, arg_state: _ArgumentState) -> None:
if isinstance(action, _RangeAction):
@@ -536,7 +539,10 @@ class AutoCompleter(object):
prefix = ' {0: <{width}} '.format(prefix, width=20)
pref_len = len(prefix)
- help_lines = action.help.splitlines()
+ if action.help is not None:
+ help_lines = action.help.splitlines()
+ else:
+ help_lines = ['']
if len(help_lines) == 1:
print('\nHint:\n{}{}\n'.format(prefix, help_lines[0]))
else:
@@ -571,6 +577,7 @@ class AutoCompleter(object):
###############################################################################
+# noinspection PyCompatibility,PyShadowingBuiltins,PyShadowingBuiltins
class ACHelpFormatter(argparse.HelpFormatter):
"""Custom help formatter to configure ordering of help text"""
@@ -631,6 +638,7 @@ class ACHelpFormatter(argparse.HelpFormatter):
# End cmd2 customization
# helper for wrapping lines
+ # noinspection PyMissingOrEmptyDocstring,PyShadowingNames
def get_lines(parts, indent, prefix=None):
lines = []
line = []
@@ -722,6 +730,7 @@ class ACHelpFormatter(argparse.HelpFormatter):
else:
result = default_metavar
+ # noinspection PyMissingOrEmptyDocstring
def format(tuple_size):
if isinstance(result, tuple):
return result
@@ -748,6 +757,7 @@ class ACHelpFormatter(argparse.HelpFormatter):
return text.splitlines()
+# noinspection PyCompatibility
class ACArgumentParser(argparse.ArgumentParser):
"""Custom argparse class to override error method to change default help text."""
@@ -866,3 +876,259 @@ class ACArgumentParser(argparse.ArgumentParser):
'Expected between {} and {} arguments'.format(action.nargs_min, action.nargs_max))
return super(ACArgumentParser, self)._match_argument(action, arg_strings_pattern)
+
+ def _parse_known_args(self, arg_strings, namespace):
+ # replace arg strings that are file references
+ if self.fromfile_prefix_chars is not None:
+ arg_strings = self._read_args_from_files(arg_strings)
+
+ # map all mutually exclusive arguments to the other arguments
+ # they can't occur with
+ action_conflicts = {}
+ for mutex_group in self._mutually_exclusive_groups:
+ group_actions = mutex_group._group_actions
+ for i, mutex_action in enumerate(mutex_group._group_actions):
+ conflicts = action_conflicts.setdefault(mutex_action, [])
+ conflicts.extend(group_actions[:i])
+ conflicts.extend(group_actions[i + 1:])
+
+ # find all option indices, and determine the arg_string_pattern
+ # which has an 'O' if there is an option at an index,
+ # an 'A' if there is an argument, or a '-' if there is a '--'
+ option_string_indices = {}
+ arg_string_pattern_parts = []
+ arg_strings_iter = iter(arg_strings)
+ for i, arg_string in enumerate(arg_strings_iter):
+
+ # all args after -- are non-options
+ if arg_string == '--':
+ arg_string_pattern_parts.append('-')
+ for arg_string in arg_strings_iter:
+ arg_string_pattern_parts.append('A')
+
+ # otherwise, add the arg to the arg strings
+ # and note the index if it was an option
+ else:
+ option_tuple = self._parse_optional(arg_string)
+ if option_tuple is None:
+ pattern = 'A'
+ else:
+ option_string_indices[i] = option_tuple
+ pattern = 'O'
+ arg_string_pattern_parts.append(pattern)
+
+ # join the pieces together to form the pattern
+ arg_strings_pattern = ''.join(arg_string_pattern_parts)
+
+ # converts arg strings to the appropriate and then takes the action
+ seen_actions = set()
+ seen_non_default_actions = set()
+
+ def take_action(action, argument_strings, option_string=None):
+ seen_actions.add(action)
+ argument_values = self._get_values(action, argument_strings)
+
+ # error if this argument is not allowed with other previously
+ # seen arguments, assuming that actions that use the default
+ # value don't really count as "present"
+ if argument_values is not action.default:
+ seen_non_default_actions.add(action)
+ for conflict_action in action_conflicts.get(action, []):
+ if conflict_action in seen_non_default_actions:
+ msg = _('not allowed with argument %s')
+ action_name = _get_action_name(conflict_action)
+ raise ArgumentError(action, msg % action_name)
+
+ # take the action if we didn't receive a SUPPRESS value
+ # (e.g. from a default)
+ if argument_values is not SUPPRESS:
+ action(self, namespace, argument_values, option_string)
+
+ # function to convert arg_strings into an optional action
+ def consume_optional(start_index):
+
+ # get the optional identified at this index
+ option_tuple = option_string_indices[start_index]
+ action, option_string, explicit_arg = option_tuple
+
+ # identify additional optionals in the same arg string
+ # (e.g. -xyz is the same as -x -y -z if no args are required)
+ match_argument = self._match_argument
+ action_tuples = []
+ while True:
+
+ # if we found no optional action, skip it
+ if action is None:
+ extras.append(arg_strings[start_index])
+ return start_index + 1
+
+ # if there is an explicit argument, try to match the
+ # optional's string arguments to only this
+ if explicit_arg is not None:
+ arg_count = match_argument(action, 'A')
+
+ # if the action is a single-dash option and takes no
+ # arguments, try to parse more single-dash options out
+ # of the tail of the option string
+ chars = self.prefix_chars
+ if arg_count == 0 and option_string[1] not in chars:
+ action_tuples.append((action, [], option_string))
+ char = option_string[0]
+ option_string = char + explicit_arg[0]
+ new_explicit_arg = explicit_arg[1:] or None
+ optionals_map = self._option_string_actions
+ if option_string in optionals_map:
+ action = optionals_map[option_string]
+ explicit_arg = new_explicit_arg
+ else:
+ msg = _('ignored explicit argument %r')
+ raise ArgumentError(action, msg % explicit_arg)
+
+ # if the action expect exactly one argument, we've
+ # successfully matched the option; exit the loop
+ elif arg_count == 1:
+ stop = start_index + 1
+ args = [explicit_arg]
+ action_tuples.append((action, args, option_string))
+ break
+
+ # error if a double-dash option did not use the
+ # explicit argument
+ else:
+ msg = _('ignored explicit argument %r')
+ raise ArgumentError(action, msg % explicit_arg)
+
+ # if there is no explicit argument, try to match the
+ # optional's string arguments with the following strings
+ # if successful, exit the loop
+ else:
+ start = start_index + 1
+ selected_patterns = arg_strings_pattern[start:]
+ arg_count = match_argument(action, selected_patterns)
+ stop = start + arg_count
+ args = arg_strings[start:stop]
+ action_tuples.append((action, args, option_string))
+ break
+
+ # add the Optional to the list and return the index at which
+ # the Optional's string args stopped
+ assert action_tuples
+ for action, args, option_string in action_tuples:
+ take_action(action, args, option_string)
+ return stop
+
+ # the list of Positionals left to be parsed; this is modified
+ # by consume_positionals()
+ positionals = self._get_positional_actions()
+
+ # function to convert arg_strings into positional actions
+ def consume_positionals(start_index):
+ # match as many Positionals as possible
+ match_partial = self._match_arguments_partial
+ selected_pattern = arg_strings_pattern[start_index:]
+ arg_counts = match_partial(positionals, selected_pattern)
+
+ ####################################################################
+ # Applied mixed.patch from https://bugs.python.org/issue15112
+ if 'O' in arg_strings_pattern[start_index:]:
+ # if there is an optional after this, remove
+ # 'empty' positionals from the current match
+
+ while len(arg_counts) > 1 and arg_counts[-1] == 0:
+ arg_counts = arg_counts[:-1]
+ ####################################################################
+
+ # slice off the appropriate arg strings for each Positional
+ # and add the Positional and its args to the list
+ for action, arg_count in zip(positionals, arg_counts):
+ args = arg_strings[start_index: start_index + arg_count]
+ start_index += arg_count
+ take_action(action, args)
+
+ # slice off the Positionals that we just parsed and return the
+ # index at which the Positionals' string args stopped
+ positionals[:] = positionals[len(arg_counts):]
+ return start_index
+
+ # consume Positionals and Optionals alternately, until we have
+ # passed the last option string
+ extras = []
+ start_index = 0
+ if option_string_indices:
+ max_option_string_index = max(option_string_indices)
+ else:
+ max_option_string_index = -1
+ while start_index <= max_option_string_index:
+
+ # consume any Positionals preceding the next option
+ next_option_string_index = min([
+ index
+ for index in option_string_indices
+ if index >= start_index])
+ if start_index != next_option_string_index:
+ positionals_end_index = consume_positionals(start_index)
+
+ # only try to parse the next optional if we didn't consume
+ # the option string during the positionals parsing
+ if positionals_end_index > start_index:
+ start_index = positionals_end_index
+ continue
+ else:
+ start_index = positionals_end_index
+
+ # if we consumed all the positionals we could and we're not
+ # at the index of an option string, there were extra arguments
+ if start_index not in option_string_indices:
+ strings = arg_strings[start_index:next_option_string_index]
+ extras.extend(strings)
+ start_index = next_option_string_index
+
+ # consume the next optional and any arguments for it
+ start_index = consume_optional(start_index)
+
+ # consume any positionals following the last Optional
+ stop_index = consume_positionals(start_index)
+
+ # if we didn't consume all the argument strings, there were extras
+ extras.extend(arg_strings[stop_index:])
+
+ # make sure all required actions were present and also convert
+ # action defaults which were not given as arguments
+ required_actions = []
+ for action in self._actions:
+ if action not in seen_actions:
+ if action.required:
+ required_actions.append(_get_action_name(action))
+ else:
+ # Convert action default now instead of doing it before
+ # parsing arguments to avoid calling convert functions
+ # twice (which may fail) if the argument was given, but
+ # only if it was defined already in the namespace
+ if (action.default is not None and
+ isinstance(action.default, str) and
+ hasattr(namespace, action.dest) and
+ action.default is getattr(namespace, action.dest)):
+ setattr(namespace, action.dest,
+ self._get_value(action, action.default))
+
+ if required_actions:
+ self.error(_('the following arguments are required: %s') %
+ ', '.join(required_actions))
+
+ # make sure all required groups had one option present
+ for group in self._mutually_exclusive_groups:
+ if group.required:
+ for action in group._group_actions:
+ if action in seen_non_default_actions:
+ break
+
+ # if no actions were used, report the error
+ else:
+ names = [_get_action_name(action)
+ for action in group._group_actions
+ if action.help is not SUPPRESS]
+ msg = _('one of the arguments %s is required')
+ self.error(msg % ' '.join(names))
+
+ # return the updated namespace and the extra arguments
+ return namespace, extras
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index e99e4659..f4f30bd4 100755
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -197,8 +197,12 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla
@functools.wraps(func)
def cmd_wrapper(instance, cmdline):
lexed_arglist = parse_quoted_string(cmdline)
- args, unknown = argparser.parse_known_args(lexed_arglist)
- return func(instance, args, unknown)
+ try:
+ args, unknown = argparser.parse_known_args(lexed_arglist)
+ except SystemExit:
+ return
+ else:
+ return func(instance, args, unknown)
# argparser defaults the program name to sys.argv[0]
# we want it to be the name of our command
@@ -234,8 +238,12 @@ def with_argparser(argparser: argparse.ArgumentParser) -> Callable:
@functools.wraps(func)
def cmd_wrapper(instance, cmdline):
lexed_arglist = parse_quoted_string(cmdline)
- args = argparser.parse_args(lexed_arglist)
- return func(instance, args)
+ try:
+ args = argparser.parse_args(lexed_arglist)
+ except SystemExit:
+ return
+ else:
+ return func(instance, args)
# argparser defaults the program name to sys.argv[0]
# we want it to be the name of our command
@@ -608,7 +616,7 @@ class Cmd(cmd.Cmd):
if _which(editor):
break
feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing)
- locals_in_py = True
+ locals_in_py = False
quiet = False # Do not suppress nonessential output
timing = False # Prints elapsed time for each command
@@ -670,6 +678,7 @@ class Cmd(cmd.Cmd):
self.initial_stdout = sys.stdout
self.history = History()
self.pystate = {}
+ self.pyscript_name = 'app'
self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')]
self.statement_parser = StatementParser(
allow_redirection=self.allow_redirection,
@@ -1993,6 +2002,8 @@ class Cmd(cmd.Cmd):
if self.allow_redirection:
self._redirect_output(statement)
timestart = datetime.datetime.now()
+ if self._in_py:
+ self._last_result = None
statement = self.precmd(statement)
stop = self.onecmd(statement)
stop = self.postcmd(stop, statement)
@@ -2797,16 +2808,16 @@ Usage: Usage: unalias [-a] name [name ...]
py <command>: Executes a Python command.
py: Enters interactive Python mode.
End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
- Non-python commands can be issued with ``cmd("your command")``.
+ Non-python commands can be issued with ``pyscript_name("your command")``.
Run python code from external script files with ``run("script.py")``
"""
+ from .pyscript_bridge import PyscriptBridge
if self._in_py:
self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False)
return
self._in_py = True
try:
- self.pystate['self'] = self
arg = arg.strip()
# Support the run command even if called prior to invoking an interactive interpreter
@@ -2829,10 +2840,14 @@ Usage: Usage: unalias [-a] name [name ...]
"""
return self.onecmd_plus_hooks(cmd_plus_args + '\n')
+ bridge = PyscriptBridge(self)
self.pystate['run'] = run
- self.pystate['cmd'] = onecmd_plus_hooks
+ self.pystate[self.pyscript_name] = bridge
+
+ if self.locals_in_py:
+ self.pystate['self'] = self
- localvars = (self.locals_in_py and self.pystate) or {}
+ localvars = self.pystate
interp = InteractiveConsole(locals=localvars)
interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())')
@@ -2853,9 +2868,10 @@ Usage: Usage: unalias [-a] name [name ...]
keepstate = Statekeeper(sys, ('stdin', 'stdout'))
sys.stdout = self.stdout
sys.stdin = self.stdin
+ docstr = self.do_py.__doc__.replace('pyscript_name', self.pyscript_name)
interp.interact(banner="Python %s on %s\n%s\n(%s)\n%s" %
(sys.version, sys.platform, cprt, self.__class__.__name__,
- self.do_py.__doc__))
+ docstr))
except EmbeddedConsoleExit:
pass
if keepstate is not None:
diff --git a/cmd2/pyscript_bridge.py b/cmd2/pyscript_bridge.py
new file mode 100644
index 00000000..055ae4ae
--- /dev/null
+++ b/cmd2/pyscript_bridge.py
@@ -0,0 +1,273 @@
+# coding=utf-8
+"""
+Bridges calls made inside of pyscript with the Cmd2 host app while maintaining a reasonable
+degree of isolation between the two
+
+Copyright 2018 Eric Lin <anselor@gmail.com>
+Released under MIT license, see LICENSE file
+"""
+
+import argparse
+from collections import namedtuple
+import functools
+import sys
+from typing import List, Tuple
+
+# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout
+if sys.version_info < (3, 5):
+ from contextlib2 import redirect_stdout, redirect_stderr
+else:
+ from contextlib import redirect_stdout, redirect_stderr
+
+from .argparse_completer import _RangeAction
+from .utils import namedtuple_with_defaults
+
+
+class CommandResult(namedtuple_with_defaults('CmdResult', ['stdout', 'stderr', 'data'])):
+ """Encapsulates the results from a command.
+
+ Named tuple attributes
+ ----------------------
+ stdout: str - Output captured from stdout while this command is executing
+ stderr: str - Output captured from stderr while this command is executing. None if no error captured
+ data - Data returned by the command.
+
+ NOTE: Named tuples are immutable. So the contents are there for access, not for modification.
+ """
+ def __bool__(self):
+ """If stderr is None and data is not None the command is considered a success"""
+ return not self.stderr and self.data is not None
+
+
+class CopyStream(object):
+ """Copies all data written to a stream"""
+ def __init__(self, inner_stream):
+ self.buffer = ''
+ self.inner_stream = inner_stream
+
+ def write(self, s):
+ self.buffer += s
+ self.inner_stream.write(s)
+
+ def read(self):
+ raise NotImplementedError
+
+ def clear(self):
+ self.buffer = ''
+
+
+def _exec_cmd(cmd2_app, func):
+ """Helper to encapsulate executing a command and capturing the results"""
+ copy_stdout = CopyStream(sys.stdout)
+ copy_stderr = CopyStream(sys.stderr)
+
+ cmd2_app._last_result = None
+
+ with redirect_stdout(copy_stdout):
+ with redirect_stderr(copy_stderr):
+ func()
+
+ # if stderr is empty, set it to None
+ stderr = copy_stderr if copy_stderr.buffer else None
+
+ result = CommandResult(stdout=copy_stdout.buffer, stderr=stderr, data=cmd2_app._last_result)
+ return result
+
+
+class ArgparseFunctor:
+ """
+ Encapsulates translating python object traversal
+ """
+ def __init__(self, cmd2_app, item, parser):
+ self._cmd2_app = cmd2_app
+ self._item = item
+ self._parser = parser
+
+ # Dictionary mapping command argument name to value
+ self._args = {}
+ # argparse object for the current command layer
+ self.__current_subcommand_parser = parser
+
+ def __getattr__(self, item):
+ """Search for a subcommand matching this item and update internal state to track the traversal"""
+ # look for sub-command under the current command/sub-command layer
+ for action in self.__current_subcommand_parser._actions:
+ if not action.option_strings and isinstance(action, argparse._SubParsersAction):
+ if item in action.choices:
+ # item matches the a sub-command, save our position in argparse,
+ # save the sub-command, return self to allow next level of traversal
+ self.__current_subcommand_parser = action.choices[item]
+ self._args[action.dest] = item
+ return self
+
+ raise AttributeError(item)
+ # return super().__getattr__(item)
+
+ def __call__(self, *args, **kwargs):
+ """
+ Process the arguments at this layer of the argparse command tree. If there are more sub-commands,
+ return self to accept the next sub-command name. If there are no more sub-commands, execute the
+ sub-command with the given parameters.
+ """
+ next_pos_index = 0
+
+ has_subcommand = False
+ consumed_kw = []
+
+ # Iterate through the current sub-command's arguments in order
+ for action in self.__current_subcommand_parser._actions:
+ # is this a flag option?
+ if action.option_strings:
+ # this is a flag argument, search for the argument by name in the parameters
+ if action.dest in kwargs:
+ self._args[action.dest] = kwargs[action.dest]
+ consumed_kw.append(action.dest)
+ else:
+ # This is a positional argument, search the positional arguments passed in.
+ if not isinstance(action, argparse._SubParsersAction):
+ if action.dest in kwargs:
+ # if this positional argument happens to be passed in as a keyword argument
+ # go ahead and consume the matching keyword argument
+ self._args[action.dest] = kwargs[action.dest]
+ elif next_pos_index < len(args):
+ # Make sure we actually have positional arguments to consume
+ pos_remain = len(args) - next_pos_index
+
+ # Check if this argument consumes a range of values
+ if isinstance(action, _RangeAction) and action.nargs_min is not None \
+ and action.nargs_max is not None:
+ # this is a cmd2 ranged action.
+
+ if pos_remain >= action.nargs_min:
+ # Do we meet the minimum count?
+ if pos_remain > action.nargs_max:
+ # Do we exceed the maximum count?
+ self._args[action.dest] = args[next_pos_index:next_pos_index + action.nargs_max]
+ next_pos_index += action.nargs_max
+ else:
+ self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain]
+ next_pos_index += pos_remain
+ else:
+ raise ValueError('Expected at least {} values for {}'.format(action.nargs_min,
+ action.dest))
+ elif action.nargs is not None:
+ if action.nargs == '+':
+ if pos_remain > 0:
+ self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain]
+ next_pos_index += pos_remain
+ else:
+ raise ValueError('Expected at least 1 value for {}'.format(action.dest))
+ elif action.nargs == '*':
+ self._args[action.dest] = args[next_pos_index:next_pos_index + pos_remain]
+ next_pos_index += pos_remain
+ elif action.nargs == '?':
+ self._args[action.dest] = args[next_pos_index]
+ next_pos_index += 1
+ else:
+ self._args[action.dest] = args[next_pos_index]
+ next_pos_index += 1
+ else:
+ has_subcommand = True
+
+ # Check if there are any extra arguments we don't know how to handle
+ for kw in kwargs:
+ if kw not in self._args: # consumed_kw:
+ raise TypeError('{}() got an unexpected keyword argument \'{}\''.format(
+ self.__current_subcommand_parser.prog, kw))
+
+ if has_subcommand:
+ return self
+ else:
+ return self._run()
+
+ def _run(self):
+ # look up command function
+ func = getattr(self._cmd2_app, 'do_' + self._item)
+
+ # reconstruct the cmd2 command from the python call
+ cmd_str = ['']
+
+ def process_flag(action, value):
+ if isinstance(action, argparse._CountAction):
+ if isinstance(value, int):
+ for c in range(value):
+ cmd_str[0] += '{} '.format(action.option_strings[0])
+ return
+ else:
+ raise TypeError('Expected int for ' + action.dest)
+ if isinstance(action, argparse._StoreConstAction) or isinstance(action, argparse._AppendConstAction):
+ if value:
+ # Nothing else to append to the command string, just the flag is enough.
+ cmd_str[0] += '{} '.format(action.option_strings[0])
+ return
+ else:
+ # value is not True so we default to false, which means don't include the flag
+ return
+
+ # was the argument a flag?
+ if action.option_strings:
+ cmd_str[0] += '{} '.format(action.option_strings[0])
+
+ if isinstance(value, List) or isinstance(value, Tuple):
+ for item in value:
+ item = str(item).strip()
+ if ' ' in item:
+ item = '"{}"'.format(item)
+ cmd_str[0] += '{} '.format(item)
+ else:
+ value = str(value).strip()
+ if ' ' in value:
+ value = '"{}"'.format(value)
+ cmd_str[0] += '{} '.format(value)
+
+ def traverse_parser(parser):
+ for action in parser._actions:
+ # was something provided for the argument
+ if action.dest in self._args:
+ if isinstance(action, argparse._SubParsersAction):
+ cmd_str[0] += '{} '.format(self._args[action.dest])
+ traverse_parser(action.choices[self._args[action.dest]])
+ elif isinstance(action, argparse._AppendAction):
+ if isinstance(self._args[action.dest], List) or isinstance(self._args[action.dest], Tuple):
+ for values in self._args[action.dest]:
+ process_flag(action, values)
+ else:
+ process_flag(action, self._args[action.dest])
+ else:
+ process_flag(action, self._args[action.dest])
+
+ traverse_parser(self._parser)
+
+ # print('Command: {}'.format(cmd_str[0]))
+
+ return _exec_cmd(self._cmd2_app, functools.partial(func, cmd_str[0]))
+
+class PyscriptBridge(object):
+ """Preserves the legacy 'cmd' interface for pyscript while also providing a new python API wrapper for
+ application commands."""
+ def __init__(self, cmd2_app):
+ self._cmd2_app = cmd2_app
+ self._last_result = None
+
+ def __getattr__(self, item: str):
+ """Check if the attribute is a command. If so, return a callable."""
+ commands = self._cmd2_app.get_all_commands()
+ if item in commands:
+ func = getattr(self._cmd2_app, 'do_' + item)
+
+ try:
+ # See if the command uses argparse
+ parser = getattr(func, 'argparser')
+ except AttributeError:
+ # Command doesn't, we will accept parameters in the form of a command string
+ def wrap_func(args=''):
+ return _exec_cmd(self._cmd2_app, functools.partial(func, args))
+ return wrap_func
+ else:
+ # Command does use argparse, return an object that can traverse the argparse subcommands and arguments
+ return ArgparseFunctor(self._cmd2_app, item, parser)
+
+ raise AttributeError(item)
+
+ def __call__(self, args):
+ return _exec_cmd(self._cmd2_app, functools.partial(self._cmd2_app.onecmd_plus_hooks, args + '\n'))
diff --git a/cmd2/utils.py b/cmd2/utils.py
index 6abab94c..dbe39213 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -2,6 +2,7 @@
# coding=utf-8
"""Shared utility functions"""
+import collections
from . import constants
def strip_ansi(text: str) -> str:
@@ -12,6 +13,7 @@ def strip_ansi(text: str) -> str:
"""
return constants.ANSI_ESCAPE_RE.sub('', text)
+
def strip_quotes(arg: str) -> str:
""" Strip outer quotes from a string.
@@ -23,3 +25,33 @@ def strip_quotes(arg: str) -> str:
if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in constants.QUOTES:
arg = arg[1:-1]
return arg
+
+
+def namedtuple_with_defaults(typename, field_names, default_values=()):
+ """
+ Convenience function for defining a namedtuple with default values
+
+ From: https://stackoverflow.com/questions/11351032/namedtuple-and-default-values-for-optional-keyword-arguments
+
+ Examples:
+ >>> Node = namedtuple_with_defaults('Node', 'val left right')
+ >>> Node()
+ Node(val=None, left=None, right=None)
+ >>> Node = namedtuple_with_defaults('Node', 'val left right', [1, 2, 3])
+ >>> Node()
+ Node(val=1, left=2, right=3)
+ >>> Node = namedtuple_with_defaults('Node', 'val left right', {'right':7})
+ >>> Node()
+ Node(val=None, left=None, right=7)
+ >>> Node(4)
+ Node(val=4, left=None, right=7)
+ """
+ T = collections.namedtuple(typename, field_names)
+ T.__new__.__defaults__ = (None,) * len(T._fields)
+ if isinstance(default_values, collections.Mapping):
+ prototype = T(**default_values)
+ else:
+ prototype = T(*default_values)
+ T.__new__.__defaults__ = tuple(prototype)
+ return T
+