summaryrefslogtreecommitdiff
path: root/cmd2/cmd2.py
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2/cmd2.py')
-rwxr-xr-xcmd2/cmd2.py4069
1 files changed, 4069 insertions, 0 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
new file mode 100755
index 00000000..54eff811
--- /dev/null
+++ b/cmd2/cmd2.py
@@ -0,0 +1,4069 @@
+#!/usr/bin/env python
+# coding=utf-8
+"""Variant on standard library's cmd with extra features.
+
+To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you
+were using the standard library's cmd, while enjoying the extra features.
+
+Searchable command history (commands: "history")
+Load commands from file, save to file, edit commands in file
+Multi-line commands
+Special-character shortcut commands (beyond cmd's "@" and "!")
+Settable environment parameters
+Parsing commands with `argparse` argument parsers (flags)
+Redirection to file with >, >>; input from file with <
+Easy transcript-based testing of applications (see examples/example.py)
+Bash-style ``select`` available
+
+Note that redirection with > and | will only work if `self.poutput()`
+is used in place of `print`.
+
+- Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com
+
+Git repository on GitHub at https://github.com/python-cmd2/cmd2
+"""
+import argparse
+import atexit
+import cmd
+import codecs
+import collections
+import copy
+import datetime
+import functools
+import glob
+import io
+from io import StringIO
+import os
+import platform
+import re
+import shlex
+import signal
+import subprocess
+import sys
+import tempfile
+import traceback
+import unittest
+from code import InteractiveConsole
+
+try:
+ from enum34 import Enum
+except ImportError:
+ from enum import Enum
+
+import pyparsing
+import pyperclip
+
+# Newer versions of pyperclip are released as a single file, but older versions had a more complicated structure
+try:
+ from pyperclip.exceptions import PyperclipException
+except ImportError:
+ # noinspection PyUnresolvedReferences
+ from pyperclip import PyperclipException
+
+# Collection is a container that is sizable and iterable
+# It was introduced in Python 3.6. We will try to import it, otherwise use our implementation
+try:
+ from collections.abc import Collection, Iterable
+except ImportError:
+ from collections.abc import Sized, Iterable, Container
+
+ # noinspection PyAbstractClass
+ class Collection(Sized, Iterable, Container):
+
+ __slots__ = ()
+
+ # noinspection PyPep8Naming
+ @classmethod
+ def __subclasshook__(cls, C):
+ if cls is Collection:
+ if any("__len__" in B.__dict__ for B in C.__mro__) and \
+ any("__iter__" in B.__dict__ for B in C.__mro__) and \
+ any("__contains__" in B.__dict__ for B in C.__mro__):
+ return True
+ return NotImplemented
+
+# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout
+if sys.version_info < (3, 5):
+ from contextlib2 import redirect_stdout, redirect_stderr
+else:
+ from contextlib import redirect_stdout, redirect_stderr
+
+# Detect whether IPython is installed to determine if the built-in "ipy" command should be included
+ipython_available = True
+try:
+ # noinspection PyUnresolvedReferences,PyPackageRequirements
+ from IPython import embed
+except ImportError:
+ ipython_available = False
+
+# Prefer statically linked gnureadline if available (for macOS compatibility due to issues with libedit)
+try:
+ import gnureadline as readline
+except ImportError:
+ # Try to import readline, but allow failure for convenience in Windows unit testing
+ # Note: If this actually fails, you should install readline on Linux or Mac or pyreadline on Windows
+ try:
+ # noinspection PyUnresolvedReferences
+ import readline
+ except ImportError:
+ pass
+
+# Check what implementation of readline we are using
+class RlType(Enum):
+ GNU = 1
+ PYREADLINE = 2
+ NONE = 3
+
+rl_type = RlType.NONE
+
+if 'pyreadline' in sys.modules:
+ rl_type = RlType.PYREADLINE
+
+ # Save the original pyreadline display completion function since we need to override it and restore it
+ # noinspection PyProtectedMember
+ orig_pyreadline_display = readline.rl.mode._display_completions
+
+elif 'gnureadline' in sys.modules or 'readline' in sys.modules:
+ rl_type = RlType.GNU
+
+ # We need wcswidth to calculate display width of tab completions
+ from wcwidth import wcswidth
+
+ # Load the readline lib so we can make changes to it
+ import ctypes
+ readline_lib = ctypes.CDLL(readline.__file__)
+
+ # Save address that rl_basic_quote_characters is pointing to since we need to override and restore it
+ rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters")
+ orig_rl_basic_quote_characters_addr = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value
+
+__version__ = '0.9.0'
+
+# Pyparsing enablePackrat() can greatly speed up parsing, but problems have been seen in Python 3 in the past
+pyparsing.ParserElement.enablePackrat()
+
+# Override the default whitespace chars in Pyparsing so that newlines are not treated as whitespace
+pyparsing.ParserElement.setDefaultWhitespaceChars(' \t')
+
+
+# The next 2 variables and associated setter functions effect how arguments are parsed for decorated commands
+# which use one of the decorators: @with_argument_list, @with_argparser, or @with_argparser_and_unknown_args
+# The defaults are sane and maximize ease of use for new applications based on cmd2.
+
+# Use POSIX or Non-POSIX (Windows) rules for splitting a command-line string into a list of arguments via shlex.split()
+POSIX_SHLEX = False
+
+# Strip outer quotes for convenience if POSIX_SHLEX = False
+STRIP_QUOTES_FOR_NON_POSIX = True
+
+# Used for tab completion and word breaks. Do not change.
+QUOTES = ['"', "'"]
+REDIRECTION_CHARS = ['|', '<', '>']
+
+# optional attribute, when tagged on a function, allows cmd2 to categorize commands
+HELP_CATEGORY = 'help_category'
+HELP_SUMMARY = 'help_summary'
+
+
+def categorize(func, category):
+ """Categorize a function.
+
+ The help command output will group this function under the specified category heading
+
+ :param func: Union[Callable, Iterable] - function to categorize
+ :param category: str - category to put it in
+ """
+ if isinstance(func, Iterable):
+ for item in func:
+ setattr(item, HELP_CATEGORY, category)
+ else:
+ setattr(func, HELP_CATEGORY, category)
+
+
+def set_posix_shlex(val):
+ """ Allows user of cmd2 to choose between POSIX and non-POSIX splitting of args for decorated commands.
+
+ :param val: bool - True => POSIX, False => Non-POSIX
+ """
+ global POSIX_SHLEX
+ POSIX_SHLEX = val
+
+
+def set_strip_quotes(val):
+ """ Allows user of cmd2 to choose whether to automatically strip outer-quotes when POSIX_SHLEX is False.
+
+ :param val: bool - True => strip quotes on args for decorated commands if POSIX_SHLEX is False.
+ """
+ global STRIP_QUOTES_FOR_NON_POSIX
+ STRIP_QUOTES_FOR_NON_POSIX = val
+
+
+def _which(editor):
+ try:
+ editor_path = subprocess.check_output(['which', editor], stderr=subprocess.STDOUT).strip()
+ editor_path = editor_path.decode()
+ except subprocess.CalledProcessError:
+ editor_path = None
+ return editor_path
+
+
+def strip_quotes(arg):
+ """ Strip outer quotes from a string.
+
+ Applies to both single and double quotes.
+
+ :param arg: str - string to strip outer quotes from
+ :return str - same string with potentially outer quotes stripped
+ """
+ quote_chars = '"' + "'"
+
+ if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in quote_chars:
+ arg = arg[1:-1]
+ return arg
+
+
+def parse_quoted_string(cmdline):
+ """Parse a quoted string into a list of arguments."""
+ if isinstance(cmdline, list):
+ # arguments are already a list, return the list we were passed
+ lexed_arglist = cmdline
+ else:
+ # Use shlex to split the command line into a list of arguments based on shell rules
+ lexed_arglist = shlex.split(cmdline, posix=POSIX_SHLEX)
+ # If not using POSIX shlex, make sure to strip off outer quotes for convenience
+ if not POSIX_SHLEX and STRIP_QUOTES_FOR_NON_POSIX:
+ temp_arglist = []
+ for arg in lexed_arglist:
+ temp_arglist.append(strip_quotes(arg))
+ lexed_arglist = temp_arglist
+ return lexed_arglist
+
+
+def with_category(category):
+ """A decorator to apply a category to a command function"""
+ def cat_decorator(func):
+ categorize(func, category)
+ return func
+ return cat_decorator
+
+
+def with_argument_list(func):
+ """A decorator to alter the arguments passed to a do_* cmd2
+ method. Default passes a string of whatever the user typed.
+ With this decorator, the decorated method will receive a list
+ of arguments parsed from user input using shlex.split()."""
+ @functools.wraps(func)
+ def cmd_wrapper(self, cmdline):
+ lexed_arglist = parse_quoted_string(cmdline)
+ return func(self, lexed_arglist)
+
+ cmd_wrapper.__doc__ = func.__doc__
+ return cmd_wrapper
+
+
+def with_argparser_and_unknown_args(argparser):
+ """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given
+ instance of argparse.ArgumentParser, but also returning unknown args as a list.
+
+ :param argparser: argparse.ArgumentParser - given instance of ArgumentParser
+ :return: function that gets passed parsed args and a list of unknown args
+ """
+
+ # noinspection PyProtectedMember
+ def arg_decorator(func):
+ @functools.wraps(func)
+ def cmd_wrapper(instance, cmdline):
+ lexed_arglist = parse_quoted_string(cmdline)
+ args, unknown = argparser.parse_known_args(lexed_arglist)
+ return func(instance, args, unknown)
+
+ # argparser defaults the program name to sys.argv[0]
+ # we want it to be the name of our command
+ argparser.prog = func.__name__[3:]
+
+ # If the description has not been set, then use the method docstring if one exists
+ if argparser.description is None and func.__doc__:
+ argparser.description = func.__doc__
+
+ if func.__doc__:
+ setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__)
+
+ cmd_wrapper.__doc__ = argparser.format_help()
+
+ # Mark this function as having an argparse ArgumentParser (used by do_help)
+ cmd_wrapper.__dict__['has_parser'] = True
+
+ # If there are subcommands, store their names in a list to support tab-completion of subcommand names
+ if argparser._subparsers is not None:
+ subcommand_names = argparser._subparsers._group_actions[0]._name_parser_map.keys()
+ cmd_wrapper.__dict__['subcommand_names'] = subcommand_names
+
+ return cmd_wrapper
+
+ return arg_decorator
+
+
+def with_argparser(argparser):
+ """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments
+ with the given instance of argparse.ArgumentParser.
+
+ :param argparser: argparse.ArgumentParser - given instance of ArgumentParser
+ :return: function that gets passed parsed args
+ """
+
+ # noinspection PyProtectedMember
+ def arg_decorator(func):
+ @functools.wraps(func)
+ def cmd_wrapper(instance, cmdline):
+ lexed_arglist = parse_quoted_string(cmdline)
+ args = argparser.parse_args(lexed_arglist)
+ return func(instance, args)
+
+ # argparser defaults the program name to sys.argv[0]
+ # we want it to be the name of our command
+ argparser.prog = func.__name__[3:]
+
+ # If the description has not been set, then use the method docstring if one exists
+ if argparser.description is None and func.__doc__:
+ argparser.description = func.__doc__
+
+ if func.__doc__:
+ setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__)
+
+ cmd_wrapper.__doc__ = argparser.format_help()
+
+ # Mark this function as having an argparse ArgumentParser (used by do_help)
+ cmd_wrapper.__dict__['has_parser'] = True
+
+ # If there are subcommands, store their names in a list to support tab-completion of subcommand names
+ if argparser._subparsers is not None:
+
+ # Key is subcommand name and value is completer function
+ subcommands = collections.OrderedDict()
+
+ # Get all subcommands and check if they have completer functions
+ for name, parser in argparser._subparsers._group_actions[0]._name_parser_map.items():
+ if 'completer' in parser._defaults:
+ completer = parser._defaults['completer']
+ else:
+ completer = None
+ subcommands[name] = completer
+
+ cmd_wrapper.__dict__['subcommands'] = subcommands
+
+ return cmd_wrapper
+
+ return arg_decorator
+
+
+# Can we access the clipboard? Should always be true on Windows and Mac, but only sometimes on Linux
+# noinspection PyUnresolvedReferences
+try:
+ # Get the version of the pyperclip module as a float
+ pyperclip_ver = float('.'.join(pyperclip.__version__.split('.')[:2]))
+
+ # The extraneous output bug in pyperclip on Linux using xclip was fixed in more recent versions of pyperclip
+ if sys.platform.startswith('linux') and pyperclip_ver < 1.6:
+ # Avoid extraneous output to stderr from xclip when clipboard is empty at cost of overwriting clipboard contents
+ pyperclip.copy('')
+ else:
+ # Try getting the contents of the clipboard
+ _ = pyperclip.paste()
+except PyperclipException:
+ can_clip = False
+else:
+ can_clip = True
+
+
+def get_paste_buffer():
+ """Get the contents of the clipboard / paste buffer.
+
+ :return: str - contents of the clipboard
+ """
+ pb_str = pyperclip.paste()
+ return pb_str
+
+
+def write_to_paste_buffer(txt):
+ """Copy text to the clipboard / paste buffer.
+
+ :param txt: str - text to copy to the clipboard
+ """
+ pyperclip.copy(txt)
+
+
+class ParsedString(str):
+ """Subclass of str which also stores a pyparsing.ParseResults object containing structured parse results."""
+ # pyarsing.ParseResults - structured parse results, to provide multiple means of access to the parsed data
+ parsed = None
+
+ # Function which did the parsing
+ parser = None
+
+ def full_parsed_statement(self):
+ """Used to reconstruct the full parsed statement when a command isn't recognized."""
+ new = ParsedString('%s %s' % (self.parsed.command, self.parsed.args))
+ new.parsed = self.parsed
+ new.parser = self.parser
+ return new
+
+
+def replace_with_file_contents(fname):
+ """Action to perform when successfully matching parse element definition for inputFrom parser.
+
+ :param fname: str - filename
+ :return: str - contents of file "fname"
+ """
+ try:
+ # Any outer quotes are not part of the filename
+ unquoted_file = strip_quotes(fname[0])
+ with open(os.path.expanduser(unquoted_file)) as source_file:
+ result = source_file.read()
+ except IOError:
+ result = '< %s' % fname[0] # wasn't a file after all
+
+ # TODO: IF pyparsing input parser logic gets fixed to support empty file, add support to get from paste buffer
+ return result
+
+
+class EmbeddedConsoleExit(SystemExit):
+ """Custom exception class for use with the py command."""
+ pass
+
+
+class EmptyStatement(Exception):
+ """Custom exception class for handling behavior when the user just presses <Enter>."""
+ pass
+
+
+# Regular expression to match ANSI escape codes
+ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m')
+
+
+def strip_ansi(text):
+ """Strip ANSI escape codes from a string.
+
+ :param text: str - a string which may contain ANSI escape codes
+ :return: str - the same string with any ANSI escape codes removed
+ """
+ return ANSI_ESCAPE_RE.sub('', text)
+
+
+def _pop_readline_history(clear_history=True):
+ """Returns a copy of readline's history and optionally clears it (default)"""
+ # noinspection PyArgumentList
+ history = [
+ readline.get_history_item(i)
+ for i in range(1, 1 + readline.get_current_history_length())
+ ]
+ if clear_history:
+ readline.clear_history()
+ return history
+
+
+def _push_readline_history(history, clear_history=True):
+ """Restores readline's history and optionally clears it first (default)"""
+ if clear_history:
+ readline.clear_history()
+ for line in history:
+ readline.add_history(line)
+
+
+def _complete_from_cmd(cmd_obj, text, line, begidx, endidx):
+ """Complete as though the user was typing inside cmd's cmdloop()"""
+ from itertools import takewhile
+ command_subcommand_params = line.split(None, 3)
+
+ if len(command_subcommand_params) < (3 if text else 2):
+ n = len(command_subcommand_params[0])
+ n += sum(1 for _ in takewhile(str.isspace, line[n:]))
+ return cmd_obj.completenames(text, line[n:], begidx - n, endidx - n)
+
+ command, subcommand = command_subcommand_params[:2]
+ n = len(command) + sum(1 for _ in takewhile(str.isspace, line))
+ cfun = getattr(cmd_obj, 'complete_' + subcommand, cmd_obj.complete)
+ return cfun(text, line[n:], begidx - n, endidx - n)
+
+
+class AddSubmenu(object):
+ """Conveniently add a submenu (Cmd-like class) to a Cmd
+
+ e.g. given "class SubMenu(Cmd): ..." then
+
+ @AddSubmenu(SubMenu(), 'sub')
+ class MyCmd(cmd.Cmd):
+ ....
+
+ will have the following effects:
+ 1. 'sub' will interactively enter the cmdloop of a SubMenu instance
+ 2. 'sub cmd args' will call do_cmd(args) in a SubMenu instance
+ 3. 'sub ... [TAB]' will have the same behavior as [TAB] in a SubMenu cmdloop
+ i.e., autocompletion works the way you think it should
+ 4. 'help sub [cmd]' will print SubMenu's help (calls its do_help())
+ """
+
+ class _Nonexistent(object):
+ """
+ Used to mark missing attributes.
+ Disable __dict__ creation since this class does nothing
+ """
+ __slots__ = () #
+
+ def __init__(self,
+ submenu,
+ command,
+ aliases=(),
+ reformat_prompt="{super_prompt}>> {sub_prompt}",
+ shared_attributes=None,
+ require_predefined_shares=True,
+ create_subclass=False,
+ preserve_shares=False,
+ persistent_history_file=None
+ ):
+ """Set up the class decorator
+
+ submenu (Cmd): Instance of something cmd.Cmd-like
+
+ command (str): The command the user types to access the SubMenu instance
+
+ aliases (iterable): More commands that will behave like "command"
+
+ reformat_prompt (str): Format str or None to disable
+ if it's a string, it should contain one or more of:
+ {super_prompt}: The current cmd's prompt
+ {command}: The command in the current cmd with which it was called
+ {sub_prompt}: The subordinate cmd's original prompt
+ the default is "{super_prompt}{command} {sub_prompt}"
+
+ shared_attributes (dict): dict of the form {'subordinate_attr': 'parent_attr'}
+ the attributes are copied to the submenu at the last moment; the submenu's
+ attributes are backed up before this and restored afterward
+
+ require_predefined_shares: The shared attributes above must be independently
+ defined in the subordinate Cmd (default: True)
+
+ create_subclass: put the modifications in a subclass rather than modifying
+ the existing class (default: False)
+ """
+ self.submenu = submenu
+ self.command = command
+ self.aliases = aliases
+ if persistent_history_file:
+ self.persistent_history_file = os.path.expanduser(persistent_history_file)
+ else:
+ self.persistent_history_file = None
+
+ if reformat_prompt is not None and not isinstance(reformat_prompt, str):
+ raise ValueError("reformat_prompt should be either a format string or None")
+ self.reformat_prompt = reformat_prompt
+
+ self.shared_attributes = {} if shared_attributes is None else shared_attributes
+ if require_predefined_shares:
+ for attr in self.shared_attributes.keys():
+ if not hasattr(submenu, attr):
+ raise AttributeError("The shared attribute '{attr}' is not defined in {cmd}. Either define {attr} "
+ "in {cmd} or set require_predefined_shares=False."
+ .format(cmd=submenu.__class__.__name__, attr=attr))
+
+ self.create_subclass = create_subclass
+ self.preserve_shares = preserve_shares
+
+ def _get_original_attributes(self):
+ return {
+ attr: getattr(self.submenu, attr, AddSubmenu._Nonexistent)
+ for attr in self.shared_attributes.keys()
+ }
+
+ def _copy_in_shared_attrs(self, parent_cmd):
+ for sub_attr, par_attr in self.shared_attributes.items():
+ setattr(self.submenu, sub_attr, getattr(parent_cmd, par_attr))
+
+ def _copy_out_shared_attrs(self, parent_cmd, original_attributes):
+ if self.preserve_shares:
+ for sub_attr, par_attr in self.shared_attributes.items():
+ setattr(parent_cmd, par_attr, getattr(self.submenu, sub_attr))
+ else:
+ for attr, value in original_attributes.items():
+ if attr is not AddSubmenu._Nonexistent:
+ setattr(self.submenu, attr, value)
+ else:
+ delattr(self.submenu, attr)
+
+ def __call__(self, cmd_obj):
+ """Creates a subclass of Cmd wherein the given submenu can be accessed via the given command"""
+ def enter_submenu(parent_cmd, line):
+ """
+ This function will be bound to do_<submenu> and will change the scope of the CLI to that of the
+ submenu.
+ """
+ submenu = self.submenu
+ original_attributes = self._get_original_attributes()
+ history = _pop_readline_history()
+
+ if self.persistent_history_file:
+ try:
+ readline.read_history_file(self.persistent_history_file)
+ except FileNotFoundError:
+ pass
+
+ try:
+ # copy over any shared attributes
+ self._copy_in_shared_attrs(parent_cmd)
+
+ if line.parsed.args:
+ # Remove the menu argument and execute the command in the submenu
+ line = submenu.parser_manager.parsed(line.parsed.args)
+ submenu.precmd(line)
+ ret = submenu.onecmd(line)
+ submenu.postcmd(ret, line)
+ else:
+ if self.reformat_prompt is not None:
+ prompt = submenu.prompt
+ submenu.prompt = self.reformat_prompt.format(
+ super_prompt=parent_cmd.prompt,
+ command=self.command,
+ sub_prompt=prompt,
+ )
+ submenu.cmdloop()
+ if self.reformat_prompt is not None:
+ # noinspection PyUnboundLocalVariable
+ self.submenu.prompt = prompt
+ finally:
+ # copy back original attributes
+ self._copy_out_shared_attrs(parent_cmd, original_attributes)
+
+ # write submenu history
+ if self.persistent_history_file:
+ readline.write_history_file(self.persistent_history_file)
+ # reset main app history before exit
+ _push_readline_history(history)
+
+ def complete_submenu(_self, text, line, begidx, endidx):
+ """
+ This function will be bound to complete_<submenu> and will perform the complete commands of the submenu.
+ """
+ submenu = self.submenu
+ original_attributes = self._get_original_attributes()
+ try:
+ # copy over any shared attributes
+ self._copy_in_shared_attrs(_self)
+ return _complete_from_cmd(submenu, text, line, begidx, endidx)
+ finally:
+ # copy back original attributes
+ self._copy_out_shared_attrs(_self, original_attributes)
+
+ original_do_help = cmd_obj.do_help
+ original_complete_help = cmd_obj.complete_help
+
+ def help_submenu(_self, line):
+ """
+ This function will be bound to help_<submenu> and will call the help commands of the submenu.
+ """
+ tokens = line.split(None, 1)
+ if tokens and (tokens[0] == self.command or tokens[0] in self.aliases):
+ self.submenu.do_help(tokens[1] if len(tokens) == 2 else '')
+ else:
+ original_do_help(_self, line)
+
+ def _complete_submenu_help(_self, text, line, begidx, endidx):
+ """autocomplete to match help_submenu()'s behavior"""
+ tokens = line.split(None, 1)
+ if len(tokens) == 2 and (
+ not (not tokens[1].startswith(self.command) and not any(
+ tokens[1].startswith(alias) for alias in self.aliases))
+ ):
+ return self.submenu.complete_help(
+ text,
+ tokens[1],
+ begidx - line.index(tokens[1]),
+ endidx - line.index(tokens[1]),
+ )
+ else:
+ return original_complete_help(_self, text, line, begidx, endidx)
+
+ if self.create_subclass:
+ class _Cmd(cmd_obj):
+ do_help = help_submenu
+ complete_help = _complete_submenu_help
+ else:
+ _Cmd = cmd_obj
+ _Cmd.do_help = help_submenu
+ _Cmd.complete_help = _complete_submenu_help
+
+ # Create bindings in the parent command to the submenus commands.
+ setattr(_Cmd, 'do_' + self.command, enter_submenu)
+ setattr(_Cmd, 'complete_' + self.command, complete_submenu)
+
+ # Create additional bindings for aliases
+ for _alias in self.aliases:
+ setattr(_Cmd, 'do_' + _alias, enter_submenu)
+ setattr(_Cmd, 'complete_' + _alias, complete_submenu)
+ return _Cmd
+
+
+class Cmd(cmd.Cmd):
+ """An easy but powerful framework for writing line-oriented command interpreters.
+
+ Extends the Python Standard Library’s cmd package by adding a lot of useful features
+ to the out of the box configuration.
+
+ Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes.
+ """
+ # Attributes used to configure the ParserManager (all are not dynamically settable at runtime)
+ blankLinesAllowed = False
+ commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment])
+ commentInProgress = pyparsing.Literal('/*') + pyparsing.SkipTo(pyparsing.stringEnd ^ '*/')
+ legalChars = u'!#$%.:?@_-' + pyparsing.alphanums + pyparsing.alphas8bit
+ multilineCommands = []
+ prefixParser = pyparsing.Empty()
+ redirector = '>' # for sending output to file
+ shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'}
+ aliases = dict()
+ terminators = [';'] # make sure your terminators are not in legalChars!
+
+ # Attributes which are NOT dynamically settable at runtime
+ allow_cli_args = True # Should arguments passed on the command-line be processed as commands?
+ allow_redirection = True # Should output redirection and pipes be allowed
+ default_to_shell = False # Attempt to run unrecognized commands as shell commands
+ quit_on_sigint = False # Quit the loop on interrupt instead of just resetting prompt
+ reserved_words = []
+
+ # Attributes which ARE dynamically settable at runtime
+ colors = (platform.system() != 'Windows')
+ continuation_prompt = '> '
+ debug = False
+ echo = False
+ editor = os.environ.get('EDITOR')
+ if not editor:
+ if sys.platform[:3] == 'win':
+ editor = 'notepad'
+ else:
+ # Favor command-line editors first so we don't leave the terminal to edit
+ for editor in ['vim', 'vi', 'emacs', 'nano', 'pico', 'gedit', 'kate', 'subl', 'geany', 'atom']:
+ if _which(editor):
+ break
+ feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing)
+ locals_in_py = True
+ quiet = False # Do not suppress nonessential output
+ timing = False # Prints elapsed time for each command
+
+ # To make an attribute settable with the "do_set" command, add it to this ...
+ # This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key
+ settable = {'colors': 'Colorized output (*nix only)',
+ 'continuation_prompt': 'On 2nd+ line of input',
+ 'debug': 'Show full error stack on error',
+ 'echo': 'Echo command issued into output',
+ 'editor': 'Program used by ``edit``',
+ 'feedback_to_output': 'Include nonessentials in `|`, `>` results',
+ 'locals_in_py': 'Allow access to your application in py via self',
+ 'prompt': 'The prompt issued to solicit input',
+ 'quiet': "Don't print nonessential feedback",
+ 'timing': 'Report execution times'}
+
+ def __init__(self, completekey='tab', stdin=None, stdout=None, persistent_history_file='',
+ persistent_history_length=1000, startup_script=None, use_ipython=False, transcript_files=None):
+ """An easy but powerful framework for writing line-oriented command interpreters, extends Python's cmd package.
+
+ :param completekey: str - (optional) readline name of a completion key, default to Tab
+ :param stdin: (optional) alternate input file object, if not specified, sys.stdin is used
+ :param stdout: (optional) alternate output file object, if not specified, sys.stdout is used
+ :param persistent_history_file: str - (optional) file path to load a persistent readline history from
+ :param persistent_history_length: int - (optional) max number of lines which will be written to the history file
+ :param startup_script: str - (optional) file path to a a script to load and execute at startup
+ :param use_ipython: (optional) should the "ipy" command be included for an embedded IPython shell
+ :param transcript_files: str - (optional) allows running transcript tests when allow_cli_args is False
+ """
+ # If use_ipython is False, make sure the do_ipy() method doesn't exit
+ if not use_ipython:
+ try:
+ del Cmd.do_ipy
+ except AttributeError:
+ pass
+
+ # If persistent readline history is enabled, then read history from file and register to write to file at exit
+ if persistent_history_file:
+ persistent_history_file = os.path.expanduser(persistent_history_file)
+ try:
+ readline.read_history_file(persistent_history_file)
+ # default history len is -1 (infinite), which may grow unruly
+ readline.set_history_length(persistent_history_length)
+ except FileNotFoundError:
+ pass
+ atexit.register(readline.write_history_file, persistent_history_file)
+
+ # Call super class constructor
+ super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
+
+ # Commands to exclude from the help menu and tab completion
+ self.hidden_commands = ['eof', 'eos', '_relative_load']
+
+ # Commands to exclude from the history command
+ self.exclude_from_history = '''history edit eof eos'''.split()
+
+ self._finalize_app_parameters()
+
+ self.initial_stdout = sys.stdout
+ self.history = History()
+ self.pystate = {}
+ self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')]
+ self.parser_manager = ParserManager(redirector=self.redirector, terminators=self.terminators,
+ multilineCommands=self.multilineCommands,
+ legalChars=self.legalChars, commentGrammars=self.commentGrammars,
+ commentInProgress=self.commentInProgress,
+ blankLinesAllowed=self.blankLinesAllowed, prefixParser=self.prefixParser,
+ preparse=self.preparse, postparse=self.postparse, aliases=self.aliases,
+ shortcuts=self.shortcuts)
+ self._transcript_files = transcript_files
+
+ # Used to enable the ability for a Python script to quit the application
+ self._should_quit = False
+
+ # True if running inside a Python script or interactive console, False otherwise
+ self._in_py = False
+
+ # Stores results from the last command run to enable usage of results in a Python script or interactive console
+ # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience.
+ self._last_result = None
+
+ # Used to save state during a redirection
+ self.kept_state = None
+ self.kept_sys = None
+
+ # Codes used for exit conditions
+ self._STOP_AND_EXIT = True # cmd convention
+
+ self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'},
+ 'cyan': {True: '\x1b[36m', False: '\x1b[39m'},
+ 'blue': {True: '\x1b[34m', False: '\x1b[39m'},
+ 'red': {True: '\x1b[31m', False: '\x1b[39m'},
+ 'magenta': {True: '\x1b[35m', False: '\x1b[39m'},
+ 'green': {True: '\x1b[32m', False: '\x1b[39m'},
+ 'underline': {True: '\x1b[4m', False: '\x1b[24m'},
+ 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}}
+
+ # Used load command to store the current script dir as a LIFO queue to support _relative_load command
+ self._script_dir = []
+
+ # Used when piping command output to a shell command
+ self.pipe_proc = None
+
+ # Used by complete() for readline tab completion
+ self.completion_matches = []
+
+ # Used to keep track of whether we are redirecting or piping output
+ self.redirecting = False
+
+ # If this string is non-empty, then this warning message will print if a broken pipe error occurs while printing
+ self.broken_pipe_warning = ''
+
+ # If a startup script is provided, then add it in the queue to load
+ if startup_script is not None:
+ startup_script = os.path.expanduser(startup_script)
+ if os.path.exists(startup_script) and os.path.getsize(startup_script) > 0:
+ self.cmdqueue.append('load {}'.format(startup_script))
+
+ ############################################################################################################
+ # The following variables are used by tab-completion functions. They are reset each time complete() is run
+ # using set_completion_defaults() and it is up to completer functions to set them before returning results.
+ ############################################################################################################
+
+ # If true and a single match is returned to complete(), then a space will be appended
+ # if the match appears at the end of the line
+ self.allow_appended_space = True
+
+ # If true and a single match is returned to complete(), then a closing quote
+ # will be added if there is an unmatched opening quote
+ self.allow_closing_quote = True
+
+ # Use this list if you are completing strings that contain a common delimiter and you only want to
+ # display the final portion of the matches as the tab-completion suggestions. The full matches
+ # still must be returned from your completer function. For an example, look at path_complete()
+ # which uses this to show only the basename of paths as the suggestions. delimiter_complete() also
+ # populates this list.
+ self.display_matches = []
+
+ # ----- Methods related to presenting output to the user -----
+
+ @property
+ def visible_prompt(self):
+ """Read-only property to get the visible prompt with any ANSI escape codes stripped.
+
+ Used by transcript testing to make it easier and more reliable when users are doing things like coloring the
+ prompt using ANSI color codes.
+
+ :return: str - prompt stripped of any ANSI escape codes
+ """
+ return strip_ansi(self.prompt)
+
+ def _finalize_app_parameters(self):
+ self.commentGrammars.ignore(pyparsing.quotedString).setParseAction(lambda x: '')
+ # noinspection PyUnresolvedReferences
+ self.shortcuts = sorted(self.shortcuts.items(), reverse=True)
+
+ # Make sure settable parameters are sorted alphabetically by key
+ self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0]))
+
+ def poutput(self, msg, end='\n'):
+ """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present.
+
+ Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and
+ that process terminates before the cmd2 command is finished executing.
+
+ :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK
+ :param end: str - string appended after the end of the message if not already present, default a newline
+ """
+ if msg is not None and msg != '':
+ try:
+ msg_str = '{}'.format(msg)
+ self.stdout.write(msg_str)
+ if not msg_str.endswith(end):
+ self.stdout.write(end)
+ except BrokenPipeError:
+ # This occurs if a command's output is being piped to another process and that process closes before the
+ # command is finished. If you would like your application to print a warning message, then set the
+ # broken_pipe_warning attribute to the message you want printed.
+ if self.broken_pipe_warning:
+ sys.stderr.write(self.broken_pipe_warning)
+
+ def perror(self, errmsg, exception_type=None, traceback_war=True):
+ """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists.
+
+ :param errmsg: str - error message to print out
+ :param exception_type: str - (optional) type of exception which precipitated this error message
+ :param traceback_war: bool - (optional) if True, print a message to let user know they can enable debug
+ :return:
+ """
+ if self.debug:
+ traceback.print_exc()
+
+ if exception_type is None:
+ err = self.colorize("ERROR: {}\n".format(errmsg), 'red')
+ sys.stderr.write(err)
+ else:
+ err = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(exception_type, errmsg)
+ sys.stderr.write(self.colorize(err, 'red'))
+
+ if traceback_war:
+ war = "To enable full traceback, run the following command: 'set debug true'\n"
+ sys.stderr.write(self.colorize(war, 'yellow'))
+
+ def pfeedback(self, msg):
+ """For printing nonessential feedback. Can be silenced with `quiet`.
+ Inclusion in redirected output is controlled by `feedback_to_output`."""
+ if not self.quiet:
+ if self.feedback_to_output:
+ self.poutput(msg)
+ else:
+ sys.stderr.write("{}\n".format(msg))
+
+ def ppaged(self, msg, end='\n'):
+ """Print output using a pager if it would go off screen and stdout isn't currently being redirected.
+
+ Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when
+ stdout or stdin are not a fully functional terminal.
+
+ :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK
+ :param end: str - string appended after the end of the message if not already present, default a newline
+ """
+ if msg is not None and msg != '':
+ try:
+ msg_str = '{}'.format(msg)
+ if not msg_str.endswith(end):
+ msg_str += end
+
+ # Attempt to detect if we are not running within a fully functional terminal.
+ # Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect.
+ functional_terminal = False
+
+ if self.stdin.isatty() and self.stdout.isatty():
+ if sys.platform.startswith('win') or os.environ.get('TERM') is not None:
+ functional_terminal = True
+
+ # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python)
+ # Also only attempt to use a pager if actually running in a real fully functional terminal
+ if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir:
+
+ if sys.platform.startswith('win'):
+ pager_cmd = 'more'
+ else:
+ # Here is the meaning of the various flags we are using with the less command:
+ # -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped
+ # -R causes ANSI "color" escape sequences to be output in raw form (i.e. colors are displayed)
+ # -X disables sending the termcap initialization and deinitialization strings to the terminal
+ # -F causes less to automatically exit if the entire file can be displayed on the first screen
+ pager_cmd = 'less -SRXF'
+ self.pipe_proc = subprocess.Popen(pager_cmd, shell=True, stdin=subprocess.PIPE)
+ try:
+ self.pipe_proc.stdin.write(msg_str.encode('utf-8', 'replace'))
+ self.pipe_proc.stdin.close()
+ except (IOError, KeyboardInterrupt):
+ pass
+
+ # Less doesn't respect ^C, but catches it for its own UI purposes (aborting search etc. inside less)
+ while True:
+ try:
+ self.pipe_proc.wait()
+ except KeyboardInterrupt:
+ pass
+ else:
+ break
+ self.pipe_proc = None
+ else:
+ self.stdout.write(msg_str)
+ except BrokenPipeError:
+ # This occurs if a command's output is being piped to another process and that process closes before the
+ # command is finished. If you would like your application to print a warning message, then set the
+ # broken_pipe_warning attribute to the message you want printed.
+ if self.broken_pipe_warning:
+ sys.stderr.write(self.broken_pipe_warning)
+
+ def colorize(self, val, color):
+ """Given a string (``val``), returns that string wrapped in UNIX-style
+ special characters that turn on (and then off) text color and style.
+ If the ``colors`` environment parameter is ``False``, or the application
+ is running on Windows, will return ``val`` unchanged.
+ ``color`` should be one of the supported strings (or styles):
+ red/blue/green/cyan/magenta, bold, underline"""
+ if self.colors and (self.stdout == self.initial_stdout):
+ return self._colorcodes[color][True] + val + self._colorcodes[color][False]
+ return val
+
+ def get_subcommands(self, command):
+ """
+ Returns a list of a command's subcommand names if they exist
+ :param command: the command we are querying
+ :return: A subcommand list or None
+ """
+
+ subcommand_names = None
+
+ # Check if is a valid command
+ funcname = self._func_named(command)
+
+ if funcname:
+ # Check to see if this function was decorated with an argparse ArgumentParser
+ func = getattr(self, funcname)
+ subcommands = func.__dict__.get('subcommands', None)
+ if subcommands is not None:
+ subcommand_names = subcommands.keys()
+
+ return subcommand_names
+
+ def get_subcommand_completer(self, command, subcommand):
+ """
+ Returns a subcommand's tab completion function if one exists
+ :param command: command which owns the subcommand
+ :param subcommand: the subcommand we are querying
+ :return: A completer or None
+ """
+
+ completer = None
+
+ # Check if is a valid command
+ funcname = self._func_named(command)
+
+ if funcname:
+ # Check to see if this function was decorated with an argparse ArgumentParser
+ func = getattr(self, funcname)
+ subcommands = func.__dict__.get('subcommands', None)
+ if subcommands is not None:
+ completer = subcommands[subcommand]
+
+ return completer
+
+ # ----- Methods related to tab completion -----
+
+ def set_completion_defaults(self):
+ """
+ Resets tab completion settings
+ Needs to be called each time readline runs tab completion
+ """
+ self.allow_appended_space = True
+ self.allow_closing_quote = True
+ self.display_matches = []
+
+ def tokens_for_completion(self, line, begidx, endidx):
+ """
+ Used by tab completion functions to get all tokens through the one being completed
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :return: A 2 item tuple where the items are
+ On Success
+ tokens: list of unquoted tokens
+ this is generally the list needed for tab completion functions
+ raw_tokens: list of tokens with any quotes preserved
+ this can be used to know if a token was quoted or is missing a closing quote
+
+ Both lists are guaranteed to have at least 1 item
+ The last item in both lists is the token being tab completed
+
+ On Failure
+ Both items are None
+ """
+ unclosed_quote = ''
+ quotes_to_try = copy.copy(QUOTES)
+
+ tmp_line = line[:endidx]
+ tmp_endidx = endidx
+
+ # Parse the line into tokens
+ while True:
+ try:
+ # Use non-POSIX parsing to keep the quotes around the tokens
+ initial_tokens = shlex.split(tmp_line[:tmp_endidx], posix=False)
+
+ # If the cursor is at an empty token outside of a quoted string,
+ # then that is the token being completed. Add it to the list.
+ if not unclosed_quote and begidx == tmp_endidx:
+ initial_tokens.append('')
+ break
+ except ValueError:
+ # ValueError can be caused by missing closing quote
+ if not quotes_to_try:
+ # Since we have no more quotes to try, something else
+ # is causing the parsing error. Return None since
+ # this means the line is malformed.
+ return None, None
+
+ # Add a closing quote and try to parse again
+ unclosed_quote = quotes_to_try[0]
+ quotes_to_try = quotes_to_try[1:]
+
+ tmp_line = line[:endidx]
+ tmp_line += unclosed_quote
+ tmp_endidx = endidx + 1
+
+ if self.allow_redirection:
+
+ # Since redirection is enabled, we need to treat redirection characters (|, <, >)
+ # as word breaks when they are in unquoted strings. Go through each token
+ # and further split them on these characters. Each run of redirect characters
+ # is treated as a single token.
+ raw_tokens = []
+
+ for cur_initial_token in initial_tokens:
+
+ # Save tokens up to 1 character in length or quoted tokens. No need to parse these.
+ if len(cur_initial_token) <= 1 or cur_initial_token[0] in QUOTES:
+ raw_tokens.append(cur_initial_token)
+ continue
+
+ # Iterate over each character in this token
+ cur_index = 0
+ cur_char = cur_initial_token[cur_index]
+
+ # Keep track of the token we are building
+ cur_raw_token = ''
+
+ while True:
+ if cur_char not in REDIRECTION_CHARS:
+
+ # Keep appending to cur_raw_token until we hit a redirect char
+ while cur_char not in REDIRECTION_CHARS:
+ cur_raw_token += cur_char
+ cur_index += 1
+ if cur_index < len(cur_initial_token):
+ cur_char = cur_initial_token[cur_index]
+ else:
+ break
+
+ else:
+ redirect_char = cur_char
+
+ # Keep appending to cur_raw_token until we hit something other than redirect_char
+ while cur_char == redirect_char:
+ cur_raw_token += cur_char
+ cur_index += 1
+ if cur_index < len(cur_initial_token):
+ cur_char = cur_initial_token[cur_index]
+ else:
+ break
+
+ # Save the current token
+ raw_tokens.append(cur_raw_token)
+ cur_raw_token = ''
+
+ # Check if we've viewed all characters
+ if cur_index >= len(cur_initial_token):
+ break
+ else:
+ raw_tokens = initial_tokens
+
+ # Save the unquoted tokens
+ tokens = [strip_quotes(cur_token) for cur_token in raw_tokens]
+
+ # If the token being completed had an unclosed quote, we need
+ # to remove the closing quote that was added in order for it
+ # to match what was on the command line.
+ if unclosed_quote:
+ raw_tokens[-1] = raw_tokens[-1][:-1]
+
+ return tokens, raw_tokens
+
+ # noinspection PyUnusedLocal
+ @staticmethod
+ def basic_complete(text, line, begidx, endidx, match_against):
+ """
+ Performs tab completion against a list
+
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param match_against: Collection - the list being matched against
+ :return: List[str] - a list of possible tab completions
+ """
+ return [cur_match for cur_match in match_against if cur_match.startswith(text)]
+
+ def delimiter_complete(self, text, line, begidx, endidx, match_against, delimiter):
+ """
+ Performs tab completion against a list but each match is split on a delimiter and only
+ the portion of the match being tab completed is shown as the completion suggestions.
+ This is useful if you match against strings that are hierarchical in nature and have a
+ common delimiter.
+
+ An easy way to illustrate this concept is path completion since paths are just directories/files
+ delimited by a slash. If you are tab completing items in /home/user you don't get the following
+ as suggestions:
+
+ /home/user/file.txt /home/user/program.c
+ /home/user/maps/ /home/user/cmd2.py
+
+ Instead you are shown:
+
+ file.txt program.c
+ maps/ cmd2.py
+
+ For a large set of data, this can be visually more pleasing and easier to search.
+
+ Another example would be strings formatted with the following syntax: company::department::name
+ In this case the delimiter would be :: and the user could easily narrow down what they are looking
+ for if they were only shown suggestions in the category they are at in the string.
+
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param match_against: Collection - the list being matched against
+ :param delimiter: str - what delimits each portion of the matches (ex: paths are delimited by a slash)
+ :return: List[str] - a list of possible tab completions
+ """
+ matches = self.basic_complete(text, line, begidx, endidx, match_against)
+
+ # Display only the portion of the match that's being completed based on delimiter
+ if matches:
+
+ # Get the common beginning for the matches
+ common_prefix = os.path.commonprefix(matches)
+ prefix_tokens = common_prefix.split(delimiter)
+
+ # Calculate what portion of the match we are completing
+ display_token_index = 0
+ if prefix_tokens:
+ display_token_index = len(prefix_tokens) - 1
+
+ # Get this portion for each match and store them in self.display_matches
+ for cur_match in matches:
+ match_tokens = cur_match.split(delimiter)
+ display_token = match_tokens[display_token_index]
+
+ if not display_token:
+ display_token = delimiter
+ self.display_matches.append(display_token)
+
+ return matches
+
+ def flag_based_complete(self, text, line, begidx, endidx, flag_dict, all_else=None):
+ """
+ Tab completes based on a particular flag preceding the token being completed
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param flag_dict: dict - dictionary whose structure is the following:
+ keys - flags (ex: -c, --create) that result in tab completion for the next
+ argument in the command line
+ values - there are two types of values
+ 1. iterable list of strings to match against (dictionaries, lists, etc.)
+ 2. function that performs tab completion (ex: path_complete)
+ :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded
+ by a flag in flag_dict
+ :return: List[str] - a list of possible tab completions
+ """
+ # Get all tokens through the one being completed
+ tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ if tokens is None:
+ return []
+
+ completions_matches = []
+ match_against = all_else
+
+ # Must have at least 2 args for a flag to precede the token being completed
+ if len(tokens) > 1:
+ flag = tokens[-2]
+ if flag in flag_dict:
+ match_against = flag_dict[flag]
+
+ # Perform tab completion using a Collection
+ if isinstance(match_against, Collection):
+ completions_matches = self.basic_complete(text, line, begidx, endidx, match_against)
+
+ # Perform tab completion using a function
+ elif callable(match_against):
+ completions_matches = match_against(text, line, begidx, endidx)
+
+ return completions_matches
+
+ def index_based_complete(self, text, line, begidx, endidx, index_dict, all_else=None):
+ """
+ Tab completes based on a fixed position in the input string
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param index_dict: dict - dictionary whose structure is the following:
+ keys - 0-based token indexes into command line that determine which tokens
+ perform tab completion
+ values - there are two types of values
+ 1. iterable list of strings to match against (dictionaries, lists, etc.)
+ 2. function that performs tab completion (ex: path_complete)
+ :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an
+ index in index_dict
+ :return: List[str] - a list of possible tab completions
+ """
+ # Get all tokens through the one being completed
+ tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ if tokens is None:
+ return []
+
+ matches = []
+
+ # Get the index of the token being completed
+ index = len(tokens) - 1
+
+ # Check if token is at an index in the dictionary
+ if index in index_dict:
+ match_against = index_dict[index]
+ else:
+ match_against = all_else
+
+ # Perform tab completion using a Collection
+ if isinstance(match_against, Collection):
+ matches = self.basic_complete(text, line, begidx, endidx, match_against)
+
+ # Perform tab completion using a function
+ elif callable(match_against):
+ matches = match_against(text, line, begidx, endidx)
+
+ return matches
+
+ # noinspection PyUnusedLocal
+ def path_complete(self, text, line, begidx, endidx, dir_exe_only=False, dir_only=False):
+ """Performs completion of local file system paths
+
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param dir_exe_only: bool - only return directories and executables, not non-executable files
+ :param dir_only: bool - only return directories
+ :return: List[str] - a list of possible tab completions
+ """
+
+ # Used to complete ~ and ~user strings
+ def complete_users():
+
+ # We are returning ~user strings that resolve to directories,
+ # so don't append a space or quote in the case of a single result.
+ self.allow_appended_space = False
+ self.allow_closing_quote = False
+
+ users = []
+
+ # Windows lacks the pwd module so we can't get a list of users.
+ # Instead we will add a slash once the user enters text that
+ # resolves to an existing home directory.
+ if sys.platform.startswith('win'):
+ expanded_path = os.path.expanduser(text)
+ if os.path.isdir(expanded_path):
+ users.append(text + os.path.sep)
+ else:
+ import pwd
+
+ # Iterate through a list of users from the password database
+ for cur_pw in pwd.getpwall():
+
+ # Check if the user has an existing home dir
+ if os.path.isdir(cur_pw.pw_dir):
+
+ # Add a ~ to the user to match against text
+ cur_user = '~' + cur_pw.pw_name
+ if cur_user.startswith(text):
+ if add_trailing_sep_if_dir:
+ cur_user += os.path.sep
+ users.append(cur_user)
+
+ return users
+
+ # Determine if a trailing separator should be appended to directory completions
+ add_trailing_sep_if_dir = False
+ if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep):
+ add_trailing_sep_if_dir = True
+
+ # Used to replace cwd in the final results
+ cwd = os.getcwd()
+ cwd_added = False
+
+ # Used to replace expanded user path in final result
+ orig_tilde_path = ''
+ expanded_tilde_path = ''
+
+ # If the search text is blank, then search in the CWD for *
+ if not text:
+ search_str = os.path.join(os.getcwd(), '*')
+ cwd_added = True
+ else:
+ # Purposely don't match any path containing wildcards - what we are doing is complicated enough!
+ wildcards = ['*', '?']
+ for wildcard in wildcards:
+ if wildcard in text:
+ return []
+
+ # Start the search string
+ search_str = text + '*'
+
+ # Handle tilde expansion and completion
+ if text.startswith('~'):
+ sep_index = text.find(os.path.sep, 1)
+
+ # If there is no slash, then the user is still completing the user after the tilde
+ if sep_index == -1:
+ return complete_users()
+
+ # Otherwise expand the user dir
+ else:
+ search_str = os.path.expanduser(search_str)
+
+ # Get what we need to restore the original tilde path later
+ orig_tilde_path = text[:sep_index]
+ expanded_tilde_path = os.path.expanduser(orig_tilde_path)
+
+ # If the search text does not have a directory, then use the cwd
+ elif not os.path.dirname(text):
+ search_str = os.path.join(os.getcwd(), search_str)
+ cwd_added = True
+
+ # Find all matching path completions
+ matches = glob.glob(search_str)
+
+ # Filter based on type
+ if dir_exe_only:
+ matches = [c for c in matches if os.path.isdir(c) or os.access(c, os.X_OK)]
+ elif dir_only:
+ matches = [c for c in matches if os.path.isdir(c)]
+
+ # Don't append a space or closing quote to directory
+ if len(matches) == 1 and os.path.isdir(matches[0]):
+ self.allow_appended_space = False
+ self.allow_closing_quote = False
+
+ # Build display_matches and add a slash to directories
+ for index, cur_match in enumerate(matches):
+
+ # Display only the basename of this path in the tab-completion suggestions
+ self.display_matches.append(os.path.basename(cur_match))
+
+ # Add a separator after directories if the next character isn't already a separator
+ if os.path.isdir(cur_match) and add_trailing_sep_if_dir:
+ matches[index] += os.path.sep
+ self.display_matches[index] += os.path.sep
+
+ # Remove cwd if it was added to match the text readline expects
+ if cwd_added:
+ matches = [cur_path.replace(cwd + os.path.sep, '', 1) for cur_path in matches]
+
+ # Restore the tilde string if we expanded one to match the text readline expects
+ if expanded_tilde_path:
+ matches = [cur_path.replace(expanded_tilde_path, orig_tilde_path, 1) for cur_path in matches]
+
+ return matches
+
+ @staticmethod
+ def get_exes_in_path(starts_with):
+ """
+ Returns names of executables in a user's path
+ :param starts_with: str - what the exes should start with. leave blank for all exes in path.
+ :return: List[str] - a list of matching exe names
+ """
+ # Purposely don't match any executable containing wildcards
+ wildcards = ['*', '?']
+ for wildcard in wildcards:
+ if wildcard in starts_with:
+ return []
+
+ # Get a list of every directory in the PATH environment variable and ignore symbolic links
+ paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)]
+
+ # Use a set to store exe names since there can be duplicates
+ exes_set = set()
+
+ # Find every executable file in the user's path that matches the pattern
+ for path in paths:
+ full_path = os.path.join(path, starts_with)
+ matches = [f for f in glob.glob(full_path + '*') if os.path.isfile(f) and os.access(f, os.X_OK)]
+
+ for match in matches:
+ exes_set.add(os.path.basename(match))
+
+ return list(exes_set)
+
+ def shell_cmd_complete(self, text, line, begidx, endidx, complete_blank=False):
+ """Performs completion of executables either in a user's path or a given path
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param complete_blank: bool - If True, then a blank will complete all shell commands in a user's path
+ If False, then no completion is performed
+ Defaults to False to match Bash shell behavior
+ :return: List[str] - a list of possible tab completions
+ """
+ # Don't tab complete anything if no shell command has been started
+ if not complete_blank and not text:
+ return []
+
+ # If there are no path characters in the search text, then do shell command completion in the user's path
+ if not text.startswith('~') and os.path.sep not in text:
+ return self.get_exes_in_path(text)
+
+ # Otherwise look for executables in the given path
+ else:
+ return self.path_complete(text, line, begidx, endidx, dir_exe_only=True)
+
+ def _redirect_complete(self, text, line, begidx, endidx, compfunc):
+ """
+ Called by complete() as the first tab completion function for all commands
+ It determines if it should tab complete for redirection (|, <, >, >>) or use the
+ completer function for the current command
+
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :param compfunc: Callable - the completer function for the current command
+ this will be called if we aren't completing for redirection
+ :return: List[str] - a list of possible tab completions
+ """
+ if self.allow_redirection:
+
+ # Get all tokens through the one being completed. We want the raw tokens
+ # so we can tell if redirection strings are quoted and ignore them.
+ _, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
+ if raw_tokens is None:
+ return []
+
+ if len(raw_tokens) > 1:
+
+ # Build a list of all redirection tokens
+ all_redirects = REDIRECTION_CHARS + ['>>']
+
+ # Check if there are redirection strings prior to the token being completed
+ seen_pipe = False
+ has_redirection = False
+
+ for cur_token in raw_tokens[:-1]:
+ if cur_token in all_redirects:
+ has_redirection = True
+
+ if cur_token == '|':
+ seen_pipe = True
+
+ # Get token prior to the one being completed
+ prior_token = raw_tokens[-2]
+
+ # If a pipe is right before the token being completed, complete a shell command as the piped process
+ if prior_token == '|':
+ return self.shell_cmd_complete(text, line, begidx, endidx)
+
+ # Otherwise do path completion either as files to redirectors or arguments to the piped process
+ elif prior_token in all_redirects or seen_pipe:
+ return self.path_complete(text, line, begidx, endidx)
+
+ # If there were redirection strings anywhere on the command line, then we
+ # are no longer tab completing for the current command
+ elif has_redirection:
+ return []
+
+ # Call the command's completer function
+ return compfunc(text, line, begidx, endidx)
+
+ @staticmethod
+ def _pad_matches_to_display(matches_to_display):
+ """
+ Adds padding to the matches being displayed as tab completion suggestions.
+ The default padding of readline/pyreadine is small and not visually appealing
+ especially if matches have spaces. It appears very squished together.
+
+ :param matches_to_display: the matches being padded
+ :return: the padded matches and length of padding that was added
+ """
+ if rl_type == RlType.GNU:
+ # Add 2 to the padding of 2 that readline uses for a total of 4.
+ padding = 2 * ' '
+
+ elif rl_type == RlType.PYREADLINE:
+ # Add 3 to the padding of 1 that pyreadline uses for a total of 4.
+ padding = 3 * ' '
+
+ else:
+ return matches_to_display, 0
+
+ return [cur_match + padding for cur_match in matches_to_display], len(padding)
+
+ def _display_matches_gnu_readline(self, substitution, matches, longest_match_length):
+ """
+ Prints a match list using GNU readline's rl_display_match_list()
+ This exists to print self.display_matches if it has data. Otherwise matches prints.
+
+ :param substitution: str - the substitution written to the command line
+ :param matches: list[str] - the tab completion matches to display
+ :param longest_match_length: int - longest printed length of the matches
+ """
+ if rl_type == RlType.GNU:
+
+ # Check if we should show display_matches
+ if self.display_matches:
+ matches_to_display = self.display_matches
+
+ # Recalculate longest_match_length for display_matches
+ longest_match_length = 0
+
+ for cur_match in matches_to_display:
+ cur_length = wcswidth(cur_match)
+ if cur_length > longest_match_length:
+ longest_match_length = cur_length
+ else:
+ matches_to_display = matches
+
+ # Add padding for visual appeal
+ matches_to_display, padding_length = self._pad_matches_to_display(matches_to_display)
+ longest_match_length += padding_length
+
+ # We will use readline's display function (rl_display_match_list()), so we
+ # need to encode our string as bytes to place in a C array.
+ encoded_substitution = bytes(substitution, encoding='utf-8')
+ encoded_matches = [bytes(cur_match, encoding='utf-8') for cur_match in matches_to_display]
+
+ # rl_display_match_list() expects matches to be in argv format where
+ # substitution is the first element, followed by the matches, and then a NULL.
+ # noinspection PyCallingNonCallable,PyTypeChecker
+ strings_array = (ctypes.c_char_p * (1 + len(encoded_matches) + 1))()
+
+ # Copy in the encoded strings and add a NULL to the end
+ strings_array[0] = encoded_substitution
+ strings_array[1:-1] = encoded_matches
+ strings_array[-1] = None
+
+ # Call readline's display function
+ # rl_display_match_list(strings_array, number of completion matches, longest match length)
+ readline_lib.rl_display_match_list(strings_array, len(encoded_matches), longest_match_length)
+
+ # rl_forced_update_display() is the proper way to redraw the prompt and line, but we
+ # have to use ctypes to do it since Python's readline API does not wrap the function
+ readline_lib.rl_forced_update_display()
+
+ # Since we updated the display, readline asks that rl_display_fixed be set for efficiency
+ display_fixed = ctypes.c_int.in_dll(readline_lib, "rl_display_fixed")
+ display_fixed.value = 1
+
+ def _display_matches_pyreadline(self, matches):
+ """
+ Prints a match list using pyreadline's _display_completions()
+ This exists to print self.display_matches if it has data. Otherwise matches prints.
+
+ :param matches: list[str] - the tab completion matches to display
+ """
+ if rl_type == RlType.PYREADLINE:
+
+ # Check if we should show display_matches
+ if self.display_matches:
+ matches_to_display = self.display_matches
+ else:
+ matches_to_display = matches
+
+ # Add padding for visual appeal
+ matches_to_display, _ = self._pad_matches_to_display(matches_to_display)
+
+ # Display the matches
+ orig_pyreadline_display(matches_to_display)
+
+ # ----- Methods which override stuff in cmd -----
+
+ def complete(self, text, state):
+ """Override of command method which returns the next possible completion for 'text'.
+
+ If a command has not been entered, then complete against command list.
+ Otherwise try to call complete_<command> to get list of completions.
+
+ This method gets called directly by readline because it is set as the tab-completion function.
+
+ This completer function is called as complete(text, state), for state in 0, 1, 2, …, until it returns a
+ non-string value. It should return the next possible completion starting with text.
+
+ :param text: str - the current word that user is typing
+ :param state: int - non-negative integer
+ """
+ if state == 0:
+ unclosed_quote = ''
+ self.set_completion_defaults()
+
+ # lstrip the original line
+ orig_line = readline.get_line_buffer()
+ line = orig_line.lstrip()
+ stripped = len(orig_line) - len(line)
+
+ # Calculate new indexes for the stripped line. If the cursor is at a position before the end of a
+ # line of spaces, then the following math could result in negative indexes. Enforce a max of 0.
+ begidx = max(readline.get_begidx() - stripped, 0)
+ endidx = max(readline.get_endidx() - stripped, 0)
+
+ # Shortcuts are not word break characters when tab completing. Therefore shortcuts become part
+ # of the text variable if there isn't a word break, like a space, after it. We need to remove it
+ # from text and update the indexes. This only applies if we are at the the beginning of the line.
+ shortcut_to_restore = ''
+ if begidx == 0:
+ for (shortcut, expansion) in self.shortcuts:
+ if text.startswith(shortcut):
+ # Save the shortcut to restore later
+ shortcut_to_restore = shortcut
+
+ # Adjust text and where it begins
+ text = text[len(shortcut_to_restore):]
+ begidx += len(shortcut_to_restore)
+ break
+
+ # If begidx is greater than 0, then we are no longer completing the command
+ if begidx > 0:
+
+ # Parse the command line
+ command, args, expanded_line = self.parseline(line)
+
+ # We overwrote line with a properly formatted but fully stripped version
+ # Restore the end spaces since line is only supposed to be lstripped when
+ # passed to completer functions according to Python docs
+ rstripped_len = len(line) - len(line.rstrip())
+ expanded_line += ' ' * rstripped_len
+
+ # Fix the index values if expanded_line has a different size than line
+ if len(expanded_line) != len(line):
+ diff = len(expanded_line) - len(line)
+ begidx += diff
+ endidx += diff
+
+ # Overwrite line to pass into completers
+ line = expanded_line
+
+ # Get all tokens through the one being completed
+ tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
+
+ # Either had a parsing error or are trying to complete the command token
+ # The latter can happen if default_to_shell is True and parseline() allowed
+ # assumed something like " or ' was a command.
+ if tokens is None or len(tokens) == 1:
+ self.completion_matches = []
+ return None
+
+ # Text we need to remove from completions later
+ text_to_remove = ''
+
+ # Get the token being completed with any opening quote preserved
+ raw_completion_token = raw_tokens[-1]
+
+ # Check if the token being completed has an opening quote
+ if raw_completion_token and raw_completion_token[0] in QUOTES:
+
+ # Since the token is still being completed, we know the opening quote is unclosed
+ unclosed_quote = raw_completion_token[0]
+
+ # readline still performs word breaks after a quote. Therefore something like quoted search
+ # text with a space would have resulted in begidx pointing to the middle of the token we
+ # we want to complete. Figure out where that token actually begins and save the beginning
+ # portion of it that was not part of the text readline gave us. We will remove it from the
+ # completions later since readline expects them to start with the original text.
+ actual_begidx = line[:endidx].rfind(tokens[-1])
+
+ if actual_begidx != begidx:
+ text_to_remove = line[actual_begidx:begidx]
+
+ # Adjust text and where it begins so the completer routines
+ # get unbroken search text to complete on.
+ text = text_to_remove + text
+ begidx = actual_begidx
+
+ # Check if a valid command was entered
+ if command in self.get_all_commands():
+ # Get the completer function for this command
+ try:
+ compfunc = getattr(self, 'complete_' + command)
+ except AttributeError:
+ compfunc = self.completedefault
+
+ subcommands = self.get_subcommands(command)
+ if subcommands is not None:
+ # Since there are subcommands, then try completing those if the cursor is in
+ # the token at index 1, otherwise default to using compfunc
+ index_dict = {1: subcommands}
+ compfunc = functools.partial(self.index_based_complete,
+ index_dict=index_dict,
+ all_else=compfunc)
+
+ # A valid command was not entered
+ else:
+ # Check if this command should be run as a shell command
+ if self.default_to_shell and command in self.get_exes_in_path(command):
+ compfunc = self.path_complete
+ else:
+ compfunc = self.completedefault
+
+ # Attempt tab completion for redirection first, and if that isn't occurring,
+ # call the completer function for the current command
+ self.completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc)
+
+ if self.completion_matches:
+
+ # Eliminate duplicates
+ matches_set = set(self.completion_matches)
+ self.completion_matches = list(matches_set)
+
+ display_matches_set = set(self.display_matches)
+ self.display_matches = list(display_matches_set)
+
+ # Check if display_matches has been used. If so, then matches
+ # on delimited strings like paths was done.
+ if self.display_matches:
+ matches_delimited = True
+ else:
+ matches_delimited = False
+
+ # Since self.display_matches is empty, set it to self.completion_matches
+ # before we alter them. That way the suggestions will reflect how we parsed
+ # the token being completed and not how readline did.
+ self.display_matches = copy.copy(self.completion_matches)
+
+ # Check if we need to add an opening quote
+ if not unclosed_quote:
+
+ add_quote = False
+
+ # This is the tab completion text that will appear on the command line.
+ common_prefix = os.path.commonprefix(self.completion_matches)
+
+ if matches_delimited:
+ # Check if any portion of the display matches appears in the tab completion
+ display_prefix = os.path.commonprefix(self.display_matches)
+
+ # For delimited matches, we check what appears before the display
+ # matches (common_prefix) as well as the display matches themselves.
+ if (' ' in common_prefix) or (display_prefix and ' ' in ''.join(self.display_matches)):
+ add_quote = True
+
+ # If there is a tab completion and any match has a space, then add an opening quote
+ elif common_prefix and ' ' in ''.join(self.completion_matches):
+ add_quote = True
+
+ if add_quote:
+ # Figure out what kind of quote to add and save it as the unclosed_quote
+ if '"' in ''.join(self.completion_matches):
+ unclosed_quote = "'"
+ else:
+ unclosed_quote = '"'
+
+ self.completion_matches = [unclosed_quote + match for match in self.completion_matches]
+
+ # Check if we need to remove text from the beginning of tab completions
+ elif text_to_remove:
+ self.completion_matches = \
+ [m.replace(text_to_remove, '', 1) for m in self.completion_matches]
+
+ # Check if we need to restore a shortcut in the tab completions
+ # so it doesn't get erased from the command line
+ if shortcut_to_restore:
+ self.completion_matches = \
+ [shortcut_to_restore + match for match in self.completion_matches]
+
+ else:
+ # Complete token against aliases and command names
+ alias_names = set(self.aliases.keys())
+ visible_commands = set(self.get_visible_commands())
+ strs_to_match = list(alias_names | visible_commands)
+ self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match)
+
+ # Handle single result
+ if len(self.completion_matches) == 1:
+ str_to_append = ''
+
+ # Add a closing quote if needed and allowed
+ if self.allow_closing_quote and unclosed_quote:
+ str_to_append += unclosed_quote
+
+ # If we are at the end of the line, then add a space if allowed
+ if self.allow_appended_space and endidx == len(line):
+ str_to_append += ' '
+
+ self.completion_matches[0] += str_to_append
+
+ # Otherwise sort matches
+ elif self.completion_matches:
+ self.completion_matches.sort()
+ self.display_matches.sort()
+
+ try:
+ return self.completion_matches[state]
+ except IndexError:
+ return None
+
+ def get_all_commands(self):
+ """
+ Returns a list of all commands
+ """
+ return [cur_name[3:] for cur_name in self.get_names() if cur_name.startswith('do_')]
+
+ def get_visible_commands(self):
+ """
+ Returns a list of commands that have not been hidden
+ """
+ commands = self.get_all_commands()
+
+ # Remove the hidden commands
+ for name in self.hidden_commands:
+ if name in commands:
+ commands.remove(name)
+
+ return commands
+
+ def get_help_topics(self):
+ """ Returns a list of help topics """
+ return [name[5:] for name in self.get_names() if name.startswith('help_')]
+
+ def complete_help(self, text, line, begidx, endidx):
+ """
+ Override of parent class method to handle tab completing subcommands and not showing hidden commands
+ Returns a list of possible tab completions
+ """
+
+ # The command is the token at index 1 in the command line
+ cmd_index = 1
+
+ # The subcommand is the token at index 2 in the command line
+ subcmd_index = 2
+
+ # Get all tokens through the one being completed
+ tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ if tokens is None:
+ return []
+
+ matches = []
+
+ # Get the index of the token being completed
+ index = len(tokens) - 1
+
+ # Check if we are completing a command or help topic
+ if index == cmd_index:
+
+ # Complete token against topics and visible commands
+ topics = set(self.get_help_topics())
+ visible_commands = set(self.get_visible_commands())
+ strs_to_match = list(topics | visible_commands)
+ matches = self.basic_complete(text, line, begidx, endidx, strs_to_match)
+
+ # Check if we are completing a subcommand
+ elif index == subcmd_index:
+
+ # Match subcommands if any exist
+ command = tokens[cmd_index]
+ matches = self.basic_complete(text, line, begidx, endidx, self.get_subcommands(command))
+
+ return matches
+
+ # noinspection PyUnusedLocal
+ def sigint_handler(self, signum, frame):
+ """Signal handler for SIGINTs which typically come from Ctrl-C events.
+
+ If you need custom SIGINT behavior, then override this function.
+
+ :param signum: int - signal number
+ :param frame
+ """
+
+ # Save copy of pipe_proc since it could theoretically change while this is running
+ pipe_proc = self.pipe_proc
+
+ if pipe_proc is not None:
+ pipe_proc.terminate()
+
+ # Re-raise a KeyboardInterrupt so other parts of the code can catch it
+ raise KeyboardInterrupt("Got a keyboard interrupt")
+
+ def preloop(self):
+ """"Hook method executed once when the cmdloop() method is called."""
+
+ # Register a default SIGINT signal handler for Ctrl+C
+ signal.signal(signal.SIGINT, self.sigint_handler)
+
+ def precmd(self, statement):
+ """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history.
+
+ :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
+ :return: ParsedString - a potentially modified version of the input ParsedString statement
+ """
+ return statement
+
+ # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd -----
+
+ # noinspection PyMethodMayBeStatic
+ def preparse(self, raw):
+ """Hook method executed just before the command line is interpreted, but after the input prompt is generated.
+
+ :param raw: str - raw command line input
+ :return: str - potentially modified raw command line input
+ """
+ return raw
+
+ # noinspection PyMethodMayBeStatic
+ def postparse(self, parse_result):
+ """Hook that runs immediately after parsing the command-line but before ``parsed()`` returns a ParsedString.
+
+ :param parse_result: pyparsing.ParseResults - parsing results output by the pyparsing parser
+ :return: pyparsing.ParseResults - potentially modified ParseResults object
+ """
+ return parse_result
+
+ # noinspection PyMethodMayBeStatic
+ def postparsing_precmd(self, statement):
+ """This runs after parsing the command-line, but before anything else; even before adding cmd to history.
+
+ NOTE: This runs before precmd() and prior to any potential output redirection or piping.
+
+ If you wish to fatally fail this command and exit the application entirely, set stop = True.
+
+ If you wish to just fail this command you can do so by raising an exception:
+
+ - raise EmptyStatement - will silently fail and do nothing
+ - raise <AnyOtherException> - will fail and print an error message
+
+ :param statement: - the parsed command-line statement
+ :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement
+ """
+ stop = False
+ return stop, statement
+
+ # noinspection PyMethodMayBeStatic
+ def postparsing_postcmd(self, stop):
+ """This runs after everything else, including after postcmd().
+
+ It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due
+ to notifications from a background thread, then this is the method you want to override to do it.
+
+ :param stop: bool - True implies the entire application should exit.
+ :return: bool - True implies the entire application should exit.
+ """
+ if not sys.platform.startswith('win'):
+ # Fix those annoying problems that occur with terminal programs like "less" when you pipe to them
+ if self.stdin.isatty():
+ proc = subprocess.Popen(shlex.split('stty sane'))
+ proc.communicate()
+ return stop
+
+ def parseline(self, line):
+ """Parse the line into a command name and a string containing the arguments.
+
+ NOTE: This is an override of a parent class method. It is only used by other parent class methods. But
+ we do need to override it here so that the additional shortcuts present in cmd2 get properly expanded for
+ purposes of tab completion.
+
+ Used for command tab completion. Returns a tuple containing (command, args, line).
+ 'command' and 'args' may be None if the line couldn't be parsed.
+
+ :param line: str - line read by readline
+ :return: (str, str, str) - tuple containing (command, args, line)
+ """
+ line = line.strip()
+
+ if not line:
+ # Deal with empty line or all whitespace line
+ return None, None, line
+
+ # Make a copy of aliases so we can edit it
+ tmp_aliases = list(self.aliases.keys())
+ keep_expanding = len(tmp_aliases) > 0
+
+ # Expand aliases
+ while keep_expanding:
+ for cur_alias in tmp_aliases:
+ keep_expanding = False
+
+ if line == cur_alias or line.startswith(cur_alias + ' '):
+ line = line.replace(cur_alias, self.aliases[cur_alias], 1)
+
+ # Do not expand the same alias more than once
+ tmp_aliases.remove(cur_alias)
+ keep_expanding = len(tmp_aliases) > 0
+ break
+
+ # Expand command shortcut to its full command name
+ for (shortcut, expansion) in self.shortcuts:
+ if line.startswith(shortcut):
+ # If the next character after the shortcut isn't a space, then insert one
+ shortcut_len = len(shortcut)
+ if len(line) == shortcut_len or line[shortcut_len] != ' ':
+ expansion += ' '
+
+ # Expand the shortcut
+ line = line.replace(shortcut, expansion, 1)
+ break
+
+ i, n = 0, len(line)
+
+ # If we are allowing shell commands, then allow any character in the command
+ if self.default_to_shell:
+ while i < n and line[i] != ' ':
+ i += 1
+
+ # Otherwise only allow those in identchars
+ else:
+ while i < n and line[i] in self.identchars:
+ i += 1
+
+ command, arg = line[:i], line[i:].strip()
+
+ return command, arg, line
+
+ def onecmd_plus_hooks(self, line):
+ """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks.
+
+ :param line: str - line of text read from input
+ :return: bool - True if cmdloop() should exit, False otherwise
+ """
+ stop = 0
+ try:
+ statement = self._complete_statement(line)
+ (stop, statement) = self.postparsing_precmd(statement)
+ if stop:
+ return self.postparsing_postcmd(stop)
+
+ try:
+ if self.allow_redirection:
+ self._redirect_output(statement)
+ timestart = datetime.datetime.now()
+ statement = self.precmd(statement)
+ stop = self.onecmd(statement)
+ stop = self.postcmd(stop, statement)
+ if self.timing:
+ self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
+ finally:
+ if self.allow_redirection:
+ self._restore_output(statement)
+ except EmptyStatement:
+ pass
+ except ValueError as ex:
+ # If shlex.split failed on syntax, let user know whats going on
+ self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
+ except Exception as ex:
+ self.perror(ex, type(ex).__name__)
+ finally:
+ return self.postparsing_postcmd(stop)
+
+ def runcmds_plus_hooks(self, cmds):
+ """Convenience method to run multiple commands by onecmd_plus_hooks.
+
+ This method adds the given cmds to the command queue and processes the
+ queue until completion or an error causes it to abort. Scripts that are
+ loaded will have their commands added to the queue. Scripts may even
+ load other scripts recursively. This means, however, that you should not
+ use this method if there is a running cmdloop or some other event-loop.
+ This method is only intended to be used in "one-off" scenarios.
+
+ NOTE: You may need this method even if you only have one command. If
+ that command is a load, then you will need this command to fully process
+ all the subsequent commands that are loaded from the script file. This
+ is an improvement over onecmd_plus_hooks, which expects to be used
+ inside of a command loop which does the processing of loaded commands.
+
+ Example: cmd_obj.runcmds_plus_hooks(['load myscript.txt'])
+
+ :param cmds: list - Command strings suitable for onecmd_plus_hooks.
+ :return: bool - True implies the entire application should exit.
+
+ """
+ stop = False
+ self.cmdqueue = list(cmds) + self.cmdqueue
+ try:
+ while self.cmdqueue and not stop:
+ line = self.cmdqueue.pop(0)
+ if self.echo and line != 'eos':
+ self.poutput('{}{}'.format(self.prompt, line))
+
+ stop = self.onecmd_plus_hooks(line)
+ finally:
+ # Clear out the command queue and script directory stack, just in
+ # case we hit an error and they were not completed.
+ self.cmdqueue = []
+ self._script_dir = []
+ # NOTE: placing this return here inside the finally block will
+ # swallow exceptions. This is consistent with what is done in
+ # onecmd_plus_hooks and _cmdloop, although it may not be
+ # necessary/desired here.
+ return stop
+
+ def _complete_statement(self, line):
+ """Keep accepting lines of input until the command is complete."""
+ if not line or (not pyparsing.Or(self.commentGrammars).setParseAction(lambda x: '').transformString(line)):
+ raise EmptyStatement()
+ statement = self.parser_manager.parsed(line)
+ while statement.parsed.multilineCommand and (statement.parsed.terminator == ''):
+ statement = '%s\n%s' % (statement.parsed.raw,
+ self.pseudo_raw_input(self.continuation_prompt))
+ statement = self.parser_manager.parsed(statement)
+ if not statement.parsed.command:
+ raise EmptyStatement()
+ return statement
+
+ def _redirect_output(self, statement):
+ """Handles output redirection for >, >>, and |.
+
+ :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
+ """
+ if statement.parsed.pipeTo:
+ self.kept_state = Statekeeper(self, ('stdout',))
+
+ # Create a pipe with read and write sides
+ read_fd, write_fd = os.pipe()
+
+ # Open each side of the pipe and set stdout accordingly
+ # noinspection PyTypeChecker
+ self.stdout = io.open(write_fd, 'w')
+ self.redirecting = True
+ # noinspection PyTypeChecker
+ subproc_stdin = io.open(read_fd, 'r')
+
+ # We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True.
+ try:
+ self.pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeTo), stdin=subproc_stdin)
+ except Exception as ex:
+ # Restore stdout to what it was and close the pipe
+ self.stdout.close()
+ subproc_stdin.close()
+ self.pipe_proc = None
+ self.kept_state.restore()
+ self.kept_state = None
+ self.redirecting = False
+
+ # Re-raise the exception
+ raise ex
+ elif statement.parsed.output:
+ if (not statement.parsed.outputTo) and (not can_clip):
+ raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable')
+ self.kept_state = Statekeeper(self, ('stdout',))
+ self.kept_sys = Statekeeper(sys, ('stdout',))
+ self.redirecting = True
+ if statement.parsed.outputTo:
+ mode = 'w'
+ if statement.parsed.output == 2 * self.redirector:
+ mode = 'a'
+ sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode)
+ else:
+ sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+")
+ if statement.parsed.output == '>>':
+ self.poutput(get_paste_buffer())
+
+ def _restore_output(self, statement):
+ """Handles restoring state after output redirection as well as the actual pipe operation if present.
+
+ :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
+ """
+ # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state
+ if self.kept_state is not None:
+ # If we redirected output to the clipboard
+ if statement.parsed.output and not statement.parsed.outputTo:
+ self.stdout.seek(0)
+ write_to_paste_buffer(self.stdout.read())
+
+ try:
+ # Close the file or pipe that stdout was redirected to
+ self.stdout.close()
+ except BrokenPipeError:
+ pass
+ finally:
+ # Restore self.stdout
+ self.kept_state.restore()
+ self.kept_state = None
+
+ # If we were piping output to a shell command, then close the subprocess the shell command was running in
+ if self.pipe_proc is not None:
+ self.pipe_proc.communicate()
+ self.pipe_proc = None
+
+ # Restore sys.stdout if need be
+ if self.kept_sys is not None:
+ self.kept_sys.restore()
+ self.kept_sys = None
+
+ self.redirecting = False
+
+ def _func_named(self, arg):
+ """Gets the method name associated with a given command.
+
+ :param arg: str - command to look up method name which implements it
+ :return: str - method name which implements the given command
+ """
+ result = None
+ target = 'do_' + arg
+ if target in dir(self):
+ result = target
+ return result
+
+ def onecmd(self, line):
+ """ This executes the actual do_* method for a command.
+
+ If the command provided doesn't exist, then it executes _default() instead.
+
+ :param line: ParsedString - subclass of string including the pyparsing ParseResults
+ :return: bool - a flag indicating whether the interpretation of commands should stop
+ """
+ statement = self.parser_manager.parsed(line)
+ funcname = self._func_named(statement.parsed.command)
+ if not funcname:
+ return self.default(statement)
+
+ # Since we have a valid command store it in the history
+ if statement.parsed.command not in self.exclude_from_history:
+ self.history.append(statement.parsed.raw)
+
+ try:
+ func = getattr(self, funcname)
+ except AttributeError:
+ return self.default(statement)
+
+ stop = func(statement)
+ return stop
+
+ def default(self, statement):
+ """Executed when the command given isn't a recognized command implemented by a do_* method.
+
+ :param statement: ParsedString - subclass of string including the pyparsing ParseResults
+ :return:
+ """
+ arg = statement.full_parsed_statement()
+ if self.default_to_shell:
+ result = os.system(arg)
+ # If os.system() succeeded, then don't print warning about unknown command
+ if not result:
+ return
+
+ # Print out a message stating this is an unknown command
+ self.poutput('*** Unknown syntax: {}\n'.format(arg))
+
+ @staticmethod
+ def _surround_ansi_escapes(prompt, start="\x01", end="\x02"):
+ """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes.
+
+ :param prompt: str - original prompt
+ :param start: str - start code to tell GNU Readline about beginning of invisible characters
+ :param end: str - end code to tell GNU Readline about end of invisible characters
+ :return: str - prompt safe to pass to GNU Readline
+ """
+ # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline
+ if sys.platform == "win32":
+ return prompt
+
+ escaped = False
+ result = ""
+
+ for c in prompt:
+ if c == "\x1b" and not escaped:
+ result += start + c
+ escaped = True
+ elif c.isalpha() and escaped:
+ result += c + end
+ escaped = False
+ else:
+ result += c
+
+ return result
+
+ def pseudo_raw_input(self, prompt):
+ """
+ began life as a copy of cmd's cmdloop; like raw_input but
+
+ - accounts for changed stdin, stdout
+ - if input is a pipe (instead of a tty), look at self.echo
+ to decide whether to print the prompt and the input
+ """
+
+ # Deal with the vagaries of readline and ANSI escape codes
+ safe_prompt = self._surround_ansi_escapes(prompt)
+
+ if self.use_rawinput:
+ try:
+ if sys.stdin.isatty():
+ line = input(safe_prompt)
+ else:
+ line = input()
+ if self.echo:
+ sys.stdout.write('{}{}\n'.format(safe_prompt, line))
+ except EOFError:
+ line = 'eof'
+ else:
+ if self.stdin.isatty():
+ # on a tty, print the prompt first, then read the line
+ self.poutput(safe_prompt, end='')
+ self.stdout.flush()
+ line = self.stdin.readline()
+ if len(line) == 0:
+ line = 'eof'
+ else:
+ # we are reading from a pipe, read the line to see if there is
+ # anything there, if so, then decide whether to print the
+ # prompt or not
+ line = self.stdin.readline()
+ if len(line):
+ # we read something, output the prompt and the something
+ if self.echo:
+ self.poutput('{}{}'.format(safe_prompt, line))
+ else:
+ line = 'eof'
+ return line.strip()
+
+ def _cmdloop(self):
+ """Repeatedly issue a prompt, accept input, parse an initial prefix
+ off the received input, and dispatch to action methods, passing them
+ the remainder of the line as argument.
+
+ This serves the same role as cmd.cmdloop().
+
+ :return: bool - True implies the entire application should exit.
+ """
+ # An almost perfect copy from Cmd; however, the pseudo_raw_input portion
+ # has been split out so that it can be called separately
+ if self.use_rawinput and self.completekey:
+
+ # Set up readline for our tab completion needs
+ if rl_type == RlType.GNU:
+ readline.set_completion_display_matches_hook(self._display_matches_gnu_readline)
+
+ # Set GNU readline's rl_basic_quote_characters to NULL so it won't automatically add a closing quote
+ # We don't need to worry about setting rl_completion_suppress_quote since we never declared
+ # rl_completer_quote_characters.
+ rl_basic_quote_characters.value = None
+
+ elif rl_type == RlType.PYREADLINE:
+ readline.rl.mode._display_completions = self._display_matches_pyreadline
+
+ try:
+ self.old_completer = readline.get_completer()
+ self.old_delims = readline.get_completer_delims()
+ readline.set_completer(self.complete)
+
+ # Break words on whitespace and quotes when tab completing
+ completer_delims = " \t\n" + ''.join(QUOTES)
+
+ if self.allow_redirection:
+ # If redirection is allowed, then break words on those characters too
+ completer_delims += ''.join(REDIRECTION_CHARS)
+
+ readline.set_completer_delims(completer_delims)
+
+ # Enable tab completion
+ readline.parse_and_bind(self.completekey + ": complete")
+ except NameError:
+ pass
+ stop = None
+ try:
+ while not stop:
+ if self.cmdqueue:
+ # Run command out of cmdqueue if nonempty (populated by load command or commands at invocation)
+ line = self.cmdqueue.pop(0)
+
+ if self.echo and line != 'eos':
+ self.poutput('{}{}'.format(self.prompt, line))
+ else:
+ # Otherwise, read a command from stdin
+ if not self.quit_on_sigint:
+ try:
+ line = self.pseudo_raw_input(self.prompt)
+ except KeyboardInterrupt:
+ self.poutput('^C')
+ line = ''
+ else:
+ line = self.pseudo_raw_input(self.prompt)
+
+ # Run the command along with all associated pre and post hooks
+ stop = self.onecmd_plus_hooks(line)
+ finally:
+ if self.use_rawinput and self.completekey:
+
+ # Restore what we changed in readline
+ try:
+ readline.set_completer(self.old_completer)
+ readline.set_completer_delims(self.old_delims)
+ except NameError:
+ pass
+
+ if rl_type == RlType.GNU:
+ readline.set_completion_display_matches_hook(None)
+ rl_basic_quote_characters.value = orig_rl_basic_quote_characters_addr
+
+ elif rl_type == RlType.PYREADLINE:
+ readline.rl.mode._display_completions = orig_pyreadline_display
+
+ self.cmdqueue.clear()
+ self._script_dir.clear()
+
+ return stop
+
+ @with_argument_list
+ def do_alias(self, arglist):
+ """Define or display aliases
+
+Usage: Usage: alias [name] | [<name> <value>]
+ Where:
+ name - name of the alias being looked up, added, or replaced
+ value - what the alias will be resolved to (if adding or replacing)
+ this can contain spaces and does not need to be quoted
+
+ Without arguments, 'alias' prints a list of all aliases in a reusable form which
+ can be outputted to a startup_script to preserve aliases across sessions.
+
+ With one argument, 'alias' shows the value of the specified alias.
+ Example: alias ls (Prints the value of the alias called 'ls' if it exists)
+
+ With two or more arguments, 'alias' creates or replaces an alias.
+
+ Example: alias ls !ls -lF
+
+ If you want to use redirection or pipes in the alias, then either quote the tokens with these
+ characters or quote the entire alias value.
+
+ Examples:
+ alias save_results print_results ">" out.txt
+ alias save_results print_results "> out.txt"
+ alias save_results "print_results > out.txt"
+"""
+ # If no args were given, then print a list of current aliases
+ if not arglist:
+ for cur_alias in self.aliases:
+ self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias]))
+
+ # The user is looking up an alias
+ elif len(arglist) == 1:
+ name = arglist[0]
+ if name in self.aliases:
+ self.poutput("alias {} {}".format(name, self.aliases[name]))
+ else:
+ self.perror("Alias {!r} not found".format(name), traceback_war=False)
+
+ # The user is creating an alias
+ else:
+ name = arglist[0]
+ value = ' '.join(arglist[1:])
+
+ # Check for a valid name
+ for cur_char in name:
+ if cur_char not in self.identchars:
+ self.perror("Alias names can only contain the following characters: {}".format(self.identchars),
+ traceback_war=False)
+ return
+
+ # Set the alias
+ self.aliases[name] = value
+ self.poutput("Alias {!r} created".format(name))
+
+ def complete_alias(self, text, line, begidx, endidx):
+ """ Tab completion for alias """
+ index_dict = \
+ {
+ 1: self.aliases,
+ 2: self.get_visible_commands()
+ }
+ return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete)
+
+ @with_argument_list
+ def do_unalias(self, arglist):
+ """Unsets aliases
+
+Usage: Usage: unalias [-a] name [name ...]
+ Where:
+ name - name of the alias being unset
+
+ Options:
+ -a remove all alias definitions
+"""
+ if not arglist:
+ self.do_help('unalias')
+
+ if '-a' in arglist:
+ self.aliases.clear()
+ self.poutput("All aliases cleared")
+
+ else:
+ # Get rid of duplicates
+ arglist = list(set(arglist))
+
+ for cur_arg in arglist:
+ if cur_arg in self.aliases:
+ del self.aliases[cur_arg]
+ self.poutput("Alias {!r} cleared".format(cur_arg))
+ else:
+ self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False)
+
+ def complete_unalias(self, text, line, begidx, endidx):
+ """ Tab completion for unalias """
+ return self.basic_complete(text, line, begidx, endidx, self.aliases)
+
+ @with_argument_list
+ def do_help(self, arglist):
+ """List available commands with "help" or detailed help with "help cmd"."""
+ if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')):
+ verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v')
+ self._help_menu(verbose)
+ else:
+ # Getting help for a specific command
+ funcname = self._func_named(arglist[0])
+ if funcname:
+ # Check to see if this function was decorated with an argparse ArgumentParser
+ func = getattr(self, funcname)
+ if func.__dict__.get('has_parser', False):
+ # Function has an argparser, so get help based on all the arguments in case there are sub-commands
+ new_arglist = arglist[1:]
+ new_arglist.append('-h')
+
+ # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout
+ with redirect_stdout(self.stdout):
+ with redirect_stderr(self.stdout):
+ func(new_arglist)
+ else:
+ # No special behavior needed, delegate to cmd base class do_help()
+ cmd.Cmd.do_help(self, funcname[3:])
+ else:
+ # This could be a help topic
+ cmd.Cmd.do_help(self, arglist[0])
+
+ def _help_menu(self, verbose=False):
+ """Show a list of commands which help can be displayed for.
+ """
+ # Get a sorted list of help topics
+ help_topics = self.get_help_topics()
+ help_topics.sort()
+
+ # Get a sorted list of visible command names
+ visible_commands = self.get_visible_commands()
+ visible_commands.sort()
+
+ cmds_doc = []
+ cmds_undoc = []
+ cmds_cats = {}
+
+ for command in visible_commands:
+ if command in help_topics or getattr(self, self._func_named(command)).__doc__:
+ if command in help_topics:
+ help_topics.remove(command)
+ if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY):
+ category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY)
+ cmds_cats.setdefault(category, [])
+ cmds_cats[category].append(command)
+ else:
+ cmds_doc.append(command)
+ else:
+ cmds_undoc.append(command)
+
+ if len(cmds_cats) == 0:
+ # No categories found, fall back to standard behavior
+ self.poutput("{}\n".format(str(self.doc_leader)))
+ self._print_topics(self.doc_header, cmds_doc, verbose)
+ else:
+ # Categories found, Organize all commands by category
+ self.poutput('{}\n'.format(str(self.doc_leader)))
+ self.poutput('{}\n\n'.format(str(self.doc_header)))
+ for category in sorted(cmds_cats.keys()):
+ self._print_topics(category, cmds_cats[category], verbose)
+ self._print_topics('Other', cmds_doc, verbose)
+
+ self.print_topics(self.misc_header, help_topics, 15, 80)
+ self.print_topics(self.undoc_header, cmds_undoc, 15, 80)
+
+ def _print_topics(self, header, cmds, verbose):
+ """Customized version of print_topics that can switch between verbose or traditional output"""
+ if cmds:
+ if not verbose:
+ self.print_topics(header, cmds, 15, 80)
+ else:
+ self.stdout.write('{}\n'.format(str(header)))
+ widest = 0
+ # measure the commands
+ for command in cmds:
+ width = len(command)
+ if width > widest:
+ widest = width
+ # add a 4-space pad
+ widest += 4
+ if widest < 20:
+ widest = 20
+
+ if self.ruler:
+ self.stdout.write('{:{ruler}<{width}}\n'.format('', ruler=self.ruler, width=80))
+
+ for command in cmds:
+ # Try to get the documentation string
+ try:
+ # first see if there's a help function implemented
+ func = getattr(self, 'help_' + command)
+ except AttributeError:
+ # Couldn't find a help function
+ try:
+ # Now see if help_summary has been set
+ doc = getattr(self, self._func_named(command)).help_summary
+ except AttributeError:
+ # Last, try to directly access the function's doc-string
+ doc = getattr(self, self._func_named(command)).__doc__
+ else:
+ # we found the help function
+ result = StringIO()
+ # try to redirect system stdout
+ with redirect_stdout(result):
+ # save our internal stdout
+ stdout_orig = self.stdout
+ try:
+ # redirect our internal stdout
+ self.stdout = result
+ func()
+ finally:
+ # restore internal stdout
+ self.stdout = stdout_orig
+ doc = result.getvalue()
+
+ # Attempt to locate the first documentation block
+ doc_block = []
+ found_first = False
+ for doc_line in doc.splitlines():
+ str(doc_line).strip()
+ if len(doc_line.strip()) > 0:
+ doc_block.append(doc_line.strip())
+ found_first = True
+ else:
+ if found_first:
+ break
+
+ for doc_line in doc_block:
+ self.stdout.write('{: <{col_width}}{doc}\n'.format(command,
+ col_width=widest,
+ doc=doc_line))
+ command = ''
+ self.stdout.write("\n")
+
+ def do_shortcuts(self, _):
+ """Lists shortcuts (aliases) available."""
+ result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts))
+ self.poutput("Shortcuts for other commands:\n{}\n".format(result))
+
+ def do_eof(self, _):
+ """Called when <Ctrl>-D is pressed."""
+ # End of script should not exit app, but <Ctrl>-D should.
+ print('') # Required for clearing line when exiting submenu
+ return self._STOP_AND_EXIT
+
+ def do_quit(self, _):
+ """Exits this application."""
+ self._should_quit = True
+ return self._STOP_AND_EXIT
+
+ def select(self, opts, prompt='Your choice? '):
+ """Presents a numbered menu to the user. Modelled after
+ the bash shell's SELECT. Returns the item chosen.
+
+ Argument ``opts`` can be:
+
+ | a single string -> will be split into one-word options
+ | a list of strings -> will be offered as options
+ | a list of tuples -> interpreted as (value, text), so
+ that the return value can differ from
+ the text advertised to the user """
+ local_opts = opts
+ if isinstance(opts, str):
+ local_opts = list(zip(opts.split(), opts.split()))
+ fulloptions = []
+ for opt in local_opts:
+ if isinstance(opt, str):
+ fulloptions.append((opt, opt))
+ else:
+ try:
+ fulloptions.append((opt[0], opt[1]))
+ except IndexError:
+ fulloptions.append((opt[0], opt[0]))
+ for (idx, (value, text)) in enumerate(fulloptions):
+ self.poutput(' %2d. %s\n' % (idx + 1, text))
+ while True:
+ response = input(prompt)
+ hlen = readline.get_current_history_length()
+ if hlen >= 1 and response != '':
+ readline.remove_history_item(hlen - 1)
+ try:
+ response = int(response)
+ result = fulloptions[response - 1][0]
+ break
+ except (ValueError, IndexError):
+ self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:\n".format(response,
+ len(fulloptions)))
+ return result
+
+ def cmdenvironment(self):
+ """Get a summary report of read-only settings which the user cannot modify at runtime.
+
+ :return: str - summary report of read-only settings which the user cannot modify at runtime
+ """
+ read_only_settings = """
+ Commands may be terminated with: {}
+ Arguments at invocation allowed: {}
+ Output redirection and pipes allowed: {}
+ Parsing of command arguments:
+ Shell lexer mode for command argument splitting: {}
+ Strip Quotes after splitting arguments: {}
+ """.format(str(self.terminators), self.allow_cli_args, self.allow_redirection,
+ "POSIX" if POSIX_SHLEX else "non-POSIX",
+ "True" if STRIP_QUOTES_FOR_NON_POSIX and not POSIX_SHLEX else "False")
+ return read_only_settings
+
+ def show(self, args, parameter):
+ param = ''
+ if parameter:
+ param = parameter.strip().lower()
+ result = {}
+ maxlen = 0
+ for p in self.settable:
+ if (not param) or p.startswith(param):
+ result[p] = '%s: %s' % (p, str(getattr(self, p)))
+ maxlen = max(maxlen, len(result[p]))
+ if result:
+ for p in sorted(result):
+ if args.long:
+ self.poutput('{} # {}'.format(result[p].ljust(maxlen), self.settable[p]))
+ else:
+ self.poutput(result[p])
+
+ # If user has requested to see all settings, also show read-only settings
+ if args.all:
+ self.poutput('\nRead only settings:{}'.format(self.cmdenvironment()))
+ else:
+ raise LookupError("Parameter '%s' not supported (type 'show' for list of parameters)." % param)
+
+ set_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
+ set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well')
+ set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter')
+ set_parser.add_argument('settable', nargs='*', help='[param_name] [value]')
+
+ @with_argparser(set_parser)
+ def do_set(self, args):
+ """Sets a settable parameter or shows current settings of parameters.
+
+ Accepts abbreviated parameter names so long as there is no ambiguity.
+ Call without arguments for a list of settable parameters with their values.
+ """
+ try:
+ param_name, val = args.settable
+ val = val.strip()
+ param_name = param_name.strip().lower()
+ if param_name not in self.settable:
+ hits = [p for p in self.settable if p.startswith(param_name)]
+ if len(hits) == 1:
+ param_name = hits[0]
+ else:
+ return self.show(args, param_name)
+ current_val = getattr(self, param_name)
+ if (val[0] == val[-1]) and val[0] in ("'", '"'):
+ val = val[1:-1]
+ else:
+ val = cast(current_val, val)
+ setattr(self, param_name, val)
+ self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val))
+ if current_val != val:
+ try:
+ onchange_hook = getattr(self, '_onchange_%s' % param_name)
+ onchange_hook(old=current_val, new=val)
+ except AttributeError:
+ pass
+ except (ValueError, AttributeError):
+ param = ''
+ if args.settable:
+ param = args.settable[0]
+ self.show(args, param)
+
+ def do_shell(self, command):
+ """Execute a command as if at the OS prompt.
+
+ Usage: shell <command> [arguments]"""
+
+ try:
+ # Use non-POSIX parsing to keep the quotes around the tokens
+ tokens = shlex.split(command, posix=False)
+ except ValueError as err:
+ self.perror(err, traceback_war=False)
+ return
+
+ # Support expanding ~ in quoted paths
+ for index, _ in enumerate(tokens):
+ if tokens[index]:
+ # Check if the token is quoted. Since shlex.split() passed, there isn't
+ # an unclosed quote, so we only need to check the first character.
+ first_char = tokens[index][0]
+ if first_char in QUOTES:
+ tokens[index] = strip_quotes(tokens[index])
+
+ tokens[index] = os.path.expanduser(tokens[index])
+
+ # Restore the quotes
+ if first_char in QUOTES:
+ tokens[index] = first_char + tokens[index] + first_char
+
+ expanded_command = ' '.join(tokens)
+ proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True)
+ proc.communicate()
+
+ def complete_shell(self, text, line, begidx, endidx):
+ """Handles tab completion of executable commands and local file system paths for the shell command
+
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :return: List[str] - a list of possible tab completions
+ """
+ index_dict = {1: self.shell_cmd_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete)
+
+ def cmd_with_subs_completer(self, text, line, begidx, endidx):
+ """
+ This is a function provided for convenience to those who want an easy way to add
+ tab completion to functions that implement subcommands. By setting this as the
+ completer of the base command function, the correct completer for the chosen subcommand
+ will be called.
+
+ The use of this function requires assigning a completer function to the subcommand's parser
+ Example:
+ A command called print has a subcommands called 'names' that needs a tab completer
+ When you create the parser for names, include the completer function in the parser's defaults.
+
+ names_parser.set_defaults(func=print_names, completer=complete_print_names)
+
+ To make sure the names completer gets called, set the completer for the print function
+ in a similar fashion to what follows.
+
+ complete_print = cmd2.Cmd.cmd_with_subs_completer
+
+ When the subcommand's completer is called, this function will have stripped off all content from the
+ beginning of the command line before the subcommand, meaning the line parameter always starts with the
+ subcommand name and the index parameters reflect this change.
+
+ For instance, the command "print names -d 2" becomes "names -d 2"
+ begidx and endidx are incremented accordingly
+
+ :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
+ :param line: str - the current input line with leading whitespace removed
+ :param begidx: int - the beginning index of the prefix text
+ :param endidx: int - the ending index of the prefix text
+ :return: List[str] - a list of possible tab completions
+ """
+ # The command is the token at index 0 in the command line
+ cmd_index = 0
+
+ # The subcommand is the token at index 1 in the command line
+ subcmd_index = 1
+
+ # Get all tokens through the one being completed
+ tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ if tokens is None:
+ return []
+
+ matches = []
+
+ # Get the index of the token being completed
+ index = len(tokens) - 1
+
+ # If the token being completed is past the subcommand name, then do subcommand specific tab-completion
+ if index > subcmd_index:
+
+ # Get the command name
+ command = tokens[cmd_index]
+
+ # Get the subcommand name
+ subcommand = tokens[subcmd_index]
+
+ # Find the offset into line where the subcommand name begins
+ subcmd_start = 0
+ for cur_index in range(0, subcmd_index + 1):
+ cur_token = tokens[cur_index]
+ subcmd_start = line.find(cur_token, subcmd_start)
+
+ if cur_index != subcmd_index:
+ subcmd_start += len(cur_token)
+
+ # Strip off everything before subcommand name
+ orig_line = line
+ line = line[subcmd_start:]
+
+ # Update the indexes
+ diff = len(orig_line) - len(line)
+ begidx -= diff
+ endidx -= diff
+
+ # Call the subcommand specific completer if it exists
+ compfunc = self.get_subcommand_completer(command, subcommand)
+ if compfunc is not None:
+ matches = compfunc(self, text, line, begidx, endidx)
+
+ return matches
+
+ # noinspection PyBroadException
+ def do_py(self, arg):
+ """
+ Invoke python command, shell, or script
+
+ py <command>: Executes a Python command.
+ py: Enters interactive Python mode.
+ End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
+ Non-python commands can be issued with ``cmd("your command")``.
+ Run python code from external script files with ``run("script.py")``
+ """
+ if self._in_py:
+ self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False)
+ return
+ self._in_py = True
+
+ try:
+ self.pystate['self'] = self
+ arg = arg.strip()
+
+ # Support the run command even if called prior to invoking an interactive interpreter
+ def run(filename):
+ """Run a Python script file in the interactive console.
+
+ :param filename: str - filename of *.py script file to run
+ """
+ try:
+ with open(filename) as f:
+ interp.runcode(f.read())
+ except IOError as e:
+ self.perror(e)
+
+ def onecmd_plus_hooks(cmd_plus_args):
+ """Run a cmd2.Cmd command from a Python script or the interactive Python console.
+
+ :param cmd_plus_args: str - command line including command and arguments to run
+ :return: bool - True if cmdloop() should exit once leaving the interactive Python console
+ """
+ return self.onecmd_plus_hooks(cmd_plus_args + '\n')
+
+ self.pystate['run'] = run
+ self.pystate['cmd'] = onecmd_plus_hooks
+
+ localvars = (self.locals_in_py and self.pystate) or {}
+ interp = InteractiveConsole(locals=localvars)
+ interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())')
+
+ if arg:
+ interp.runcode(arg)
+ else:
+ # noinspection PyShadowingBuiltins
+ def quit():
+ """Function callable from the interactive Python console to exit that environment"""
+ raise EmbeddedConsoleExit
+
+ self.pystate['quit'] = quit
+ self.pystate['exit'] = quit
+
+ keepstate = None
+ try:
+ cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
+ keepstate = Statekeeper(sys, ('stdin', 'stdout'))
+ sys.stdout = self.stdout
+ sys.stdin = self.stdin
+ interp.interact(banner="Python %s on %s\n%s\n(%s)\n%s" %
+ (sys.version, sys.platform, cprt, self.__class__.__name__,
+ self.do_py.__doc__))
+ except EmbeddedConsoleExit:
+ pass
+ if keepstate is not None:
+ keepstate.restore()
+ except Exception:
+ pass
+ finally:
+ self._in_py = False
+ return self._should_quit
+
+ @with_argument_list
+ def do_pyscript(self, arglist):
+ """\nRuns a python script file inside the console
+
+ Usage: pyscript <script_path> [script_arguments]
+
+Console commands can be executed inside this script with cmd("your command")
+However, you cannot run nested "py" or "pyscript" commands from within this script
+Paths or arguments that contain spaces must be enclosed in quotes
+"""
+ if not arglist:
+ self.perror("pyscript command requires at least 1 argument ...", traceback_war=False)
+ self.do_help('pyscript')
+ return
+
+ # Get the absolute path of the script
+ script_path = os.path.expanduser(arglist[0])
+
+ # Save current command line arguments
+ orig_args = sys.argv
+
+ # Overwrite sys.argv to allow the script to take command line arguments
+ sys.argv = [script_path]
+ sys.argv.extend(arglist[1:])
+
+ # Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows
+ self.do_py("run({!r})".format(script_path))
+
+ # Restore command line arguments to original state
+ sys.argv = orig_args
+
+ # Enable tab-completion for pyscript command
+ def complete_pyscript(self, text, line, begidx, endidx):
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ # Only include the do_ipy() method if IPython is available on the system
+ if ipython_available:
+ # noinspection PyMethodMayBeStatic,PyUnusedLocal
+ def do_ipy(self, arg):
+ """Enters an interactive IPython shell.
+
+ Run python code from external files with ``run filename.py``
+ End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
+ """
+ banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...'
+ exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0])
+ embed(banner1=banner, exit_msg=exit_msg)
+
+ history_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
+ history_parser_group = history_parser.add_mutually_exclusive_group()
+ history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items')
+ history_parser_group.add_argument('-e', '--edit', action='store_true',
+ help='edit and then run selected history items')
+ history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines')
+ history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file')
+ history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file')
+ _history_arg_help = """empty all history items
+a one history item by number
+a..b, a:b, a:, ..b items by indices (inclusive)
+[string] items containing string
+/regex/ items matching regular expression"""
+ history_parser.add_argument('arg', nargs='?', help=_history_arg_help)
+
+ @with_argparser(history_parser)
+ def do_history(self, args):
+ """View, run, edit, and save previously entered commands."""
+ # If an argument was supplied, then retrieve partial contents of the history
+ cowardly_refuse_to_run = False
+ if args.arg:
+ # If a character indicating a slice is present, retrieve
+ # a slice of the history
+ arg = args.arg
+ if '..' in arg or ':' in arg:
+ try:
+ # Get a slice of history
+ history = self.history.span(arg)
+ except IndexError:
+ history = self.history.get(arg)
+ else:
+ # Get item(s) from history by index or string search
+ history = self.history.get(arg)
+ else:
+ # If no arg given, then retrieve the entire history
+ cowardly_refuse_to_run = True
+ # Get a copy of the history so it doesn't get mutated while we are using it
+ history = self.history[:]
+
+ if args.run:
+ if cowardly_refuse_to_run:
+ self.perror("Cowardly refusing to run all previously entered commands.", traceback_war=False)
+ self.perror("If this is what you want to do, specify '1:' as the range of history.",
+ traceback_war=False)
+ else:
+ for runme in history:
+ self.pfeedback(runme)
+ if runme:
+ self.onecmd_plus_hooks(runme)
+ elif args.edit:
+ fd, fname = tempfile.mkstemp(suffix='.txt', text=True)
+ with os.fdopen(fd, 'w') as fobj:
+ for command in history:
+ fobj.write('{}\n'.format(command))
+ try:
+ os.system('"{}" "{}"'.format(self.editor, fname))
+ self.do_load(fname)
+ except Exception:
+ raise
+ finally:
+ os.remove(fname)
+ elif args.output_file:
+ try:
+ with open(os.path.expanduser(args.output_file), 'w') as fobj:
+ for command in history:
+ fobj.write('{}\n'.format(command))
+ plural = 's' if len(history) > 1 else ''
+ self.pfeedback('{} command{} saved to {}'.format(len(history), plural, args.output_file))
+ except Exception as e:
+ self.perror('Saving {!r} - {}'.format(args.output_file, e), traceback_war=False)
+ elif args.transcript:
+ # Make sure echo is on so commands print to standard out
+ saved_echo = self.echo
+ self.echo = True
+
+ # Redirect stdout to the transcript file
+ saved_self_stdout = self.stdout
+ self.stdout = open(args.transcript, 'w')
+
+ # Run all of the commands in the history with output redirected to transcript and echo on
+ self.runcmds_plus_hooks(history)
+
+ # Restore stdout to its original state
+ self.stdout.close()
+ self.stdout = saved_self_stdout
+
+ # Set echo back to its original state
+ self.echo = saved_echo
+
+ # Post-process the file to escape un-escaped "/" regex escapes
+ with open(args.transcript, 'r') as fin:
+ data = fin.read()
+ post_processed_data = data.replace('/', '\/')
+ with open(args.transcript, 'w') as fout:
+ fout.write(post_processed_data)
+
+ plural = 's' if len(history) > 1 else ''
+ self.pfeedback('{} command{} and outputs saved to transcript file {!r}'.format(len(history), plural,
+ args.transcript))
+ else:
+ # Display the history items retrieved
+ for hi in history:
+ if args.script:
+ self.poutput(hi)
+ else:
+ self.poutput(hi.pr())
+
+ @with_argument_list
+ def do_edit(self, arglist):
+ """Edit a file in a text editor.
+
+Usage: edit [file_path]
+ Where:
+ * file_path - path to a file to open in editor
+
+The editor used is determined by the ``editor`` settable parameter.
+"set editor (program-name)" to change or set the EDITOR environment variable.
+"""
+ if not self.editor:
+ raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.")
+ filename = arglist[0] if arglist else ''
+ if filename:
+ os.system('"{}" "{}"'.format(self.editor, filename))
+ else:
+ os.system('"{}"'.format(self.editor))
+
+ # Enable tab-completion for edit command
+ def complete_edit(self, text, line, begidx, endidx):
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ @property
+ def _current_script_dir(self):
+ """Accessor to get the current script directory from the _script_dir LIFO queue."""
+ if self._script_dir:
+ return self._script_dir[-1]
+ else:
+ return None
+
+ @with_argument_list
+ def do__relative_load(self, arglist):
+ """Runs commands in script file that is encoded as either ASCII or UTF-8 text.
+
+ Usage: _relative_load <file_path>
+
+ optional argument:
+ file_path a file path pointing to a script
+
+Script should contain one command per line, just like command would be typed in console.
+
+If this is called from within an already-running script, the filename will be interpreted
+relative to the already-running script's directory.
+
+NOTE: This command is intended to only be used within text file scripts.
+ """
+ # If arg is None or arg is an empty string this is an error
+ if not arglist:
+ self.perror('_relative_load command requires a file path:', traceback_war=False)
+ return
+
+ file_path = arglist[0].strip()
+ # NOTE: Relative path is an absolute path, it is just relative to the current script directory
+ relative_path = os.path.join(self._current_script_dir or '', file_path)
+ self.do_load(relative_path)
+
+ def do_eos(self, _):
+ """Handles cleanup when a script has finished executing."""
+ if self._script_dir:
+ self._script_dir.pop()
+
+ @with_argument_list
+ def do_load(self, arglist):
+ """Runs commands in script file that is encoded as either ASCII or UTF-8 text.
+
+ Usage: load <file_path>
+
+ * file_path - a file path pointing to a script
+
+Script should contain one command per line, just like command would be typed in console.
+ """
+ # If arg is None or arg is an empty string this is an error
+ if not arglist:
+ self.perror('load command requires a file path:', traceback_war=False)
+ return
+
+ file_path = arglist[0].strip()
+ expanded_path = os.path.abspath(os.path.expanduser(file_path))
+
+ # Make sure expanded_path points to a file
+ if not os.path.isfile(expanded_path):
+ self.perror('{} does not exist or is not a file'.format(expanded_path), traceback_war=False)
+ return
+
+ # Make sure the file is not empty
+ if os.path.getsize(expanded_path) == 0:
+ self.perror('{} is empty'.format(expanded_path), traceback_war=False)
+ return
+
+ # Make sure the file is ASCII or UTF-8 encoded text
+ if not self.is_text_file(expanded_path):
+ self.perror('{} is not an ASCII or UTF-8 encoded text file'.format(expanded_path), traceback_war=False)
+ return
+
+ try:
+ # Read all lines of the script and insert into the head of the
+ # command queue. Add an "end of script (eos)" command to cleanup the
+ # self._script_dir list when done.
+ with open(expanded_path, encoding='utf-8') as target:
+ self.cmdqueue = target.read().splitlines() + ['eos'] + self.cmdqueue
+ except IOError as e:
+ self.perror('Problem accessing script from {}:\n{}'.format(expanded_path, e))
+ return
+
+ self._script_dir.append(os.path.dirname(expanded_path))
+
+ # Enable tab-completion for load command
+ def complete_load(self, text, line, begidx, endidx):
+ index_dict = {1: self.path_complete}
+ return self.index_based_complete(text, line, begidx, endidx, index_dict)
+
+ @staticmethod
+ def is_text_file(file_path):
+ """
+ Returns if a file contains only ASCII or UTF-8 encoded text
+ :param file_path: path to the file being checked
+ """
+ expanded_path = os.path.abspath(os.path.expanduser(file_path.strip()))
+ valid_text_file = False
+
+ # Check if the file is ASCII
+ try:
+ with codecs.open(expanded_path, encoding='ascii', errors='strict') as f:
+ # Make sure the file has at least one line of text
+ # noinspection PyUnusedLocal
+ if sum(1 for line in f) > 0:
+ valid_text_file = True
+ except IOError:
+ pass
+ except UnicodeDecodeError:
+ # The file is not ASCII. Check if it is UTF-8.
+ try:
+ with codecs.open(expanded_path, encoding='utf-8', errors='strict') as f:
+ # Make sure the file has at least one line of text
+ # noinspection PyUnusedLocal
+ if sum(1 for line in f) > 0:
+ valid_text_file = True
+ except IOError:
+ pass
+ except UnicodeDecodeError:
+ # Not UTF-8
+ pass
+
+ return valid_text_file
+
+ def run_transcript_tests(self, callargs):
+ """Runs transcript tests for provided file(s).
+
+ This is called when either -t is provided on the command line or the transcript_files argument is provided
+ during construction of the cmd2.Cmd instance.
+
+ :param callargs: List[str] - list of transcript test file names
+ """
+ class TestMyAppCase(Cmd2TestCase):
+ cmdapp = self
+
+ self.__class__.testfiles = callargs
+ sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main()
+ testcase = TestMyAppCase()
+ runner = unittest.TextTestRunner()
+ runner.run(testcase)
+
+ def cmdloop(self, intro=None):
+ """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
+
+ _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with
+ the following extra features provided by cmd2:
+ - commands at invocation
+ - transcript testing
+ - intro banner
+
+ :param intro: str - if provided this overrides self.intro and serves as the intro banner printed once at start
+ """
+ if self.allow_cli_args:
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-t', '--test', action="store_true",
+ help='Test against transcript(s) in FILE (wildcards OK)')
+ callopts, callargs = parser.parse_known_args()
+
+ # If transcript testing was called for, use other arguments as transcript files
+ if callopts.test:
+ self._transcript_files = callargs
+
+ # If commands were supplied at invocation, then add them to the command queue
+ if callargs:
+ self.cmdqueue.extend(callargs)
+
+ # Always run the preloop first
+ self.preloop()
+
+ # If transcript-based regression testing was requested, then do that instead of the main loop
+ if self._transcript_files is not None:
+ self.run_transcript_tests(self._transcript_files)
+ else:
+ # If an intro was supplied in the method call, allow it to override the default
+ if intro is not None:
+ self.intro = intro
+
+ # Print the intro, if there is one, right after the preloop
+ if self.intro is not None:
+ self.poutput(str(self.intro) + "\n")
+
+ # And then call _cmdloop() to enter the main loop
+ self._cmdloop()
+
+ # Run the postloop() no matter what
+ self.postloop()
+
+
+# noinspection PyPep8Naming
+class ParserManager:
+ """
+ Class which encapsulates all of the pyparsing parser functionality for cmd2 in a single location.
+ """
+ def __init__(self, redirector, terminators, multilineCommands, legalChars, commentGrammars, commentInProgress,
+ blankLinesAllowed, prefixParser, preparse, postparse, aliases, shortcuts):
+ """Creates and uses parsers for user input according to app's parameters."""
+
+ self.commentGrammars = commentGrammars
+ self.preparse = preparse
+ self.postparse = postparse
+ self.aliases = aliases
+ self.shortcuts = shortcuts
+
+ self.main_parser = self._build_main_parser(redirector=redirector, terminators=terminators,
+ multilineCommands=multilineCommands, legalChars=legalChars,
+ commentInProgress=commentInProgress,
+ blankLinesAllowed=blankLinesAllowed, prefixParser=prefixParser)
+ self.input_source_parser = self._build_input_source_parser(legalChars=legalChars,
+ commentInProgress=commentInProgress)
+
+ def _build_main_parser(self, redirector, terminators, multilineCommands, legalChars, commentInProgress,
+ blankLinesAllowed, prefixParser):
+ """Builds a PyParsing parser for interpreting user commands."""
+
+ # Build several parsing components that are eventually compiled into overall parser
+ output_destination_parser = (pyparsing.Literal(redirector * 2) |
+ (pyparsing.WordStart() + redirector) |
+ pyparsing.Regex('[^=]' + redirector))('output')
+
+ terminator_parser = pyparsing.Or(
+ [(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in terminators])('terminator')
+ string_end = pyparsing.stringEnd ^ '\nEOF'
+ multilineCommand = pyparsing.Or(
+ [pyparsing.Keyword(c, caseless=False) for c in multilineCommands])('multilineCommand')
+ oneline_command = (~multilineCommand + pyparsing.Word(legalChars))('command')
+ pipe = pyparsing.Keyword('|', identChars='|')
+ do_not_parse = self.commentGrammars | commentInProgress | pyparsing.quotedString
+ after_elements = \
+ pyparsing.Optional(pipe + pyparsing.SkipTo(output_destination_parser ^ string_end,
+ ignore=do_not_parse)('pipeTo')) + \
+ pyparsing.Optional(output_destination_parser +
+ pyparsing.SkipTo(string_end, ignore=do_not_parse).
+ setParseAction(lambda x: strip_quotes(x[0].strip()))('outputTo'))
+
+ multilineCommand.setParseAction(lambda x: x[0])
+ oneline_command.setParseAction(lambda x: x[0])
+
+ if blankLinesAllowed:
+ blankLineTerminationParser = pyparsing.NoMatch
+ else:
+ blankLineTerminator = (pyparsing.lineEnd + pyparsing.lineEnd)('terminator')
+ blankLineTerminator.setResultsName('terminator')
+ blankLineTerminationParser = ((multilineCommand ^ oneline_command) +
+ pyparsing.SkipTo(blankLineTerminator, ignore=do_not_parse).setParseAction(
+ lambda x: x[0].strip())('args') + blankLineTerminator)('statement')
+
+ multilineParser = (((multilineCommand ^ oneline_command) +
+ pyparsing.SkipTo(terminator_parser,
+ ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('args') +
+ terminator_parser)('statement') +
+ pyparsing.SkipTo(output_destination_parser ^ pipe ^ string_end,
+ ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('suffix') +
+ after_elements)
+ multilineParser.ignore(commentInProgress)
+
+ singleLineParser = ((oneline_command +
+ pyparsing.SkipTo(terminator_parser ^ string_end ^ pipe ^ output_destination_parser,
+ ignore=do_not_parse).setParseAction(
+ lambda x: x[0].strip())('args'))('statement') +
+ pyparsing.Optional(terminator_parser) + after_elements)
+
+ blankLineTerminationParser = blankLineTerminationParser.setResultsName('statement')
+
+ parser = prefixParser + (
+ string_end |
+ multilineParser |
+ singleLineParser |
+ blankLineTerminationParser |
+ multilineCommand + pyparsing.SkipTo(string_end, ignore=do_not_parse)
+ )
+ parser.ignore(self.commentGrammars)
+ return parser
+
+ @staticmethod
+ def _build_input_source_parser(legalChars, commentInProgress):
+ """Builds a PyParsing parser for alternate user input sources (from file, pipe, etc.)"""
+
+ input_mark = pyparsing.Literal('<')
+ input_mark.setParseAction(lambda x: '')
+
+ # Also allow spaces, slashes, and quotes
+ file_name = pyparsing.Word(legalChars + ' /\\"\'')
+
+ input_from = file_name('inputFrom')
+ input_from.setParseAction(replace_with_file_contents)
+ # a not-entirely-satisfactory way of distinguishing < as in "import from" from <
+ # as in "lesser than"
+ inputParser = input_mark + pyparsing.Optional(input_from) + pyparsing.Optional('>') + \
+ pyparsing.Optional(file_name) + (pyparsing.stringEnd | '|')
+ inputParser.ignore(commentInProgress)
+ return inputParser
+
+ def parsed(self, raw):
+ """ This function is where the actual parsing of each line occurs.
+
+ :param raw: str - the line of text as it was entered
+ :return: ParsedString - custom subclass of str with extra attributes
+ """
+ if isinstance(raw, ParsedString):
+ p = raw
+ else:
+ # preparse is an overridable hook; default makes no changes
+ s = self.preparse(raw)
+ s = self.input_source_parser.transformString(s.lstrip())
+ s = self.commentGrammars.transformString(s)
+
+ # Make a copy of aliases so we can edit it
+ tmp_aliases = list(self.aliases.keys())
+ keep_expanding = len(tmp_aliases) > 0
+
+ # Expand aliases
+ while keep_expanding:
+ for cur_alias in tmp_aliases:
+ keep_expanding = False
+
+ if s == cur_alias or s.startswith(cur_alias + ' '):
+ s = s.replace(cur_alias, self.aliases[cur_alias], 1)
+
+ # Do not expand the same alias more than once
+ tmp_aliases.remove(cur_alias)
+ keep_expanding = len(tmp_aliases) > 0
+ break
+
+ # Expand command shortcut to its full command name
+ for (shortcut, expansion) in self.shortcuts:
+ if s.startswith(shortcut):
+ # If the next character after the shortcut isn't a space, then insert one
+ shortcut_len = len(shortcut)
+ if len(s) == shortcut_len or s[shortcut_len] != ' ':
+ expansion += ' '
+
+ # Expand the shortcut
+ s = s.replace(shortcut, expansion, 1)
+ break
+
+ try:
+ result = self.main_parser.parseString(s)
+ except pyparsing.ParseException:
+ # If we have a parsing failure, treat it is an empty command and move to next prompt
+ result = self.main_parser.parseString('')
+ result['raw'] = raw
+ result['command'] = result.multilineCommand or result.command
+ result = self.postparse(result)
+ p = ParsedString(result.args)
+ p.parsed = result
+ p.parser = self.parsed
+ return p
+
+
+class HistoryItem(str):
+ """Class used to represent an item in the History list.
+
+ Thin wrapper around str class which adds a custom format for printing. It
+ also keeps track of its index in the list as well as a lowercase
+ representation of itself for convenience/efficiency.
+
+ """
+ listformat = '-------------------------[{}]\n{}\n'
+
+ # noinspection PyUnusedLocal
+ def __init__(self, instr):
+ str.__init__(self)
+ self.lowercase = self.lower()
+ self.idx = None
+
+ def pr(self):
+ """Represent a HistoryItem in a pretty fashion suitable for printing.
+
+ :return: str - pretty print string version of a HistoryItem
+ """
+ return self.listformat.format(self.idx, str(self).rstrip())
+
+
+class History(list):
+ """ A list of HistoryItems that knows how to respond to user requests. """
+
+ # noinspection PyMethodMayBeStatic
+ def _zero_based_index(self, onebased):
+ result = onebased
+ if result > 0:
+ result -= 1
+ return result
+
+ def _to_index(self, raw):
+ if raw:
+ result = self._zero_based_index(int(raw))
+ else:
+ result = None
+ return result
+
+ spanpattern = re.compile(r'^\s*(?P<start>-?\d+)?\s*(?P<separator>:|(\.{2,}))?\s*(?P<end>-?\d+)?\s*$')
+
+ def span(self, raw):
+ """Parses the input string search for a span pattern and if if found, returns a slice from the History list.
+
+ :param raw: str - string potentially containing a span of the forms a..b, a:b, a:, ..b
+ :return: List[HistoryItem] - slice from the History list
+ """
+ if raw.lower() in ('*', '-', 'all'):
+ raw = ':'
+ results = self.spanpattern.search(raw)
+ if not results:
+ raise IndexError
+ if not results.group('separator'):
+ return [self[self._to_index(results.group('start'))]]
+ start = self._to_index(results.group('start')) or 0 # Ensure start is not None
+ end = self._to_index(results.group('end'))
+ reverse = False
+ if end is not None:
+ if end < start:
+ (start, end) = (end, start)
+ reverse = True
+ end += 1
+ result = self[start:end]
+ if reverse:
+ result.reverse()
+ return result
+
+ rangePattern = re.compile(r'^\s*(?P<start>[\d]+)?\s*-\s*(?P<end>[\d]+)?\s*$')
+
+ def append(self, new):
+ """Append a HistoryItem to end of the History list
+
+ :param new: str - command line to convert to HistoryItem and add to the end of the History list
+ """
+ new = HistoryItem(new)
+ list.append(self, new)
+ new.idx = len(self)
+
+ def get(self, getme=None):
+ """Get an item or items from the History list using 1-based indexing.
+
+ :param getme: int or str - item(s) to get - either an integer index or string to search for
+ :return: List[str] - list of HistoryItems matching the retrieval criteria
+ """
+ if not getme:
+ return self
+ try:
+ getme = int(getme)
+ if getme < 0:
+ return self[:(-1 * getme)]
+ else:
+ return [self[getme - 1]]
+ except IndexError:
+ return []
+ except ValueError:
+ range_result = self.rangePattern.search(getme)
+ if range_result:
+ start = range_result.group('start') or None
+ end = range_result.group('start') or None
+ if start:
+ start = int(start) - 1
+ if end:
+ end = int(end)
+ return self[start:end]
+
+ # noinspection PyUnresolvedReferences
+ getme = getme.strip()
+
+ if getme.startswith(r'/') and getme.endswith(r'/'):
+ finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE)
+
+ def isin(hi):
+ """Listcomp filter function for doing a regular expression search of History.
+
+ :param hi: HistoryItem
+ :return: bool - True if search matches
+ """
+ return finder.search(hi)
+ else:
+ def isin(hi):
+ """Listcomp filter function for doing a case-insensitive string search of History.
+
+ :param hi: HistoryItem
+ :return: bool - True if search matches
+ """
+ return getme.lower() in hi.lowercase
+ return [itm for itm in self if isin(itm)]
+
+
+def cast(current, new):
+ """Tries to force a new value into the same type as the current when trying to set the value for a parameter.
+
+ :param current: current value for the parameter, type varies
+ :param new: str - new value
+ :return: new value with same type as current, or the current value if there was an error casting
+ """
+ typ = type(current)
+ if typ == bool:
+ try:
+ return bool(int(new))
+ except (ValueError, TypeError):
+ pass
+ try:
+ new = new.lower()
+ except AttributeError:
+ pass
+ if (new == 'on') or (new[0] in ('y', 't')):
+ return True
+ if (new == 'off') or (new[0] in ('n', 'f')):
+ return False
+ else:
+ try:
+ return typ(new)
+ except (ValueError, TypeError):
+ pass
+ print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new))
+ return current
+
+
+class Statekeeper(object):
+ """Class used to save and restore state during load and py commands as well as when redirecting output or pipes."""
+ def __init__(self, obj, attribs):
+ """Use the instance attributes as a generic key-value store to copy instance attributes from outer object.
+
+ :param obj: instance of cmd2.Cmd derived class (your application instance)
+ :param attribs: Tuple[str] - tuple of strings listing attributes of obj to save a copy of
+ """
+ self.obj = obj
+ self.attribs = attribs
+ if self.obj:
+ self._save()
+
+ def _save(self):
+ """Create copies of attributes from self.obj inside this Statekeeper instance."""
+ for attrib in self.attribs:
+ setattr(self, attrib, getattr(self.obj, attrib))
+
+ def restore(self):
+ """Overwrite attributes in self.obj with the saved values stored in this Statekeeper instance."""
+ if self.obj:
+ for attrib in self.attribs:
+ setattr(self.obj, attrib, getattr(self, attrib))
+
+
+class OutputTrap(object):
+ """Instantiate an OutputTrap to divert/capture ALL stdout output. For use in transcript testing."""
+
+ def __init__(self):
+ self.contents = ''
+
+ def write(self, txt):
+ """Add text to the internal contents.
+
+ :param txt: str
+ """
+ self.contents += txt
+
+ def read(self):
+ """Read from the internal contents and then clear them out.
+
+ :return: str - text from the internal contents
+ """
+ result = self.contents
+ self.contents = ''
+ return result
+
+
+class Cmd2TestCase(unittest.TestCase):
+ """Subclass this, setting CmdApp, to make a unittest.TestCase class
+ that will execute the commands in a transcript file and expect the results shown.
+ See example.py"""
+ cmdapp = None
+
+ def fetchTranscripts(self):
+ self.transcripts = {}
+ for fileset in self.cmdapp.testfiles:
+ for fname in glob.glob(fileset):
+ tfile = open(fname)
+ self.transcripts[fname] = iter(tfile.readlines())
+ tfile.close()
+ if not len(self.transcripts):
+ raise Exception("No test files found - nothing to test.")
+
+ def setUp(self):
+ if self.cmdapp:
+ self.fetchTranscripts()
+
+ # Trap stdout
+ self._orig_stdout = self.cmdapp.stdout
+ self.cmdapp.stdout = OutputTrap()
+
+ def runTest(self): # was testall
+ if self.cmdapp:
+ its = sorted(self.transcripts.items())
+ for (fname, transcript) in its:
+ self._test_transcript(fname, transcript)
+
+ def _test_transcript(self, fname, transcript):
+ line_num = 0
+ finished = False
+ line = strip_ansi(next(transcript))
+ line_num += 1
+ while not finished:
+ # Scroll forward to where actual commands begin
+ while not line.startswith(self.cmdapp.visible_prompt):
+ try:
+ line = strip_ansi(next(transcript))
+ except StopIteration:
+ finished = True
+ break
+ line_num += 1
+ command = [line[len(self.cmdapp.visible_prompt):]]
+ line = next(transcript)
+ # Read the entirety of a multi-line command
+ while line.startswith(self.cmdapp.continuation_prompt):
+ command.append(line[len(self.cmdapp.continuation_prompt):])
+ try:
+ line = next(transcript)
+ except StopIteration:
+ raise (StopIteration,
+ 'Transcript broke off while reading command beginning at line {} with\n{}'.format(line_num,
+ command[0])
+ )
+ line_num += 1
+ command = ''.join(command)
+ # Send the command into the application and capture the resulting output
+ # TODO: Should we get the return value and act if stop == True?
+ self.cmdapp.onecmd_plus_hooks(command)
+ result = self.cmdapp.stdout.read()
+ # Read the expected result from transcript
+ if strip_ansi(line).startswith(self.cmdapp.visible_prompt):
+ message = '\nFile {}, line {}\nCommand was:\n{}\nExpected: (nothing)\nGot:\n{}\n'.format(
+ fname, line_num, command, result)
+ self.assert_(not (result.strip()), message)
+ continue
+ expected = []
+ while not strip_ansi(line).startswith(self.cmdapp.visible_prompt):
+ expected.append(line)
+ try:
+ line = next(transcript)
+ except StopIteration:
+ finished = True
+ break
+ line_num += 1
+ expected = ''.join(expected)
+
+ # transform the expected text into a valid regular expression
+ expected = self._transform_transcript_expected(expected)
+ message = '\nFile {}, line {}\nCommand was:\n{}\nExpected:\n{}\nGot:\n{}\n'.format(
+ fname, line_num, command, expected, result)
+ self.assertTrue(re.match(expected, result, re.MULTILINE | re.DOTALL), message)
+
+ def _transform_transcript_expected(self, s):
+ """parse the string with slashed regexes into a valid regex
+
+ Given a string like:
+
+ Match a 10 digit phone number: /\d{3}-\d{3}-\d{4}/
+
+ Turn it into a valid regular expression which matches the literal text
+ of the string and the regular expression. We have to remove the slashes
+ because they differentiate between plain text and a regular expression.
+ Unless the slashes are escaped, in which case they are interpreted as
+ plain text, or there is only one slash, which is treated as plain text
+ also.
+
+ Check the tests in tests/test_transcript.py to see all the edge
+ cases.
+ """
+ regex = ''
+ start = 0
+
+ while True:
+ (regex, first_slash_pos, start) = self._escaped_find(regex, s, start, False)
+ if first_slash_pos == -1:
+ # no more slashes, add the rest of the string and bail
+ regex += re.escape(s[start:])
+ break
+ else:
+ # there is a slash, add everything we have found so far
+ # add stuff before the first slash as plain text
+ regex += re.escape(s[start:first_slash_pos])
+ start = first_slash_pos+1
+ # and go find the next one
+ (regex, second_slash_pos, start) = self._escaped_find(regex, s, start, True)
+ if second_slash_pos > 0:
+ # add everything between the slashes (but not the slashes)
+ # as a regular expression
+ regex += s[start:second_slash_pos]
+ # and change where we start looking for slashed on the
+ # turn through the loop
+ start = second_slash_pos + 1
+ else:
+ # No closing slash, we have to add the first slash,
+ # and the rest of the text
+ regex += re.escape(s[start-1:])
+ break
+ return regex
+
+ @staticmethod
+ def _escaped_find(regex, s, start, in_regex):
+ """
+ Find the next slash in {s} after {start} that is not preceded by a backslash.
+
+ If we find an escaped slash, add everything up to and including it to regex,
+ updating {start}. {start} therefore serves two purposes, tells us where to start
+ looking for the next thing, and also tells us where in {s} we have already
+ added things to {regex}
+
+ {in_regex} specifies whether we are currently searching in a regex, we behave
+ differently if we are or if we aren't.
+ """
+
+ while True:
+ pos = s.find('/', start)
+ if pos == -1:
+ # no match, return to caller
+ break
+ elif pos == 0:
+ # slash at the beginning of the string, so it can't be
+ # escaped. We found it.
+ break
+ else:
+ # check if the slash is preceeded by a backslash
+ if s[pos-1:pos] == '\\':
+ # it is.
+ if in_regex:
+ # add everything up to the backslash as a
+ # regular expression
+ regex += s[start:pos-1]
+ # skip the backslash, and add the slash
+ regex += s[pos]
+ else:
+ # add everything up to the backslash as escaped
+ # plain text
+ regex += re.escape(s[start:pos-1])
+ # and then add the slash as escaped
+ # plain text
+ regex += re.escape(s[pos])
+ # update start to show we have handled everything
+ # before it
+ start = pos+1
+ # and continue to look
+ else:
+ # slash is not escaped, this is what we are looking for
+ break
+ return regex, pos, start
+
+ def tearDown(self):
+ if self.cmdapp:
+ # Restore stdout
+ self.cmdapp.stdout = self._orig_stdout
+
+
+def namedtuple_with_two_defaults(typename, field_names, default_values=('', '')):
+ """Wrapper around namedtuple which lets you treat the last value as optional.
+
+ :param typename: str - type name for the Named tuple
+ :param field_names: List[str] or space-separated string of field names
+ :param default_values: (optional) 2-element tuple containing the default values for last 2 parameters in named tuple
+ Defaults to an empty string for both of them
+ :return: namedtuple type
+ """
+ T = collections.namedtuple(typename, field_names)
+ # noinspection PyUnresolvedReferences
+ T.__new__.__defaults__ = default_values
+ return T
+
+
+class CmdResult(namedtuple_with_two_defaults('CmdResult', ['out', 'err', 'war'])):
+ """Derive a class to store results from a named tuple so we can tweak dunder methods for convenience.
+
+ This is provided as a convenience and an example for one possible way for end users to store results in
+ the self._last_result attribute of cmd2.Cmd class instances. See the "python_scripting.py" example for how it can
+ be used to enable conditional control flow.
+
+ Named tuple attributes
+ ----------------------
+ out - this is intended to store normal output data from the command and can be of any type that makes sense
+ err: str - (optional) this is intended to store an error message and it being non-empty indicates there was an error
+ Defaults to an empty string
+ war: str - (optional) this is intended to store a warning message which isn't quite an error, but of note
+ Defaults to an empty string.
+
+ NOTE: Named tuples are immutable. So the contents are there for access, not for modification.
+ """
+ def __bool__(self):
+ """If err is an empty string, treat the result as a success; otherwise treat it as a failure."""
+ return not self.err
+
+
+if __name__ == '__main__':
+ # If run as the main application, simply start a bare-bones cmd2 application with only built-in functionality.
+
+ # Set "use_ipython" to True to include the ipy command if IPython is installed, which supports advanced interactive
+ # debugging of your application via introspection on self.
+ app = Cmd(use_ipython=False)
+ app.cmdloop()