summaryrefslogtreecommitdiff
path: root/cmd2.py
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2.py')
-rwxr-xr-xcmd2.py4069
1 files changed, 0 insertions, 4069 deletions
diff --git a/cmd2.py b/cmd2.py
deleted file mode 100755
index 54eff811..00000000
--- a/cmd2.py
+++ /dev/null
@@ -1,4069 +0,0 @@
-#!/usr/bin/env python
-# coding=utf-8
-"""Variant on standard library's cmd with extra features.
-
-To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you
-were using the standard library's cmd, while enjoying the extra features.
-
-Searchable command history (commands: "history")
-Load commands from file, save to file, edit commands in file
-Multi-line commands
-Special-character shortcut commands (beyond cmd's "@" and "!")
-Settable environment parameters
-Parsing commands with `argparse` argument parsers (flags)
-Redirection to file with >, >>; input from file with <
-Easy transcript-based testing of applications (see examples/example.py)
-Bash-style ``select`` available
-
-Note that redirection with > and | will only work if `self.poutput()`
-is used in place of `print`.
-
-- Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com
-
-Git repository on GitHub at https://github.com/python-cmd2/cmd2
-"""
-import argparse
-import atexit
-import cmd
-import codecs
-import collections
-import copy
-import datetime
-import functools
-import glob
-import io
-from io import StringIO
-import os
-import platform
-import re
-import shlex
-import signal
-import subprocess
-import sys
-import tempfile
-import traceback
-import unittest
-from code import InteractiveConsole
-
-try:
- from enum34 import Enum
-except ImportError:
- from enum import Enum
-
-import pyparsing
-import pyperclip
-
-# Newer versions of pyperclip are released as a single file, but older versions had a more complicated structure
-try:
- from pyperclip.exceptions import PyperclipException
-except ImportError:
- # noinspection PyUnresolvedReferences
- from pyperclip import PyperclipException
-
-# Collection is a container that is sizable and iterable
-# It was introduced in Python 3.6. We will try to import it, otherwise use our implementation
-try:
- from collections.abc import Collection, Iterable
-except ImportError:
- from collections.abc import Sized, Iterable, Container
-
- # noinspection PyAbstractClass
- class Collection(Sized, Iterable, Container):
-
- __slots__ = ()
-
- # noinspection PyPep8Naming
- @classmethod
- def __subclasshook__(cls, C):
- if cls is Collection:
- if any("__len__" in B.__dict__ for B in C.__mro__) and \
- any("__iter__" in B.__dict__ for B in C.__mro__) and \
- any("__contains__" in B.__dict__ for B in C.__mro__):
- return True
- return NotImplemented
-
-# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout
-if sys.version_info < (3, 5):
- from contextlib2 import redirect_stdout, redirect_stderr
-else:
- from contextlib import redirect_stdout, redirect_stderr
-
-# Detect whether IPython is installed to determine if the built-in "ipy" command should be included
-ipython_available = True
-try:
- # noinspection PyUnresolvedReferences,PyPackageRequirements
- from IPython import embed
-except ImportError:
- ipython_available = False
-
-# Prefer statically linked gnureadline if available (for macOS compatibility due to issues with libedit)
-try:
- import gnureadline as readline
-except ImportError:
- # Try to import readline, but allow failure for convenience in Windows unit testing
- # Note: If this actually fails, you should install readline on Linux or Mac or pyreadline on Windows
- try:
- # noinspection PyUnresolvedReferences
- import readline
- except ImportError:
- pass
-
-# Check what implementation of readline we are using
-class RlType(Enum):
- GNU = 1
- PYREADLINE = 2
- NONE = 3
-
-rl_type = RlType.NONE
-
-if 'pyreadline' in sys.modules:
- rl_type = RlType.PYREADLINE
-
- # Save the original pyreadline display completion function since we need to override it and restore it
- # noinspection PyProtectedMember
- orig_pyreadline_display = readline.rl.mode._display_completions
-
-elif 'gnureadline' in sys.modules or 'readline' in sys.modules:
- rl_type = RlType.GNU
-
- # We need wcswidth to calculate display width of tab completions
- from wcwidth import wcswidth
-
- # Load the readline lib so we can make changes to it
- import ctypes
- readline_lib = ctypes.CDLL(readline.__file__)
-
- # Save address that rl_basic_quote_characters is pointing to since we need to override and restore it
- rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters")
- orig_rl_basic_quote_characters_addr = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value
-
-__version__ = '0.9.0'
-
-# Pyparsing enablePackrat() can greatly speed up parsing, but problems have been seen in Python 3 in the past
-pyparsing.ParserElement.enablePackrat()
-
-# Override the default whitespace chars in Pyparsing so that newlines are not treated as whitespace
-pyparsing.ParserElement.setDefaultWhitespaceChars(' \t')
-
-
-# The next 2 variables and associated setter functions effect how arguments are parsed for decorated commands
-# which use one of the decorators: @with_argument_list, @with_argparser, or @with_argparser_and_unknown_args
-# The defaults are sane and maximize ease of use for new applications based on cmd2.
-
-# Use POSIX or Non-POSIX (Windows) rules for splitting a command-line string into a list of arguments via shlex.split()
-POSIX_SHLEX = False
-
-# Strip outer quotes for convenience if POSIX_SHLEX = False
-STRIP_QUOTES_FOR_NON_POSIX = True
-
-# Used for tab completion and word breaks. Do not change.
-QUOTES = ['"', "'"]
-REDIRECTION_CHARS = ['|', '<', '>']
-
-# optional attribute, when tagged on a function, allows cmd2 to categorize commands
-HELP_CATEGORY = 'help_category'
-HELP_SUMMARY = 'help_summary'
-
-
-def categorize(func, category):
- """Categorize a function.
-
- The help command output will group this function under the specified category heading
-
- :param func: Union[Callable, Iterable] - function to categorize
- :param category: str - category to put it in
- """
- if isinstance(func, Iterable):
- for item in func:
- setattr(item, HELP_CATEGORY, category)
- else:
- setattr(func, HELP_CATEGORY, category)
-
-
-def set_posix_shlex(val):
- """ Allows user of cmd2 to choose between POSIX and non-POSIX splitting of args for decorated commands.
-
- :param val: bool - True => POSIX, False => Non-POSIX
- """
- global POSIX_SHLEX
- POSIX_SHLEX = val
-
-
-def set_strip_quotes(val):
- """ Allows user of cmd2 to choose whether to automatically strip outer-quotes when POSIX_SHLEX is False.
-
- :param val: bool - True => strip quotes on args for decorated commands if POSIX_SHLEX is False.
- """
- global STRIP_QUOTES_FOR_NON_POSIX
- STRIP_QUOTES_FOR_NON_POSIX = val
-
-
-def _which(editor):
- try:
- editor_path = subprocess.check_output(['which', editor], stderr=subprocess.STDOUT).strip()
- editor_path = editor_path.decode()
- except subprocess.CalledProcessError:
- editor_path = None
- return editor_path
-
-
-def strip_quotes(arg):
- """ Strip outer quotes from a string.
-
- Applies to both single and double quotes.
-
- :param arg: str - string to strip outer quotes from
- :return str - same string with potentially outer quotes stripped
- """
- quote_chars = '"' + "'"
-
- if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in quote_chars:
- arg = arg[1:-1]
- return arg
-
-
-def parse_quoted_string(cmdline):
- """Parse a quoted string into a list of arguments."""
- if isinstance(cmdline, list):
- # arguments are already a list, return the list we were passed
- lexed_arglist = cmdline
- else:
- # Use shlex to split the command line into a list of arguments based on shell rules
- lexed_arglist = shlex.split(cmdline, posix=POSIX_SHLEX)
- # If not using POSIX shlex, make sure to strip off outer quotes for convenience
- if not POSIX_SHLEX and STRIP_QUOTES_FOR_NON_POSIX:
- temp_arglist = []
- for arg in lexed_arglist:
- temp_arglist.append(strip_quotes(arg))
- lexed_arglist = temp_arglist
- return lexed_arglist
-
-
-def with_category(category):
- """A decorator to apply a category to a command function"""
- def cat_decorator(func):
- categorize(func, category)
- return func
- return cat_decorator
-
-
-def with_argument_list(func):
- """A decorator to alter the arguments passed to a do_* cmd2
- method. Default passes a string of whatever the user typed.
- With this decorator, the decorated method will receive a list
- of arguments parsed from user input using shlex.split()."""
- @functools.wraps(func)
- def cmd_wrapper(self, cmdline):
- lexed_arglist = parse_quoted_string(cmdline)
- return func(self, lexed_arglist)
-
- cmd_wrapper.__doc__ = func.__doc__
- return cmd_wrapper
-
-
-def with_argparser_and_unknown_args(argparser):
- """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given
- instance of argparse.ArgumentParser, but also returning unknown args as a list.
-
- :param argparser: argparse.ArgumentParser - given instance of ArgumentParser
- :return: function that gets passed parsed args and a list of unknown args
- """
-
- # noinspection PyProtectedMember
- def arg_decorator(func):
- @functools.wraps(func)
- def cmd_wrapper(instance, cmdline):
- lexed_arglist = parse_quoted_string(cmdline)
- args, unknown = argparser.parse_known_args(lexed_arglist)
- return func(instance, args, unknown)
-
- # argparser defaults the program name to sys.argv[0]
- # we want it to be the name of our command
- argparser.prog = func.__name__[3:]
-
- # If the description has not been set, then use the method docstring if one exists
- if argparser.description is None and func.__doc__:
- argparser.description = func.__doc__
-
- if func.__doc__:
- setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__)
-
- cmd_wrapper.__doc__ = argparser.format_help()
-
- # Mark this function as having an argparse ArgumentParser (used by do_help)
- cmd_wrapper.__dict__['has_parser'] = True
-
- # If there are subcommands, store their names in a list to support tab-completion of subcommand names
- if argparser._subparsers is not None:
- subcommand_names = argparser._subparsers._group_actions[0]._name_parser_map.keys()
- cmd_wrapper.__dict__['subcommand_names'] = subcommand_names
-
- return cmd_wrapper
-
- return arg_decorator
-
-
-def with_argparser(argparser):
- """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments
- with the given instance of argparse.ArgumentParser.
-
- :param argparser: argparse.ArgumentParser - given instance of ArgumentParser
- :return: function that gets passed parsed args
- """
-
- # noinspection PyProtectedMember
- def arg_decorator(func):
- @functools.wraps(func)
- def cmd_wrapper(instance, cmdline):
- lexed_arglist = parse_quoted_string(cmdline)
- args = argparser.parse_args(lexed_arglist)
- return func(instance, args)
-
- # argparser defaults the program name to sys.argv[0]
- # we want it to be the name of our command
- argparser.prog = func.__name__[3:]
-
- # If the description has not been set, then use the method docstring if one exists
- if argparser.description is None and func.__doc__:
- argparser.description = func.__doc__
-
- if func.__doc__:
- setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__)
-
- cmd_wrapper.__doc__ = argparser.format_help()
-
- # Mark this function as having an argparse ArgumentParser (used by do_help)
- cmd_wrapper.__dict__['has_parser'] = True
-
- # If there are subcommands, store their names in a list to support tab-completion of subcommand names
- if argparser._subparsers is not None:
-
- # Key is subcommand name and value is completer function
- subcommands = collections.OrderedDict()
-
- # Get all subcommands and check if they have completer functions
- for name, parser in argparser._subparsers._group_actions[0]._name_parser_map.items():
- if 'completer' in parser._defaults:
- completer = parser._defaults['completer']
- else:
- completer = None
- subcommands[name] = completer
-
- cmd_wrapper.__dict__['subcommands'] = subcommands
-
- return cmd_wrapper
-
- return arg_decorator
-
-
-# Can we access the clipboard? Should always be true on Windows and Mac, but only sometimes on Linux
-# noinspection PyUnresolvedReferences
-try:
- # Get the version of the pyperclip module as a float
- pyperclip_ver = float('.'.join(pyperclip.__version__.split('.')[:2]))
-
- # The extraneous output bug in pyperclip on Linux using xclip was fixed in more recent versions of pyperclip
- if sys.platform.startswith('linux') and pyperclip_ver < 1.6:
- # Avoid extraneous output to stderr from xclip when clipboard is empty at cost of overwriting clipboard contents
- pyperclip.copy('')
- else:
- # Try getting the contents of the clipboard
- _ = pyperclip.paste()
-except PyperclipException:
- can_clip = False
-else:
- can_clip = True
-
-
-def get_paste_buffer():
- """Get the contents of the clipboard / paste buffer.
-
- :return: str - contents of the clipboard
- """
- pb_str = pyperclip.paste()
- return pb_str
-
-
-def write_to_paste_buffer(txt):
- """Copy text to the clipboard / paste buffer.
-
- :param txt: str - text to copy to the clipboard
- """
- pyperclip.copy(txt)
-
-
-class ParsedString(str):
- """Subclass of str which also stores a pyparsing.ParseResults object containing structured parse results."""
- # pyarsing.ParseResults - structured parse results, to provide multiple means of access to the parsed data
- parsed = None
-
- # Function which did the parsing
- parser = None
-
- def full_parsed_statement(self):
- """Used to reconstruct the full parsed statement when a command isn't recognized."""
- new = ParsedString('%s %s' % (self.parsed.command, self.parsed.args))
- new.parsed = self.parsed
- new.parser = self.parser
- return new
-
-
-def replace_with_file_contents(fname):
- """Action to perform when successfully matching parse element definition for inputFrom parser.
-
- :param fname: str - filename
- :return: str - contents of file "fname"
- """
- try:
- # Any outer quotes are not part of the filename
- unquoted_file = strip_quotes(fname[0])
- with open(os.path.expanduser(unquoted_file)) as source_file:
- result = source_file.read()
- except IOError:
- result = '< %s' % fname[0] # wasn't a file after all
-
- # TODO: IF pyparsing input parser logic gets fixed to support empty file, add support to get from paste buffer
- return result
-
-
-class EmbeddedConsoleExit(SystemExit):
- """Custom exception class for use with the py command."""
- pass
-
-
-class EmptyStatement(Exception):
- """Custom exception class for handling behavior when the user just presses <Enter>."""
- pass
-
-
-# Regular expression to match ANSI escape codes
-ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m')
-
-
-def strip_ansi(text):
- """Strip ANSI escape codes from a string.
-
- :param text: str - a string which may contain ANSI escape codes
- :return: str - the same string with any ANSI escape codes removed
- """
- return ANSI_ESCAPE_RE.sub('', text)
-
-
-def _pop_readline_history(clear_history=True):
- """Returns a copy of readline's history and optionally clears it (default)"""
- # noinspection PyArgumentList
- history = [
- readline.get_history_item(i)
- for i in range(1, 1 + readline.get_current_history_length())
- ]
- if clear_history:
- readline.clear_history()
- return history
-
-
-def _push_readline_history(history, clear_history=True):
- """Restores readline's history and optionally clears it first (default)"""
- if clear_history:
- readline.clear_history()
- for line in history:
- readline.add_history(line)
-
-
-def _complete_from_cmd(cmd_obj, text, line, begidx, endidx):
- """Complete as though the user was typing inside cmd's cmdloop()"""
- from itertools import takewhile
- command_subcommand_params = line.split(None, 3)
-
- if len(command_subcommand_params) < (3 if text else 2):
- n = len(command_subcommand_params[0])
- n += sum(1 for _ in takewhile(str.isspace, line[n:]))
- return cmd_obj.completenames(text, line[n:], begidx - n, endidx - n)
-
- command, subcommand = command_subcommand_params[:2]
- n = len(command) + sum(1 for _ in takewhile(str.isspace, line))
- cfun = getattr(cmd_obj, 'complete_' + subcommand, cmd_obj.complete)
- return cfun(text, line[n:], begidx - n, endidx - n)
-
-
-class AddSubmenu(object):
- """Conveniently add a submenu (Cmd-like class) to a Cmd
-
- e.g. given "class SubMenu(Cmd): ..." then
-
- @AddSubmenu(SubMenu(), 'sub')
- class MyCmd(cmd.Cmd):
- ....
-
- will have the following effects:
- 1. 'sub' will interactively enter the cmdloop of a SubMenu instance
- 2. 'sub cmd args' will call do_cmd(args) in a SubMenu instance
- 3. 'sub ... [TAB]' will have the same behavior as [TAB] in a SubMenu cmdloop
- i.e., autocompletion works the way you think it should
- 4. 'help sub [cmd]' will print SubMenu's help (calls its do_help())
- """
-
- class _Nonexistent(object):
- """
- Used to mark missing attributes.
- Disable __dict__ creation since this class does nothing
- """
- __slots__ = () #
-
- def __init__(self,
- submenu,
- command,
- aliases=(),
- reformat_prompt="{super_prompt}>> {sub_prompt}",
- shared_attributes=None,
- require_predefined_shares=True,
- create_subclass=False,
- preserve_shares=False,
- persistent_history_file=None
- ):
- """Set up the class decorator
-
- submenu (Cmd): Instance of something cmd.Cmd-like
-
- command (str): The command the user types to access the SubMenu instance
-
- aliases (iterable): More commands that will behave like "command"
-
- reformat_prompt (str): Format str or None to disable
- if it's a string, it should contain one or more of:
- {super_prompt}: The current cmd's prompt
- {command}: The command in the current cmd with which it was called
- {sub_prompt}: The subordinate cmd's original prompt
- the default is "{super_prompt}{command} {sub_prompt}"
-
- shared_attributes (dict): dict of the form {'subordinate_attr': 'parent_attr'}
- the attributes are copied to the submenu at the last moment; the submenu's
- attributes are backed up before this and restored afterward
-
- require_predefined_shares: The shared attributes above must be independently
- defined in the subordinate Cmd (default: True)
-
- create_subclass: put the modifications in a subclass rather than modifying
- the existing class (default: False)
- """
- self.submenu = submenu
- self.command = command
- self.aliases = aliases
- if persistent_history_file:
- self.persistent_history_file = os.path.expanduser(persistent_history_file)
- else:
- self.persistent_history_file = None
-
- if reformat_prompt is not None and not isinstance(reformat_prompt, str):
- raise ValueError("reformat_prompt should be either a format string or None")
- self.reformat_prompt = reformat_prompt
-
- self.shared_attributes = {} if shared_attributes is None else shared_attributes
- if require_predefined_shares:
- for attr in self.shared_attributes.keys():
- if not hasattr(submenu, attr):
- raise AttributeError("The shared attribute '{attr}' is not defined in {cmd}. Either define {attr} "
- "in {cmd} or set require_predefined_shares=False."
- .format(cmd=submenu.__class__.__name__, attr=attr))
-
- self.create_subclass = create_subclass
- self.preserve_shares = preserve_shares
-
- def _get_original_attributes(self):
- return {
- attr: getattr(self.submenu, attr, AddSubmenu._Nonexistent)
- for attr in self.shared_attributes.keys()
- }
-
- def _copy_in_shared_attrs(self, parent_cmd):
- for sub_attr, par_attr in self.shared_attributes.items():
- setattr(self.submenu, sub_attr, getattr(parent_cmd, par_attr))
-
- def _copy_out_shared_attrs(self, parent_cmd, original_attributes):
- if self.preserve_shares:
- for sub_attr, par_attr in self.shared_attributes.items():
- setattr(parent_cmd, par_attr, getattr(self.submenu, sub_attr))
- else:
- for attr, value in original_attributes.items():
- if attr is not AddSubmenu._Nonexistent:
- setattr(self.submenu, attr, value)
- else:
- delattr(self.submenu, attr)
-
- def __call__(self, cmd_obj):
- """Creates a subclass of Cmd wherein the given submenu can be accessed via the given command"""
- def enter_submenu(parent_cmd, line):
- """
- This function will be bound to do_<submenu> and will change the scope of the CLI to that of the
- submenu.
- """
- submenu = self.submenu
- original_attributes = self._get_original_attributes()
- history = _pop_readline_history()
-
- if self.persistent_history_file:
- try:
- readline.read_history_file(self.persistent_history_file)
- except FileNotFoundError:
- pass
-
- try:
- # copy over any shared attributes
- self._copy_in_shared_attrs(parent_cmd)
-
- if line.parsed.args:
- # Remove the menu argument and execute the command in the submenu
- line = submenu.parser_manager.parsed(line.parsed.args)
- submenu.precmd(line)
- ret = submenu.onecmd(line)
- submenu.postcmd(ret, line)
- else:
- if self.reformat_prompt is not None:
- prompt = submenu.prompt
- submenu.prompt = self.reformat_prompt.format(
- super_prompt=parent_cmd.prompt,
- command=self.command,
- sub_prompt=prompt,
- )
- submenu.cmdloop()
- if self.reformat_prompt is not None:
- # noinspection PyUnboundLocalVariable
- self.submenu.prompt = prompt
- finally:
- # copy back original attributes
- self._copy_out_shared_attrs(parent_cmd, original_attributes)
-
- # write submenu history
- if self.persistent_history_file:
- readline.write_history_file(self.persistent_history_file)
- # reset main app history before exit
- _push_readline_history(history)
-
- def complete_submenu(_self, text, line, begidx, endidx):
- """
- This function will be bound to complete_<submenu> and will perform the complete commands of the submenu.
- """
- submenu = self.submenu
- original_attributes = self._get_original_attributes()
- try:
- # copy over any shared attributes
- self._copy_in_shared_attrs(_self)
- return _complete_from_cmd(submenu, text, line, begidx, endidx)
- finally:
- # copy back original attributes
- self._copy_out_shared_attrs(_self, original_attributes)
-
- original_do_help = cmd_obj.do_help
- original_complete_help = cmd_obj.complete_help
-
- def help_submenu(_self, line):
- """
- This function will be bound to help_<submenu> and will call the help commands of the submenu.
- """
- tokens = line.split(None, 1)
- if tokens and (tokens[0] == self.command or tokens[0] in self.aliases):
- self.submenu.do_help(tokens[1] if len(tokens) == 2 else '')
- else:
- original_do_help(_self, line)
-
- def _complete_submenu_help(_self, text, line, begidx, endidx):
- """autocomplete to match help_submenu()'s behavior"""
- tokens = line.split(None, 1)
- if len(tokens) == 2 and (
- not (not tokens[1].startswith(self.command) and not any(
- tokens[1].startswith(alias) for alias in self.aliases))
- ):
- return self.submenu.complete_help(
- text,
- tokens[1],
- begidx - line.index(tokens[1]),
- endidx - line.index(tokens[1]),
- )
- else:
- return original_complete_help(_self, text, line, begidx, endidx)
-
- if self.create_subclass:
- class _Cmd(cmd_obj):
- do_help = help_submenu
- complete_help = _complete_submenu_help
- else:
- _Cmd = cmd_obj
- _Cmd.do_help = help_submenu
- _Cmd.complete_help = _complete_submenu_help
-
- # Create bindings in the parent command to the submenus commands.
- setattr(_Cmd, 'do_' + self.command, enter_submenu)
- setattr(_Cmd, 'complete_' + self.command, complete_submenu)
-
- # Create additional bindings for aliases
- for _alias in self.aliases:
- setattr(_Cmd, 'do_' + _alias, enter_submenu)
- setattr(_Cmd, 'complete_' + _alias, complete_submenu)
- return _Cmd
-
-
-class Cmd(cmd.Cmd):
- """An easy but powerful framework for writing line-oriented command interpreters.
-
- Extends the Python Standard Library’s cmd package by adding a lot of useful features
- to the out of the box configuration.
-
- Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes.
- """
- # Attributes used to configure the ParserManager (all are not dynamically settable at runtime)
- blankLinesAllowed = False
- commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment])
- commentInProgress = pyparsing.Literal('/*') + pyparsing.SkipTo(pyparsing.stringEnd ^ '*/')
- legalChars = u'!#$%.:?@_-' + pyparsing.alphanums + pyparsing.alphas8bit
- multilineCommands = []
- prefixParser = pyparsing.Empty()
- redirector = '>' # for sending output to file
- shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'}
- aliases = dict()
- terminators = [';'] # make sure your terminators are not in legalChars!
-
- # Attributes which are NOT dynamically settable at runtime
- allow_cli_args = True # Should arguments passed on the command-line be processed as commands?
- allow_redirection = True # Should output redirection and pipes be allowed
- default_to_shell = False # Attempt to run unrecognized commands as shell commands
- quit_on_sigint = False # Quit the loop on interrupt instead of just resetting prompt
- reserved_words = []
-
- # Attributes which ARE dynamically settable at runtime
- colors = (platform.system() != 'Windows')
- continuation_prompt = '> '
- debug = False
- echo = False
- editor = os.environ.get('EDITOR')
- if not editor:
- if sys.platform[:3] == 'win':
- editor = 'notepad'
- else:
- # Favor command-line editors first so we don't leave the terminal to edit
- for editor in ['vim', 'vi', 'emacs', 'nano', 'pico', 'gedit', 'kate', 'subl', 'geany', 'atom']:
- if _which(editor):
- break
- feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing)
- locals_in_py = True
- quiet = False # Do not suppress nonessential output
- timing = False # Prints elapsed time for each command
-
- # To make an attribute settable with the "do_set" command, add it to this ...
- # This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key
- settable = {'colors': 'Colorized output (*nix only)',
- 'continuation_prompt': 'On 2nd+ line of input',
- 'debug': 'Show full error stack on error',
- 'echo': 'Echo command issued into output',
- 'editor': 'Program used by ``edit``',
- 'feedback_to_output': 'Include nonessentials in `|`, `>` results',
- 'locals_in_py': 'Allow access to your application in py via self',
- 'prompt': 'The prompt issued to solicit input',
- 'quiet': "Don't print nonessential feedback",
- 'timing': 'Report execution times'}
-
- def __init__(self, completekey='tab', stdin=None, stdout=None, persistent_history_file='',
- persistent_history_length=1000, startup_script=None, use_ipython=False, transcript_files=None):
- """An easy but powerful framework for writing line-oriented command interpreters, extends Python's cmd package.
-
- :param completekey: str - (optional) readline name of a completion key, default to Tab
- :param stdin: (optional) alternate input file object, if not specified, sys.stdin is used
- :param stdout: (optional) alternate output file object, if not specified, sys.stdout is used
- :param persistent_history_file: str - (optional) file path to load a persistent readline history from
- :param persistent_history_length: int - (optional) max number of lines which will be written to the history file
- :param startup_script: str - (optional) file path to a a script to load and execute at startup
- :param use_ipython: (optional) should the "ipy" command be included for an embedded IPython shell
- :param transcript_files: str - (optional) allows running transcript tests when allow_cli_args is False
- """
- # If use_ipython is False, make sure the do_ipy() method doesn't exit
- if not use_ipython:
- try:
- del Cmd.do_ipy
- except AttributeError:
- pass
-
- # If persistent readline history is enabled, then read history from file and register to write to file at exit
- if persistent_history_file:
- persistent_history_file = os.path.expanduser(persistent_history_file)
- try:
- readline.read_history_file(persistent_history_file)
- # default history len is -1 (infinite), which may grow unruly
- readline.set_history_length(persistent_history_length)
- except FileNotFoundError:
- pass
- atexit.register(readline.write_history_file, persistent_history_file)
-
- # Call super class constructor
- super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
-
- # Commands to exclude from the help menu and tab completion
- self.hidden_commands = ['eof', 'eos', '_relative_load']
-
- # Commands to exclude from the history command
- self.exclude_from_history = '''history edit eof eos'''.split()
-
- self._finalize_app_parameters()
-
- self.initial_stdout = sys.stdout
- self.history = History()
- self.pystate = {}
- self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')]
- self.parser_manager = ParserManager(redirector=self.redirector, terminators=self.terminators,
- multilineCommands=self.multilineCommands,
- legalChars=self.legalChars, commentGrammars=self.commentGrammars,
- commentInProgress=self.commentInProgress,
- blankLinesAllowed=self.blankLinesAllowed, prefixParser=self.prefixParser,
- preparse=self.preparse, postparse=self.postparse, aliases=self.aliases,
- shortcuts=self.shortcuts)
- self._transcript_files = transcript_files
-
- # Used to enable the ability for a Python script to quit the application
- self._should_quit = False
-
- # True if running inside a Python script or interactive console, False otherwise
- self._in_py = False
-
- # Stores results from the last command run to enable usage of results in a Python script or interactive console
- # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience.
- self._last_result = None
-
- # Used to save state during a redirection
- self.kept_state = None
- self.kept_sys = None
-
- # Codes used for exit conditions
- self._STOP_AND_EXIT = True # cmd convention
-
- self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'},
- 'cyan': {True: '\x1b[36m', False: '\x1b[39m'},
- 'blue': {True: '\x1b[34m', False: '\x1b[39m'},
- 'red': {True: '\x1b[31m', False: '\x1b[39m'},
- 'magenta': {True: '\x1b[35m', False: '\x1b[39m'},
- 'green': {True: '\x1b[32m', False: '\x1b[39m'},
- 'underline': {True: '\x1b[4m', False: '\x1b[24m'},
- 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}}
-
- # Used load command to store the current script dir as a LIFO queue to support _relative_load command
- self._script_dir = []
-
- # Used when piping command output to a shell command
- self.pipe_proc = None
-
- # Used by complete() for readline tab completion
- self.completion_matches = []
-
- # Used to keep track of whether we are redirecting or piping output
- self.redirecting = False
-
- # If this string is non-empty, then this warning message will print if a broken pipe error occurs while printing
- self.broken_pipe_warning = ''
-
- # If a startup script is provided, then add it in the queue to load
- if startup_script is not None:
- startup_script = os.path.expanduser(startup_script)
- if os.path.exists(startup_script) and os.path.getsize(startup_script) > 0:
- self.cmdqueue.append('load {}'.format(startup_script))
-
- ############################################################################################################
- # The following variables are used by tab-completion functions. They are reset each time complete() is run
- # using set_completion_defaults() and it is up to completer functions to set them before returning results.
- ############################################################################################################
-
- # If true and a single match is returned to complete(), then a space will be appended
- # if the match appears at the end of the line
- self.allow_appended_space = True
-
- # If true and a single match is returned to complete(), then a closing quote
- # will be added if there is an unmatched opening quote
- self.allow_closing_quote = True
-
- # Use this list if you are completing strings that contain a common delimiter and you only want to
- # display the final portion of the matches as the tab-completion suggestions. The full matches
- # still must be returned from your completer function. For an example, look at path_complete()
- # which uses this to show only the basename of paths as the suggestions. delimiter_complete() also
- # populates this list.
- self.display_matches = []
-
- # ----- Methods related to presenting output to the user -----
-
- @property
- def visible_prompt(self):
- """Read-only property to get the visible prompt with any ANSI escape codes stripped.
-
- Used by transcript testing to make it easier and more reliable when users are doing things like coloring the
- prompt using ANSI color codes.
-
- :return: str - prompt stripped of any ANSI escape codes
- """
- return strip_ansi(self.prompt)
-
- def _finalize_app_parameters(self):
- self.commentGrammars.ignore(pyparsing.quotedString).setParseAction(lambda x: '')
- # noinspection PyUnresolvedReferences
- self.shortcuts = sorted(self.shortcuts.items(), reverse=True)
-
- # Make sure settable parameters are sorted alphabetically by key
- self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0]))
-
- def poutput(self, msg, end='\n'):
- """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present.
-
- Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and
- that process terminates before the cmd2 command is finished executing.
-
- :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK
- :param end: str - string appended after the end of the message if not already present, default a newline
- """
- if msg is not None and msg != '':
- try:
- msg_str = '{}'.format(msg)
- self.stdout.write(msg_str)
- if not msg_str.endswith(end):
- self.stdout.write(end)
- except BrokenPipeError:
- # This occurs if a command's output is being piped to another process and that process closes before the
- # command is finished. If you would like your application to print a warning message, then set the
- # broken_pipe_warning attribute to the message you want printed.
- if self.broken_pipe_warning:
- sys.stderr.write(self.broken_pipe_warning)
-
- def perror(self, errmsg, exception_type=None, traceback_war=True):
- """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists.
-
- :param errmsg: str - error message to print out
- :param exception_type: str - (optional) type of exception which precipitated this error message
- :param traceback_war: bool - (optional) if True, print a message to let user know they can enable debug
- :return:
- """
- if self.debug:
- traceback.print_exc()
-
- if exception_type is None:
- err = self.colorize("ERROR: {}\n".format(errmsg), 'red')
- sys.stderr.write(err)
- else:
- err = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(exception_type, errmsg)
- sys.stderr.write(self.colorize(err, 'red'))
-
- if traceback_war:
- war = "To enable full traceback, run the following command: 'set debug true'\n"
- sys.stderr.write(self.colorize(war, 'yellow'))
-
- def pfeedback(self, msg):
- """For printing nonessential feedback. Can be silenced with `quiet`.
- Inclusion in redirected output is controlled by `feedback_to_output`."""
- if not self.quiet:
- if self.feedback_to_output:
- self.poutput(msg)
- else:
- sys.stderr.write("{}\n".format(msg))
-
- def ppaged(self, msg, end='\n'):
- """Print output using a pager if it would go off screen and stdout isn't currently being redirected.
-
- Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when
- stdout or stdin are not a fully functional terminal.
-
- :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK
- :param end: str - string appended after the end of the message if not already present, default a newline
- """
- if msg is not None and msg != '':
- try:
- msg_str = '{}'.format(msg)
- if not msg_str.endswith(end):
- msg_str += end
-
- # Attempt to detect if we are not running within a fully functional terminal.
- # Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect.
- functional_terminal = False
-
- if self.stdin.isatty() and self.stdout.isatty():
- if sys.platform.startswith('win') or os.environ.get('TERM') is not None:
- functional_terminal = True
-
- # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python)
- # Also only attempt to use a pager if actually running in a real fully functional terminal
- if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir:
-
- if sys.platform.startswith('win'):
- pager_cmd = 'more'
- else:
- # Here is the meaning of the various flags we are using with the less command:
- # -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped
- # -R causes ANSI "color" escape sequences to be output in raw form (i.e. colors are displayed)
- # -X disables sending the termcap initialization and deinitialization strings to the terminal
- # -F causes less to automatically exit if the entire file can be displayed on the first screen
- pager_cmd = 'less -SRXF'
- self.pipe_proc = subprocess.Popen(pager_cmd, shell=True, stdin=subprocess.PIPE)
- try:
- self.pipe_proc.stdin.write(msg_str.encode('utf-8', 'replace'))
- self.pipe_proc.stdin.close()
- except (IOError, KeyboardInterrupt):
- pass
-
- # Less doesn't respect ^C, but catches it for its own UI purposes (aborting search etc. inside less)
- while True:
- try:
- self.pipe_proc.wait()
- except KeyboardInterrupt:
- pass
- else:
- break
- self.pipe_proc = None
- else:
- self.stdout.write(msg_str)
- except BrokenPipeError:
- # This occurs if a command's output is being piped to another process and that process closes before the
- # command is finished. If you would like your application to print a warning message, then set the
- # broken_pipe_warning attribute to the message you want printed.
- if self.broken_pipe_warning:
- sys.stderr.write(self.broken_pipe_warning)
-
- def colorize(self, val, color):
- """Given a string (``val``), returns that string wrapped in UNIX-style
- special characters that turn on (and then off) text color and style.
- If the ``colors`` environment parameter is ``False``, or the application
- is running on Windows, will return ``val`` unchanged.
- ``color`` should be one of the supported strings (or styles):
- red/blue/green/cyan/magenta, bold, underline"""
- if self.colors and (self.stdout == self.initial_stdout):
- return self._colorcodes[color][True] + val + self._colorcodes[color][False]
- return val
-
- def get_subcommands(self, command):
- """
- Returns a list of a command's subcommand names if they exist
- :param command: the command we are querying
- :return: A subcommand list or None
- """
-
- subcommand_names = None
-
- # Check if is a valid command
- funcname = self._func_named(command)
-
- if funcname:
- # Check to see if this function was decorated with an argparse ArgumentParser
- func = getattr(self, funcname)
- subcommands = func.__dict__.get('subcommands', None)
- if subcommands is not None:
- subcommand_names = subcommands.keys()
-
- return subcommand_names
-
- def get_subcommand_completer(self, command, subcommand):
- """
- Returns a subcommand's tab completion function if one exists
- :param command: command which owns the subcommand
- :param subcommand: the subcommand we are querying
- :return: A completer or None
- """
-
- completer = None
-
- # Check if is a valid command
- funcname = self._func_named(command)
-
- if funcname:
- # Check to see if this function was decorated with an argparse ArgumentParser
- func = getattr(self, funcname)
- subcommands = func.__dict__.get('subcommands', None)
- if subcommands is not None:
- completer = subcommands[subcommand]
-
- return completer
-
- # ----- Methods related to tab completion -----
-
- def set_completion_defaults(self):
- """
- Resets tab completion settings
- Needs to be called each time readline runs tab completion
- """
- self.allow_appended_space = True
- self.allow_closing_quote = True
- self.display_matches = []
-
- def tokens_for_completion(self, line, begidx, endidx):
- """
- Used by tab completion functions to get all tokens through the one being completed
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :return: A 2 item tuple where the items are
- On Success
- tokens: list of unquoted tokens
- this is generally the list needed for tab completion functions
- raw_tokens: list of tokens with any quotes preserved
- this can be used to know if a token was quoted or is missing a closing quote
-
- Both lists are guaranteed to have at least 1 item
- The last item in both lists is the token being tab completed
-
- On Failure
- Both items are None
- """
- unclosed_quote = ''
- quotes_to_try = copy.copy(QUOTES)
-
- tmp_line = line[:endidx]
- tmp_endidx = endidx
-
- # Parse the line into tokens
- while True:
- try:
- # Use non-POSIX parsing to keep the quotes around the tokens
- initial_tokens = shlex.split(tmp_line[:tmp_endidx], posix=False)
-
- # If the cursor is at an empty token outside of a quoted string,
- # then that is the token being completed. Add it to the list.
- if not unclosed_quote and begidx == tmp_endidx:
- initial_tokens.append('')
- break
- except ValueError:
- # ValueError can be caused by missing closing quote
- if not quotes_to_try:
- # Since we have no more quotes to try, something else
- # is causing the parsing error. Return None since
- # this means the line is malformed.
- return None, None
-
- # Add a closing quote and try to parse again
- unclosed_quote = quotes_to_try[0]
- quotes_to_try = quotes_to_try[1:]
-
- tmp_line = line[:endidx]
- tmp_line += unclosed_quote
- tmp_endidx = endidx + 1
-
- if self.allow_redirection:
-
- # Since redirection is enabled, we need to treat redirection characters (|, <, >)
- # as word breaks when they are in unquoted strings. Go through each token
- # and further split them on these characters. Each run of redirect characters
- # is treated as a single token.
- raw_tokens = []
-
- for cur_initial_token in initial_tokens:
-
- # Save tokens up to 1 character in length or quoted tokens. No need to parse these.
- if len(cur_initial_token) <= 1 or cur_initial_token[0] in QUOTES:
- raw_tokens.append(cur_initial_token)
- continue
-
- # Iterate over each character in this token
- cur_index = 0
- cur_char = cur_initial_token[cur_index]
-
- # Keep track of the token we are building
- cur_raw_token = ''
-
- while True:
- if cur_char not in REDIRECTION_CHARS:
-
- # Keep appending to cur_raw_token until we hit a redirect char
- while cur_char not in REDIRECTION_CHARS:
- cur_raw_token += cur_char
- cur_index += 1
- if cur_index < len(cur_initial_token):
- cur_char = cur_initial_token[cur_index]
- else:
- break
-
- else:
- redirect_char = cur_char
-
- # Keep appending to cur_raw_token until we hit something other than redirect_char
- while cur_char == redirect_char:
- cur_raw_token += cur_char
- cur_index += 1
- if cur_index < len(cur_initial_token):
- cur_char = cur_initial_token[cur_index]
- else:
- break
-
- # Save the current token
- raw_tokens.append(cur_raw_token)
- cur_raw_token = ''
-
- # Check if we've viewed all characters
- if cur_index >= len(cur_initial_token):
- break
- else:
- raw_tokens = initial_tokens
-
- # Save the unquoted tokens
- tokens = [strip_quotes(cur_token) for cur_token in raw_tokens]
-
- # If the token being completed had an unclosed quote, we need
- # to remove the closing quote that was added in order for it
- # to match what was on the command line.
- if unclosed_quote:
- raw_tokens[-1] = raw_tokens[-1][:-1]
-
- return tokens, raw_tokens
-
- # noinspection PyUnusedLocal
- @staticmethod
- def basic_complete(text, line, begidx, endidx, match_against):
- """
- Performs tab completion against a list
-
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param match_against: Collection - the list being matched against
- :return: List[str] - a list of possible tab completions
- """
- return [cur_match for cur_match in match_against if cur_match.startswith(text)]
-
- def delimiter_complete(self, text, line, begidx, endidx, match_against, delimiter):
- """
- Performs tab completion against a list but each match is split on a delimiter and only
- the portion of the match being tab completed is shown as the completion suggestions.
- This is useful if you match against strings that are hierarchical in nature and have a
- common delimiter.
-
- An easy way to illustrate this concept is path completion since paths are just directories/files
- delimited by a slash. If you are tab completing items in /home/user you don't get the following
- as suggestions:
-
- /home/user/file.txt /home/user/program.c
- /home/user/maps/ /home/user/cmd2.py
-
- Instead you are shown:
-
- file.txt program.c
- maps/ cmd2.py
-
- For a large set of data, this can be visually more pleasing and easier to search.
-
- Another example would be strings formatted with the following syntax: company::department::name
- In this case the delimiter would be :: and the user could easily narrow down what they are looking
- for if they were only shown suggestions in the category they are at in the string.
-
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param match_against: Collection - the list being matched against
- :param delimiter: str - what delimits each portion of the matches (ex: paths are delimited by a slash)
- :return: List[str] - a list of possible tab completions
- """
- matches = self.basic_complete(text, line, begidx, endidx, match_against)
-
- # Display only the portion of the match that's being completed based on delimiter
- if matches:
-
- # Get the common beginning for the matches
- common_prefix = os.path.commonprefix(matches)
- prefix_tokens = common_prefix.split(delimiter)
-
- # Calculate what portion of the match we are completing
- display_token_index = 0
- if prefix_tokens:
- display_token_index = len(prefix_tokens) - 1
-
- # Get this portion for each match and store them in self.display_matches
- for cur_match in matches:
- match_tokens = cur_match.split(delimiter)
- display_token = match_tokens[display_token_index]
-
- if not display_token:
- display_token = delimiter
- self.display_matches.append(display_token)
-
- return matches
-
- def flag_based_complete(self, text, line, begidx, endidx, flag_dict, all_else=None):
- """
- Tab completes based on a particular flag preceding the token being completed
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param flag_dict: dict - dictionary whose structure is the following:
- keys - flags (ex: -c, --create) that result in tab completion for the next
- argument in the command line
- values - there are two types of values
- 1. iterable list of strings to match against (dictionaries, lists, etc.)
- 2. function that performs tab completion (ex: path_complete)
- :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded
- by a flag in flag_dict
- :return: List[str] - a list of possible tab completions
- """
- # Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
- if tokens is None:
- return []
-
- completions_matches = []
- match_against = all_else
-
- # Must have at least 2 args for a flag to precede the token being completed
- if len(tokens) > 1:
- flag = tokens[-2]
- if flag in flag_dict:
- match_against = flag_dict[flag]
-
- # Perform tab completion using a Collection
- if isinstance(match_against, Collection):
- completions_matches = self.basic_complete(text, line, begidx, endidx, match_against)
-
- # Perform tab completion using a function
- elif callable(match_against):
- completions_matches = match_against(text, line, begidx, endidx)
-
- return completions_matches
-
- def index_based_complete(self, text, line, begidx, endidx, index_dict, all_else=None):
- """
- Tab completes based on a fixed position in the input string
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param index_dict: dict - dictionary whose structure is the following:
- keys - 0-based token indexes into command line that determine which tokens
- perform tab completion
- values - there are two types of values
- 1. iterable list of strings to match against (dictionaries, lists, etc.)
- 2. function that performs tab completion (ex: path_complete)
- :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an
- index in index_dict
- :return: List[str] - a list of possible tab completions
- """
- # Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
- if tokens is None:
- return []
-
- matches = []
-
- # Get the index of the token being completed
- index = len(tokens) - 1
-
- # Check if token is at an index in the dictionary
- if index in index_dict:
- match_against = index_dict[index]
- else:
- match_against = all_else
-
- # Perform tab completion using a Collection
- if isinstance(match_against, Collection):
- matches = self.basic_complete(text, line, begidx, endidx, match_against)
-
- # Perform tab completion using a function
- elif callable(match_against):
- matches = match_against(text, line, begidx, endidx)
-
- return matches
-
- # noinspection PyUnusedLocal
- def path_complete(self, text, line, begidx, endidx, dir_exe_only=False, dir_only=False):
- """Performs completion of local file system paths
-
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param dir_exe_only: bool - only return directories and executables, not non-executable files
- :param dir_only: bool - only return directories
- :return: List[str] - a list of possible tab completions
- """
-
- # Used to complete ~ and ~user strings
- def complete_users():
-
- # We are returning ~user strings that resolve to directories,
- # so don't append a space or quote in the case of a single result.
- self.allow_appended_space = False
- self.allow_closing_quote = False
-
- users = []
-
- # Windows lacks the pwd module so we can't get a list of users.
- # Instead we will add a slash once the user enters text that
- # resolves to an existing home directory.
- if sys.platform.startswith('win'):
- expanded_path = os.path.expanduser(text)
- if os.path.isdir(expanded_path):
- users.append(text + os.path.sep)
- else:
- import pwd
-
- # Iterate through a list of users from the password database
- for cur_pw in pwd.getpwall():
-
- # Check if the user has an existing home dir
- if os.path.isdir(cur_pw.pw_dir):
-
- # Add a ~ to the user to match against text
- cur_user = '~' + cur_pw.pw_name
- if cur_user.startswith(text):
- if add_trailing_sep_if_dir:
- cur_user += os.path.sep
- users.append(cur_user)
-
- return users
-
- # Determine if a trailing separator should be appended to directory completions
- add_trailing_sep_if_dir = False
- if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep):
- add_trailing_sep_if_dir = True
-
- # Used to replace cwd in the final results
- cwd = os.getcwd()
- cwd_added = False
-
- # Used to replace expanded user path in final result
- orig_tilde_path = ''
- expanded_tilde_path = ''
-
- # If the search text is blank, then search in the CWD for *
- if not text:
- search_str = os.path.join(os.getcwd(), '*')
- cwd_added = True
- else:
- # Purposely don't match any path containing wildcards - what we are doing is complicated enough!
- wildcards = ['*', '?']
- for wildcard in wildcards:
- if wildcard in text:
- return []
-
- # Start the search string
- search_str = text + '*'
-
- # Handle tilde expansion and completion
- if text.startswith('~'):
- sep_index = text.find(os.path.sep, 1)
-
- # If there is no slash, then the user is still completing the user after the tilde
- if sep_index == -1:
- return complete_users()
-
- # Otherwise expand the user dir
- else:
- search_str = os.path.expanduser(search_str)
-
- # Get what we need to restore the original tilde path later
- orig_tilde_path = text[:sep_index]
- expanded_tilde_path = os.path.expanduser(orig_tilde_path)
-
- # If the search text does not have a directory, then use the cwd
- elif not os.path.dirname(text):
- search_str = os.path.join(os.getcwd(), search_str)
- cwd_added = True
-
- # Find all matching path completions
- matches = glob.glob(search_str)
-
- # Filter based on type
- if dir_exe_only:
- matches = [c for c in matches if os.path.isdir(c) or os.access(c, os.X_OK)]
- elif dir_only:
- matches = [c for c in matches if os.path.isdir(c)]
-
- # Don't append a space or closing quote to directory
- if len(matches) == 1 and os.path.isdir(matches[0]):
- self.allow_appended_space = False
- self.allow_closing_quote = False
-
- # Build display_matches and add a slash to directories
- for index, cur_match in enumerate(matches):
-
- # Display only the basename of this path in the tab-completion suggestions
- self.display_matches.append(os.path.basename(cur_match))
-
- # Add a separator after directories if the next character isn't already a separator
- if os.path.isdir(cur_match) and add_trailing_sep_if_dir:
- matches[index] += os.path.sep
- self.display_matches[index] += os.path.sep
-
- # Remove cwd if it was added to match the text readline expects
- if cwd_added:
- matches = [cur_path.replace(cwd + os.path.sep, '', 1) for cur_path in matches]
-
- # Restore the tilde string if we expanded one to match the text readline expects
- if expanded_tilde_path:
- matches = [cur_path.replace(expanded_tilde_path, orig_tilde_path, 1) for cur_path in matches]
-
- return matches
-
- @staticmethod
- def get_exes_in_path(starts_with):
- """
- Returns names of executables in a user's path
- :param starts_with: str - what the exes should start with. leave blank for all exes in path.
- :return: List[str] - a list of matching exe names
- """
- # Purposely don't match any executable containing wildcards
- wildcards = ['*', '?']
- for wildcard in wildcards:
- if wildcard in starts_with:
- return []
-
- # Get a list of every directory in the PATH environment variable and ignore symbolic links
- paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)]
-
- # Use a set to store exe names since there can be duplicates
- exes_set = set()
-
- # Find every executable file in the user's path that matches the pattern
- for path in paths:
- full_path = os.path.join(path, starts_with)
- matches = [f for f in glob.glob(full_path + '*') if os.path.isfile(f) and os.access(f, os.X_OK)]
-
- for match in matches:
- exes_set.add(os.path.basename(match))
-
- return list(exes_set)
-
- def shell_cmd_complete(self, text, line, begidx, endidx, complete_blank=False):
- """Performs completion of executables either in a user's path or a given path
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param complete_blank: bool - If True, then a blank will complete all shell commands in a user's path
- If False, then no completion is performed
- Defaults to False to match Bash shell behavior
- :return: List[str] - a list of possible tab completions
- """
- # Don't tab complete anything if no shell command has been started
- if not complete_blank and not text:
- return []
-
- # If there are no path characters in the search text, then do shell command completion in the user's path
- if not text.startswith('~') and os.path.sep not in text:
- return self.get_exes_in_path(text)
-
- # Otherwise look for executables in the given path
- else:
- return self.path_complete(text, line, begidx, endidx, dir_exe_only=True)
-
- def _redirect_complete(self, text, line, begidx, endidx, compfunc):
- """
- Called by complete() as the first tab completion function for all commands
- It determines if it should tab complete for redirection (|, <, >, >>) or use the
- completer function for the current command
-
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :param compfunc: Callable - the completer function for the current command
- this will be called if we aren't completing for redirection
- :return: List[str] - a list of possible tab completions
- """
- if self.allow_redirection:
-
- # Get all tokens through the one being completed. We want the raw tokens
- # so we can tell if redirection strings are quoted and ignore them.
- _, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
- if raw_tokens is None:
- return []
-
- if len(raw_tokens) > 1:
-
- # Build a list of all redirection tokens
- all_redirects = REDIRECTION_CHARS + ['>>']
-
- # Check if there are redirection strings prior to the token being completed
- seen_pipe = False
- has_redirection = False
-
- for cur_token in raw_tokens[:-1]:
- if cur_token in all_redirects:
- has_redirection = True
-
- if cur_token == '|':
- seen_pipe = True
-
- # Get token prior to the one being completed
- prior_token = raw_tokens[-2]
-
- # If a pipe is right before the token being completed, complete a shell command as the piped process
- if prior_token == '|':
- return self.shell_cmd_complete(text, line, begidx, endidx)
-
- # Otherwise do path completion either as files to redirectors or arguments to the piped process
- elif prior_token in all_redirects or seen_pipe:
- return self.path_complete(text, line, begidx, endidx)
-
- # If there were redirection strings anywhere on the command line, then we
- # are no longer tab completing for the current command
- elif has_redirection:
- return []
-
- # Call the command's completer function
- return compfunc(text, line, begidx, endidx)
-
- @staticmethod
- def _pad_matches_to_display(matches_to_display):
- """
- Adds padding to the matches being displayed as tab completion suggestions.
- The default padding of readline/pyreadine is small and not visually appealing
- especially if matches have spaces. It appears very squished together.
-
- :param matches_to_display: the matches being padded
- :return: the padded matches and length of padding that was added
- """
- if rl_type == RlType.GNU:
- # Add 2 to the padding of 2 that readline uses for a total of 4.
- padding = 2 * ' '
-
- elif rl_type == RlType.PYREADLINE:
- # Add 3 to the padding of 1 that pyreadline uses for a total of 4.
- padding = 3 * ' '
-
- else:
- return matches_to_display, 0
-
- return [cur_match + padding for cur_match in matches_to_display], len(padding)
-
- def _display_matches_gnu_readline(self, substitution, matches, longest_match_length):
- """
- Prints a match list using GNU readline's rl_display_match_list()
- This exists to print self.display_matches if it has data. Otherwise matches prints.
-
- :param substitution: str - the substitution written to the command line
- :param matches: list[str] - the tab completion matches to display
- :param longest_match_length: int - longest printed length of the matches
- """
- if rl_type == RlType.GNU:
-
- # Check if we should show display_matches
- if self.display_matches:
- matches_to_display = self.display_matches
-
- # Recalculate longest_match_length for display_matches
- longest_match_length = 0
-
- for cur_match in matches_to_display:
- cur_length = wcswidth(cur_match)
- if cur_length > longest_match_length:
- longest_match_length = cur_length
- else:
- matches_to_display = matches
-
- # Add padding for visual appeal
- matches_to_display, padding_length = self._pad_matches_to_display(matches_to_display)
- longest_match_length += padding_length
-
- # We will use readline's display function (rl_display_match_list()), so we
- # need to encode our string as bytes to place in a C array.
- encoded_substitution = bytes(substitution, encoding='utf-8')
- encoded_matches = [bytes(cur_match, encoding='utf-8') for cur_match in matches_to_display]
-
- # rl_display_match_list() expects matches to be in argv format where
- # substitution is the first element, followed by the matches, and then a NULL.
- # noinspection PyCallingNonCallable,PyTypeChecker
- strings_array = (ctypes.c_char_p * (1 + len(encoded_matches) + 1))()
-
- # Copy in the encoded strings and add a NULL to the end
- strings_array[0] = encoded_substitution
- strings_array[1:-1] = encoded_matches
- strings_array[-1] = None
-
- # Call readline's display function
- # rl_display_match_list(strings_array, number of completion matches, longest match length)
- readline_lib.rl_display_match_list(strings_array, len(encoded_matches), longest_match_length)
-
- # rl_forced_update_display() is the proper way to redraw the prompt and line, but we
- # have to use ctypes to do it since Python's readline API does not wrap the function
- readline_lib.rl_forced_update_display()
-
- # Since we updated the display, readline asks that rl_display_fixed be set for efficiency
- display_fixed = ctypes.c_int.in_dll(readline_lib, "rl_display_fixed")
- display_fixed.value = 1
-
- def _display_matches_pyreadline(self, matches):
- """
- Prints a match list using pyreadline's _display_completions()
- This exists to print self.display_matches if it has data. Otherwise matches prints.
-
- :param matches: list[str] - the tab completion matches to display
- """
- if rl_type == RlType.PYREADLINE:
-
- # Check if we should show display_matches
- if self.display_matches:
- matches_to_display = self.display_matches
- else:
- matches_to_display = matches
-
- # Add padding for visual appeal
- matches_to_display, _ = self._pad_matches_to_display(matches_to_display)
-
- # Display the matches
- orig_pyreadline_display(matches_to_display)
-
- # ----- Methods which override stuff in cmd -----
-
- def complete(self, text, state):
- """Override of command method which returns the next possible completion for 'text'.
-
- If a command has not been entered, then complete against command list.
- Otherwise try to call complete_<command> to get list of completions.
-
- This method gets called directly by readline because it is set as the tab-completion function.
-
- This completer function is called as complete(text, state), for state in 0, 1, 2, …, until it returns a
- non-string value. It should return the next possible completion starting with text.
-
- :param text: str - the current word that user is typing
- :param state: int - non-negative integer
- """
- if state == 0:
- unclosed_quote = ''
- self.set_completion_defaults()
-
- # lstrip the original line
- orig_line = readline.get_line_buffer()
- line = orig_line.lstrip()
- stripped = len(orig_line) - len(line)
-
- # Calculate new indexes for the stripped line. If the cursor is at a position before the end of a
- # line of spaces, then the following math could result in negative indexes. Enforce a max of 0.
- begidx = max(readline.get_begidx() - stripped, 0)
- endidx = max(readline.get_endidx() - stripped, 0)
-
- # Shortcuts are not word break characters when tab completing. Therefore shortcuts become part
- # of the text variable if there isn't a word break, like a space, after it. We need to remove it
- # from text and update the indexes. This only applies if we are at the the beginning of the line.
- shortcut_to_restore = ''
- if begidx == 0:
- for (shortcut, expansion) in self.shortcuts:
- if text.startswith(shortcut):
- # Save the shortcut to restore later
- shortcut_to_restore = shortcut
-
- # Adjust text and where it begins
- text = text[len(shortcut_to_restore):]
- begidx += len(shortcut_to_restore)
- break
-
- # If begidx is greater than 0, then we are no longer completing the command
- if begidx > 0:
-
- # Parse the command line
- command, args, expanded_line = self.parseline(line)
-
- # We overwrote line with a properly formatted but fully stripped version
- # Restore the end spaces since line is only supposed to be lstripped when
- # passed to completer functions according to Python docs
- rstripped_len = len(line) - len(line.rstrip())
- expanded_line += ' ' * rstripped_len
-
- # Fix the index values if expanded_line has a different size than line
- if len(expanded_line) != len(line):
- diff = len(expanded_line) - len(line)
- begidx += diff
- endidx += diff
-
- # Overwrite line to pass into completers
- line = expanded_line
-
- # Get all tokens through the one being completed
- tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
-
- # Either had a parsing error or are trying to complete the command token
- # The latter can happen if default_to_shell is True and parseline() allowed
- # assumed something like " or ' was a command.
- if tokens is None or len(tokens) == 1:
- self.completion_matches = []
- return None
-
- # Text we need to remove from completions later
- text_to_remove = ''
-
- # Get the token being completed with any opening quote preserved
- raw_completion_token = raw_tokens[-1]
-
- # Check if the token being completed has an opening quote
- if raw_completion_token and raw_completion_token[0] in QUOTES:
-
- # Since the token is still being completed, we know the opening quote is unclosed
- unclosed_quote = raw_completion_token[0]
-
- # readline still performs word breaks after a quote. Therefore something like quoted search
- # text with a space would have resulted in begidx pointing to the middle of the token we
- # we want to complete. Figure out where that token actually begins and save the beginning
- # portion of it that was not part of the text readline gave us. We will remove it from the
- # completions later since readline expects them to start with the original text.
- actual_begidx = line[:endidx].rfind(tokens[-1])
-
- if actual_begidx != begidx:
- text_to_remove = line[actual_begidx:begidx]
-
- # Adjust text and where it begins so the completer routines
- # get unbroken search text to complete on.
- text = text_to_remove + text
- begidx = actual_begidx
-
- # Check if a valid command was entered
- if command in self.get_all_commands():
- # Get the completer function for this command
- try:
- compfunc = getattr(self, 'complete_' + command)
- except AttributeError:
- compfunc = self.completedefault
-
- subcommands = self.get_subcommands(command)
- if subcommands is not None:
- # Since there are subcommands, then try completing those if the cursor is in
- # the token at index 1, otherwise default to using compfunc
- index_dict = {1: subcommands}
- compfunc = functools.partial(self.index_based_complete,
- index_dict=index_dict,
- all_else=compfunc)
-
- # A valid command was not entered
- else:
- # Check if this command should be run as a shell command
- if self.default_to_shell and command in self.get_exes_in_path(command):
- compfunc = self.path_complete
- else:
- compfunc = self.completedefault
-
- # Attempt tab completion for redirection first, and if that isn't occurring,
- # call the completer function for the current command
- self.completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc)
-
- if self.completion_matches:
-
- # Eliminate duplicates
- matches_set = set(self.completion_matches)
- self.completion_matches = list(matches_set)
-
- display_matches_set = set(self.display_matches)
- self.display_matches = list(display_matches_set)
-
- # Check if display_matches has been used. If so, then matches
- # on delimited strings like paths was done.
- if self.display_matches:
- matches_delimited = True
- else:
- matches_delimited = False
-
- # Since self.display_matches is empty, set it to self.completion_matches
- # before we alter them. That way the suggestions will reflect how we parsed
- # the token being completed and not how readline did.
- self.display_matches = copy.copy(self.completion_matches)
-
- # Check if we need to add an opening quote
- if not unclosed_quote:
-
- add_quote = False
-
- # This is the tab completion text that will appear on the command line.
- common_prefix = os.path.commonprefix(self.completion_matches)
-
- if matches_delimited:
- # Check if any portion of the display matches appears in the tab completion
- display_prefix = os.path.commonprefix(self.display_matches)
-
- # For delimited matches, we check what appears before the display
- # matches (common_prefix) as well as the display matches themselves.
- if (' ' in common_prefix) or (display_prefix and ' ' in ''.join(self.display_matches)):
- add_quote = True
-
- # If there is a tab completion and any match has a space, then add an opening quote
- elif common_prefix and ' ' in ''.join(self.completion_matches):
- add_quote = True
-
- if add_quote:
- # Figure out what kind of quote to add and save it as the unclosed_quote
- if '"' in ''.join(self.completion_matches):
- unclosed_quote = "'"
- else:
- unclosed_quote = '"'
-
- self.completion_matches = [unclosed_quote + match for match in self.completion_matches]
-
- # Check if we need to remove text from the beginning of tab completions
- elif text_to_remove:
- self.completion_matches = \
- [m.replace(text_to_remove, '', 1) for m in self.completion_matches]
-
- # Check if we need to restore a shortcut in the tab completions
- # so it doesn't get erased from the command line
- if shortcut_to_restore:
- self.completion_matches = \
- [shortcut_to_restore + match for match in self.completion_matches]
-
- else:
- # Complete token against aliases and command names
- alias_names = set(self.aliases.keys())
- visible_commands = set(self.get_visible_commands())
- strs_to_match = list(alias_names | visible_commands)
- self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match)
-
- # Handle single result
- if len(self.completion_matches) == 1:
- str_to_append = ''
-
- # Add a closing quote if needed and allowed
- if self.allow_closing_quote and unclosed_quote:
- str_to_append += unclosed_quote
-
- # If we are at the end of the line, then add a space if allowed
- if self.allow_appended_space and endidx == len(line):
- str_to_append += ' '
-
- self.completion_matches[0] += str_to_append
-
- # Otherwise sort matches
- elif self.completion_matches:
- self.completion_matches.sort()
- self.display_matches.sort()
-
- try:
- return self.completion_matches[state]
- except IndexError:
- return None
-
- def get_all_commands(self):
- """
- Returns a list of all commands
- """
- return [cur_name[3:] for cur_name in self.get_names() if cur_name.startswith('do_')]
-
- def get_visible_commands(self):
- """
- Returns a list of commands that have not been hidden
- """
- commands = self.get_all_commands()
-
- # Remove the hidden commands
- for name in self.hidden_commands:
- if name in commands:
- commands.remove(name)
-
- return commands
-
- def get_help_topics(self):
- """ Returns a list of help topics """
- return [name[5:] for name in self.get_names() if name.startswith('help_')]
-
- def complete_help(self, text, line, begidx, endidx):
- """
- Override of parent class method to handle tab completing subcommands and not showing hidden commands
- Returns a list of possible tab completions
- """
-
- # The command is the token at index 1 in the command line
- cmd_index = 1
-
- # The subcommand is the token at index 2 in the command line
- subcmd_index = 2
-
- # Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
- if tokens is None:
- return []
-
- matches = []
-
- # Get the index of the token being completed
- index = len(tokens) - 1
-
- # Check if we are completing a command or help topic
- if index == cmd_index:
-
- # Complete token against topics and visible commands
- topics = set(self.get_help_topics())
- visible_commands = set(self.get_visible_commands())
- strs_to_match = list(topics | visible_commands)
- matches = self.basic_complete(text, line, begidx, endidx, strs_to_match)
-
- # Check if we are completing a subcommand
- elif index == subcmd_index:
-
- # Match subcommands if any exist
- command = tokens[cmd_index]
- matches = self.basic_complete(text, line, begidx, endidx, self.get_subcommands(command))
-
- return matches
-
- # noinspection PyUnusedLocal
- def sigint_handler(self, signum, frame):
- """Signal handler for SIGINTs which typically come from Ctrl-C events.
-
- If you need custom SIGINT behavior, then override this function.
-
- :param signum: int - signal number
- :param frame
- """
-
- # Save copy of pipe_proc since it could theoretically change while this is running
- pipe_proc = self.pipe_proc
-
- if pipe_proc is not None:
- pipe_proc.terminate()
-
- # Re-raise a KeyboardInterrupt so other parts of the code can catch it
- raise KeyboardInterrupt("Got a keyboard interrupt")
-
- def preloop(self):
- """"Hook method executed once when the cmdloop() method is called."""
-
- # Register a default SIGINT signal handler for Ctrl+C
- signal.signal(signal.SIGINT, self.sigint_handler)
-
- def precmd(self, statement):
- """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history.
-
- :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
- :return: ParsedString - a potentially modified version of the input ParsedString statement
- """
- return statement
-
- # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd -----
-
- # noinspection PyMethodMayBeStatic
- def preparse(self, raw):
- """Hook method executed just before the command line is interpreted, but after the input prompt is generated.
-
- :param raw: str - raw command line input
- :return: str - potentially modified raw command line input
- """
- return raw
-
- # noinspection PyMethodMayBeStatic
- def postparse(self, parse_result):
- """Hook that runs immediately after parsing the command-line but before ``parsed()`` returns a ParsedString.
-
- :param parse_result: pyparsing.ParseResults - parsing results output by the pyparsing parser
- :return: pyparsing.ParseResults - potentially modified ParseResults object
- """
- return parse_result
-
- # noinspection PyMethodMayBeStatic
- def postparsing_precmd(self, statement):
- """This runs after parsing the command-line, but before anything else; even before adding cmd to history.
-
- NOTE: This runs before precmd() and prior to any potential output redirection or piping.
-
- If you wish to fatally fail this command and exit the application entirely, set stop = True.
-
- If you wish to just fail this command you can do so by raising an exception:
-
- - raise EmptyStatement - will silently fail and do nothing
- - raise <AnyOtherException> - will fail and print an error message
-
- :param statement: - the parsed command-line statement
- :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement
- """
- stop = False
- return stop, statement
-
- # noinspection PyMethodMayBeStatic
- def postparsing_postcmd(self, stop):
- """This runs after everything else, including after postcmd().
-
- It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due
- to notifications from a background thread, then this is the method you want to override to do it.
-
- :param stop: bool - True implies the entire application should exit.
- :return: bool - True implies the entire application should exit.
- """
- if not sys.platform.startswith('win'):
- # Fix those annoying problems that occur with terminal programs like "less" when you pipe to them
- if self.stdin.isatty():
- proc = subprocess.Popen(shlex.split('stty sane'))
- proc.communicate()
- return stop
-
- def parseline(self, line):
- """Parse the line into a command name and a string containing the arguments.
-
- NOTE: This is an override of a parent class method. It is only used by other parent class methods. But
- we do need to override it here so that the additional shortcuts present in cmd2 get properly expanded for
- purposes of tab completion.
-
- Used for command tab completion. Returns a tuple containing (command, args, line).
- 'command' and 'args' may be None if the line couldn't be parsed.
-
- :param line: str - line read by readline
- :return: (str, str, str) - tuple containing (command, args, line)
- """
- line = line.strip()
-
- if not line:
- # Deal with empty line or all whitespace line
- return None, None, line
-
- # Make a copy of aliases so we can edit it
- tmp_aliases = list(self.aliases.keys())
- keep_expanding = len(tmp_aliases) > 0
-
- # Expand aliases
- while keep_expanding:
- for cur_alias in tmp_aliases:
- keep_expanding = False
-
- if line == cur_alias or line.startswith(cur_alias + ' '):
- line = line.replace(cur_alias, self.aliases[cur_alias], 1)
-
- # Do not expand the same alias more than once
- tmp_aliases.remove(cur_alias)
- keep_expanding = len(tmp_aliases) > 0
- break
-
- # Expand command shortcut to its full command name
- for (shortcut, expansion) in self.shortcuts:
- if line.startswith(shortcut):
- # If the next character after the shortcut isn't a space, then insert one
- shortcut_len = len(shortcut)
- if len(line) == shortcut_len or line[shortcut_len] != ' ':
- expansion += ' '
-
- # Expand the shortcut
- line = line.replace(shortcut, expansion, 1)
- break
-
- i, n = 0, len(line)
-
- # If we are allowing shell commands, then allow any character in the command
- if self.default_to_shell:
- while i < n and line[i] != ' ':
- i += 1
-
- # Otherwise only allow those in identchars
- else:
- while i < n and line[i] in self.identchars:
- i += 1
-
- command, arg = line[:i], line[i:].strip()
-
- return command, arg, line
-
- def onecmd_plus_hooks(self, line):
- """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks.
-
- :param line: str - line of text read from input
- :return: bool - True if cmdloop() should exit, False otherwise
- """
- stop = 0
- try:
- statement = self._complete_statement(line)
- (stop, statement) = self.postparsing_precmd(statement)
- if stop:
- return self.postparsing_postcmd(stop)
-
- try:
- if self.allow_redirection:
- self._redirect_output(statement)
- timestart = datetime.datetime.now()
- statement = self.precmd(statement)
- stop = self.onecmd(statement)
- stop = self.postcmd(stop, statement)
- if self.timing:
- self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
- finally:
- if self.allow_redirection:
- self._restore_output(statement)
- except EmptyStatement:
- pass
- except ValueError as ex:
- # If shlex.split failed on syntax, let user know whats going on
- self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
- except Exception as ex:
- self.perror(ex, type(ex).__name__)
- finally:
- return self.postparsing_postcmd(stop)
-
- def runcmds_plus_hooks(self, cmds):
- """Convenience method to run multiple commands by onecmd_plus_hooks.
-
- This method adds the given cmds to the command queue and processes the
- queue until completion or an error causes it to abort. Scripts that are
- loaded will have their commands added to the queue. Scripts may even
- load other scripts recursively. This means, however, that you should not
- use this method if there is a running cmdloop or some other event-loop.
- This method is only intended to be used in "one-off" scenarios.
-
- NOTE: You may need this method even if you only have one command. If
- that command is a load, then you will need this command to fully process
- all the subsequent commands that are loaded from the script file. This
- is an improvement over onecmd_plus_hooks, which expects to be used
- inside of a command loop which does the processing of loaded commands.
-
- Example: cmd_obj.runcmds_plus_hooks(['load myscript.txt'])
-
- :param cmds: list - Command strings suitable for onecmd_plus_hooks.
- :return: bool - True implies the entire application should exit.
-
- """
- stop = False
- self.cmdqueue = list(cmds) + self.cmdqueue
- try:
- while self.cmdqueue and not stop:
- line = self.cmdqueue.pop(0)
- if self.echo and line != 'eos':
- self.poutput('{}{}'.format(self.prompt, line))
-
- stop = self.onecmd_plus_hooks(line)
- finally:
- # Clear out the command queue and script directory stack, just in
- # case we hit an error and they were not completed.
- self.cmdqueue = []
- self._script_dir = []
- # NOTE: placing this return here inside the finally block will
- # swallow exceptions. This is consistent with what is done in
- # onecmd_plus_hooks and _cmdloop, although it may not be
- # necessary/desired here.
- return stop
-
- def _complete_statement(self, line):
- """Keep accepting lines of input until the command is complete."""
- if not line or (not pyparsing.Or(self.commentGrammars).setParseAction(lambda x: '').transformString(line)):
- raise EmptyStatement()
- statement = self.parser_manager.parsed(line)
- while statement.parsed.multilineCommand and (statement.parsed.terminator == ''):
- statement = '%s\n%s' % (statement.parsed.raw,
- self.pseudo_raw_input(self.continuation_prompt))
- statement = self.parser_manager.parsed(statement)
- if not statement.parsed.command:
- raise EmptyStatement()
- return statement
-
- def _redirect_output(self, statement):
- """Handles output redirection for >, >>, and |.
-
- :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
- """
- if statement.parsed.pipeTo:
- self.kept_state = Statekeeper(self, ('stdout',))
-
- # Create a pipe with read and write sides
- read_fd, write_fd = os.pipe()
-
- # Open each side of the pipe and set stdout accordingly
- # noinspection PyTypeChecker
- self.stdout = io.open(write_fd, 'w')
- self.redirecting = True
- # noinspection PyTypeChecker
- subproc_stdin = io.open(read_fd, 'r')
-
- # We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True.
- try:
- self.pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeTo), stdin=subproc_stdin)
- except Exception as ex:
- # Restore stdout to what it was and close the pipe
- self.stdout.close()
- subproc_stdin.close()
- self.pipe_proc = None
- self.kept_state.restore()
- self.kept_state = None
- self.redirecting = False
-
- # Re-raise the exception
- raise ex
- elif statement.parsed.output:
- if (not statement.parsed.outputTo) and (not can_clip):
- raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable')
- self.kept_state = Statekeeper(self, ('stdout',))
- self.kept_sys = Statekeeper(sys, ('stdout',))
- self.redirecting = True
- if statement.parsed.outputTo:
- mode = 'w'
- if statement.parsed.output == 2 * self.redirector:
- mode = 'a'
- sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode)
- else:
- sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+")
- if statement.parsed.output == '>>':
- self.poutput(get_paste_buffer())
-
- def _restore_output(self, statement):
- """Handles restoring state after output redirection as well as the actual pipe operation if present.
-
- :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance
- """
- # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state
- if self.kept_state is not None:
- # If we redirected output to the clipboard
- if statement.parsed.output and not statement.parsed.outputTo:
- self.stdout.seek(0)
- write_to_paste_buffer(self.stdout.read())
-
- try:
- # Close the file or pipe that stdout was redirected to
- self.stdout.close()
- except BrokenPipeError:
- pass
- finally:
- # Restore self.stdout
- self.kept_state.restore()
- self.kept_state = None
-
- # If we were piping output to a shell command, then close the subprocess the shell command was running in
- if self.pipe_proc is not None:
- self.pipe_proc.communicate()
- self.pipe_proc = None
-
- # Restore sys.stdout if need be
- if self.kept_sys is not None:
- self.kept_sys.restore()
- self.kept_sys = None
-
- self.redirecting = False
-
- def _func_named(self, arg):
- """Gets the method name associated with a given command.
-
- :param arg: str - command to look up method name which implements it
- :return: str - method name which implements the given command
- """
- result = None
- target = 'do_' + arg
- if target in dir(self):
- result = target
- return result
-
- def onecmd(self, line):
- """ This executes the actual do_* method for a command.
-
- If the command provided doesn't exist, then it executes _default() instead.
-
- :param line: ParsedString - subclass of string including the pyparsing ParseResults
- :return: bool - a flag indicating whether the interpretation of commands should stop
- """
- statement = self.parser_manager.parsed(line)
- funcname = self._func_named(statement.parsed.command)
- if not funcname:
- return self.default(statement)
-
- # Since we have a valid command store it in the history
- if statement.parsed.command not in self.exclude_from_history:
- self.history.append(statement.parsed.raw)
-
- try:
- func = getattr(self, funcname)
- except AttributeError:
- return self.default(statement)
-
- stop = func(statement)
- return stop
-
- def default(self, statement):
- """Executed when the command given isn't a recognized command implemented by a do_* method.
-
- :param statement: ParsedString - subclass of string including the pyparsing ParseResults
- :return:
- """
- arg = statement.full_parsed_statement()
- if self.default_to_shell:
- result = os.system(arg)
- # If os.system() succeeded, then don't print warning about unknown command
- if not result:
- return
-
- # Print out a message stating this is an unknown command
- self.poutput('*** Unknown syntax: {}\n'.format(arg))
-
- @staticmethod
- def _surround_ansi_escapes(prompt, start="\x01", end="\x02"):
- """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes.
-
- :param prompt: str - original prompt
- :param start: str - start code to tell GNU Readline about beginning of invisible characters
- :param end: str - end code to tell GNU Readline about end of invisible characters
- :return: str - prompt safe to pass to GNU Readline
- """
- # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline
- if sys.platform == "win32":
- return prompt
-
- escaped = False
- result = ""
-
- for c in prompt:
- if c == "\x1b" and not escaped:
- result += start + c
- escaped = True
- elif c.isalpha() and escaped:
- result += c + end
- escaped = False
- else:
- result += c
-
- return result
-
- def pseudo_raw_input(self, prompt):
- """
- began life as a copy of cmd's cmdloop; like raw_input but
-
- - accounts for changed stdin, stdout
- - if input is a pipe (instead of a tty), look at self.echo
- to decide whether to print the prompt and the input
- """
-
- # Deal with the vagaries of readline and ANSI escape codes
- safe_prompt = self._surround_ansi_escapes(prompt)
-
- if self.use_rawinput:
- try:
- if sys.stdin.isatty():
- line = input(safe_prompt)
- else:
- line = input()
- if self.echo:
- sys.stdout.write('{}{}\n'.format(safe_prompt, line))
- except EOFError:
- line = 'eof'
- else:
- if self.stdin.isatty():
- # on a tty, print the prompt first, then read the line
- self.poutput(safe_prompt, end='')
- self.stdout.flush()
- line = self.stdin.readline()
- if len(line) == 0:
- line = 'eof'
- else:
- # we are reading from a pipe, read the line to see if there is
- # anything there, if so, then decide whether to print the
- # prompt or not
- line = self.stdin.readline()
- if len(line):
- # we read something, output the prompt and the something
- if self.echo:
- self.poutput('{}{}'.format(safe_prompt, line))
- else:
- line = 'eof'
- return line.strip()
-
- def _cmdloop(self):
- """Repeatedly issue a prompt, accept input, parse an initial prefix
- off the received input, and dispatch to action methods, passing them
- the remainder of the line as argument.
-
- This serves the same role as cmd.cmdloop().
-
- :return: bool - True implies the entire application should exit.
- """
- # An almost perfect copy from Cmd; however, the pseudo_raw_input portion
- # has been split out so that it can be called separately
- if self.use_rawinput and self.completekey:
-
- # Set up readline for our tab completion needs
- if rl_type == RlType.GNU:
- readline.set_completion_display_matches_hook(self._display_matches_gnu_readline)
-
- # Set GNU readline's rl_basic_quote_characters to NULL so it won't automatically add a closing quote
- # We don't need to worry about setting rl_completion_suppress_quote since we never declared
- # rl_completer_quote_characters.
- rl_basic_quote_characters.value = None
-
- elif rl_type == RlType.PYREADLINE:
- readline.rl.mode._display_completions = self._display_matches_pyreadline
-
- try:
- self.old_completer = readline.get_completer()
- self.old_delims = readline.get_completer_delims()
- readline.set_completer(self.complete)
-
- # Break words on whitespace and quotes when tab completing
- completer_delims = " \t\n" + ''.join(QUOTES)
-
- if self.allow_redirection:
- # If redirection is allowed, then break words on those characters too
- completer_delims += ''.join(REDIRECTION_CHARS)
-
- readline.set_completer_delims(completer_delims)
-
- # Enable tab completion
- readline.parse_and_bind(self.completekey + ": complete")
- except NameError:
- pass
- stop = None
- try:
- while not stop:
- if self.cmdqueue:
- # Run command out of cmdqueue if nonempty (populated by load command or commands at invocation)
- line = self.cmdqueue.pop(0)
-
- if self.echo and line != 'eos':
- self.poutput('{}{}'.format(self.prompt, line))
- else:
- # Otherwise, read a command from stdin
- if not self.quit_on_sigint:
- try:
- line = self.pseudo_raw_input(self.prompt)
- except KeyboardInterrupt:
- self.poutput('^C')
- line = ''
- else:
- line = self.pseudo_raw_input(self.prompt)
-
- # Run the command along with all associated pre and post hooks
- stop = self.onecmd_plus_hooks(line)
- finally:
- if self.use_rawinput and self.completekey:
-
- # Restore what we changed in readline
- try:
- readline.set_completer(self.old_completer)
- readline.set_completer_delims(self.old_delims)
- except NameError:
- pass
-
- if rl_type == RlType.GNU:
- readline.set_completion_display_matches_hook(None)
- rl_basic_quote_characters.value = orig_rl_basic_quote_characters_addr
-
- elif rl_type == RlType.PYREADLINE:
- readline.rl.mode._display_completions = orig_pyreadline_display
-
- self.cmdqueue.clear()
- self._script_dir.clear()
-
- return stop
-
- @with_argument_list
- def do_alias(self, arglist):
- """Define or display aliases
-
-Usage: Usage: alias [name] | [<name> <value>]
- Where:
- name - name of the alias being looked up, added, or replaced
- value - what the alias will be resolved to (if adding or replacing)
- this can contain spaces and does not need to be quoted
-
- Without arguments, 'alias' prints a list of all aliases in a reusable form which
- can be outputted to a startup_script to preserve aliases across sessions.
-
- With one argument, 'alias' shows the value of the specified alias.
- Example: alias ls (Prints the value of the alias called 'ls' if it exists)
-
- With two or more arguments, 'alias' creates or replaces an alias.
-
- Example: alias ls !ls -lF
-
- If you want to use redirection or pipes in the alias, then either quote the tokens with these
- characters or quote the entire alias value.
-
- Examples:
- alias save_results print_results ">" out.txt
- alias save_results print_results "> out.txt"
- alias save_results "print_results > out.txt"
-"""
- # If no args were given, then print a list of current aliases
- if not arglist:
- for cur_alias in self.aliases:
- self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias]))
-
- # The user is looking up an alias
- elif len(arglist) == 1:
- name = arglist[0]
- if name in self.aliases:
- self.poutput("alias {} {}".format(name, self.aliases[name]))
- else:
- self.perror("Alias {!r} not found".format(name), traceback_war=False)
-
- # The user is creating an alias
- else:
- name = arglist[0]
- value = ' '.join(arglist[1:])
-
- # Check for a valid name
- for cur_char in name:
- if cur_char not in self.identchars:
- self.perror("Alias names can only contain the following characters: {}".format(self.identchars),
- traceback_war=False)
- return
-
- # Set the alias
- self.aliases[name] = value
- self.poutput("Alias {!r} created".format(name))
-
- def complete_alias(self, text, line, begidx, endidx):
- """ Tab completion for alias """
- index_dict = \
- {
- 1: self.aliases,
- 2: self.get_visible_commands()
- }
- return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete)
-
- @with_argument_list
- def do_unalias(self, arglist):
- """Unsets aliases
-
-Usage: Usage: unalias [-a] name [name ...]
- Where:
- name - name of the alias being unset
-
- Options:
- -a remove all alias definitions
-"""
- if not arglist:
- self.do_help('unalias')
-
- if '-a' in arglist:
- self.aliases.clear()
- self.poutput("All aliases cleared")
-
- else:
- # Get rid of duplicates
- arglist = list(set(arglist))
-
- for cur_arg in arglist:
- if cur_arg in self.aliases:
- del self.aliases[cur_arg]
- self.poutput("Alias {!r} cleared".format(cur_arg))
- else:
- self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False)
-
- def complete_unalias(self, text, line, begidx, endidx):
- """ Tab completion for unalias """
- return self.basic_complete(text, line, begidx, endidx, self.aliases)
-
- @with_argument_list
- def do_help(self, arglist):
- """List available commands with "help" or detailed help with "help cmd"."""
- if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')):
- verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v')
- self._help_menu(verbose)
- else:
- # Getting help for a specific command
- funcname = self._func_named(arglist[0])
- if funcname:
- # Check to see if this function was decorated with an argparse ArgumentParser
- func = getattr(self, funcname)
- if func.__dict__.get('has_parser', False):
- # Function has an argparser, so get help based on all the arguments in case there are sub-commands
- new_arglist = arglist[1:]
- new_arglist.append('-h')
-
- # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout
- with redirect_stdout(self.stdout):
- with redirect_stderr(self.stdout):
- func(new_arglist)
- else:
- # No special behavior needed, delegate to cmd base class do_help()
- cmd.Cmd.do_help(self, funcname[3:])
- else:
- # This could be a help topic
- cmd.Cmd.do_help(self, arglist[0])
-
- def _help_menu(self, verbose=False):
- """Show a list of commands which help can be displayed for.
- """
- # Get a sorted list of help topics
- help_topics = self.get_help_topics()
- help_topics.sort()
-
- # Get a sorted list of visible command names
- visible_commands = self.get_visible_commands()
- visible_commands.sort()
-
- cmds_doc = []
- cmds_undoc = []
- cmds_cats = {}
-
- for command in visible_commands:
- if command in help_topics or getattr(self, self._func_named(command)).__doc__:
- if command in help_topics:
- help_topics.remove(command)
- if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY):
- category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY)
- cmds_cats.setdefault(category, [])
- cmds_cats[category].append(command)
- else:
- cmds_doc.append(command)
- else:
- cmds_undoc.append(command)
-
- if len(cmds_cats) == 0:
- # No categories found, fall back to standard behavior
- self.poutput("{}\n".format(str(self.doc_leader)))
- self._print_topics(self.doc_header, cmds_doc, verbose)
- else:
- # Categories found, Organize all commands by category
- self.poutput('{}\n'.format(str(self.doc_leader)))
- self.poutput('{}\n\n'.format(str(self.doc_header)))
- for category in sorted(cmds_cats.keys()):
- self._print_topics(category, cmds_cats[category], verbose)
- self._print_topics('Other', cmds_doc, verbose)
-
- self.print_topics(self.misc_header, help_topics, 15, 80)
- self.print_topics(self.undoc_header, cmds_undoc, 15, 80)
-
- def _print_topics(self, header, cmds, verbose):
- """Customized version of print_topics that can switch between verbose or traditional output"""
- if cmds:
- if not verbose:
- self.print_topics(header, cmds, 15, 80)
- else:
- self.stdout.write('{}\n'.format(str(header)))
- widest = 0
- # measure the commands
- for command in cmds:
- width = len(command)
- if width > widest:
- widest = width
- # add a 4-space pad
- widest += 4
- if widest < 20:
- widest = 20
-
- if self.ruler:
- self.stdout.write('{:{ruler}<{width}}\n'.format('', ruler=self.ruler, width=80))
-
- for command in cmds:
- # Try to get the documentation string
- try:
- # first see if there's a help function implemented
- func = getattr(self, 'help_' + command)
- except AttributeError:
- # Couldn't find a help function
- try:
- # Now see if help_summary has been set
- doc = getattr(self, self._func_named(command)).help_summary
- except AttributeError:
- # Last, try to directly access the function's doc-string
- doc = getattr(self, self._func_named(command)).__doc__
- else:
- # we found the help function
- result = StringIO()
- # try to redirect system stdout
- with redirect_stdout(result):
- # save our internal stdout
- stdout_orig = self.stdout
- try:
- # redirect our internal stdout
- self.stdout = result
- func()
- finally:
- # restore internal stdout
- self.stdout = stdout_orig
- doc = result.getvalue()
-
- # Attempt to locate the first documentation block
- doc_block = []
- found_first = False
- for doc_line in doc.splitlines():
- str(doc_line).strip()
- if len(doc_line.strip()) > 0:
- doc_block.append(doc_line.strip())
- found_first = True
- else:
- if found_first:
- break
-
- for doc_line in doc_block:
- self.stdout.write('{: <{col_width}}{doc}\n'.format(command,
- col_width=widest,
- doc=doc_line))
- command = ''
- self.stdout.write("\n")
-
- def do_shortcuts(self, _):
- """Lists shortcuts (aliases) available."""
- result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts))
- self.poutput("Shortcuts for other commands:\n{}\n".format(result))
-
- def do_eof(self, _):
- """Called when <Ctrl>-D is pressed."""
- # End of script should not exit app, but <Ctrl>-D should.
- print('') # Required for clearing line when exiting submenu
- return self._STOP_AND_EXIT
-
- def do_quit(self, _):
- """Exits this application."""
- self._should_quit = True
- return self._STOP_AND_EXIT
-
- def select(self, opts, prompt='Your choice? '):
- """Presents a numbered menu to the user. Modelled after
- the bash shell's SELECT. Returns the item chosen.
-
- Argument ``opts`` can be:
-
- | a single string -> will be split into one-word options
- | a list of strings -> will be offered as options
- | a list of tuples -> interpreted as (value, text), so
- that the return value can differ from
- the text advertised to the user """
- local_opts = opts
- if isinstance(opts, str):
- local_opts = list(zip(opts.split(), opts.split()))
- fulloptions = []
- for opt in local_opts:
- if isinstance(opt, str):
- fulloptions.append((opt, opt))
- else:
- try:
- fulloptions.append((opt[0], opt[1]))
- except IndexError:
- fulloptions.append((opt[0], opt[0]))
- for (idx, (value, text)) in enumerate(fulloptions):
- self.poutput(' %2d. %s\n' % (idx + 1, text))
- while True:
- response = input(prompt)
- hlen = readline.get_current_history_length()
- if hlen >= 1 and response != '':
- readline.remove_history_item(hlen - 1)
- try:
- response = int(response)
- result = fulloptions[response - 1][0]
- break
- except (ValueError, IndexError):
- self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:\n".format(response,
- len(fulloptions)))
- return result
-
- def cmdenvironment(self):
- """Get a summary report of read-only settings which the user cannot modify at runtime.
-
- :return: str - summary report of read-only settings which the user cannot modify at runtime
- """
- read_only_settings = """
- Commands may be terminated with: {}
- Arguments at invocation allowed: {}
- Output redirection and pipes allowed: {}
- Parsing of command arguments:
- Shell lexer mode for command argument splitting: {}
- Strip Quotes after splitting arguments: {}
- """.format(str(self.terminators), self.allow_cli_args, self.allow_redirection,
- "POSIX" if POSIX_SHLEX else "non-POSIX",
- "True" if STRIP_QUOTES_FOR_NON_POSIX and not POSIX_SHLEX else "False")
- return read_only_settings
-
- def show(self, args, parameter):
- param = ''
- if parameter:
- param = parameter.strip().lower()
- result = {}
- maxlen = 0
- for p in self.settable:
- if (not param) or p.startswith(param):
- result[p] = '%s: %s' % (p, str(getattr(self, p)))
- maxlen = max(maxlen, len(result[p]))
- if result:
- for p in sorted(result):
- if args.long:
- self.poutput('{} # {}'.format(result[p].ljust(maxlen), self.settable[p]))
- else:
- self.poutput(result[p])
-
- # If user has requested to see all settings, also show read-only settings
- if args.all:
- self.poutput('\nRead only settings:{}'.format(self.cmdenvironment()))
- else:
- raise LookupError("Parameter '%s' not supported (type 'show' for list of parameters)." % param)
-
- set_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
- set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well')
- set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter')
- set_parser.add_argument('settable', nargs='*', help='[param_name] [value]')
-
- @with_argparser(set_parser)
- def do_set(self, args):
- """Sets a settable parameter or shows current settings of parameters.
-
- Accepts abbreviated parameter names so long as there is no ambiguity.
- Call without arguments for a list of settable parameters with their values.
- """
- try:
- param_name, val = args.settable
- val = val.strip()
- param_name = param_name.strip().lower()
- if param_name not in self.settable:
- hits = [p for p in self.settable if p.startswith(param_name)]
- if len(hits) == 1:
- param_name = hits[0]
- else:
- return self.show(args, param_name)
- current_val = getattr(self, param_name)
- if (val[0] == val[-1]) and val[0] in ("'", '"'):
- val = val[1:-1]
- else:
- val = cast(current_val, val)
- setattr(self, param_name, val)
- self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val))
- if current_val != val:
- try:
- onchange_hook = getattr(self, '_onchange_%s' % param_name)
- onchange_hook(old=current_val, new=val)
- except AttributeError:
- pass
- except (ValueError, AttributeError):
- param = ''
- if args.settable:
- param = args.settable[0]
- self.show(args, param)
-
- def do_shell(self, command):
- """Execute a command as if at the OS prompt.
-
- Usage: shell <command> [arguments]"""
-
- try:
- # Use non-POSIX parsing to keep the quotes around the tokens
- tokens = shlex.split(command, posix=False)
- except ValueError as err:
- self.perror(err, traceback_war=False)
- return
-
- # Support expanding ~ in quoted paths
- for index, _ in enumerate(tokens):
- if tokens[index]:
- # Check if the token is quoted. Since shlex.split() passed, there isn't
- # an unclosed quote, so we only need to check the first character.
- first_char = tokens[index][0]
- if first_char in QUOTES:
- tokens[index] = strip_quotes(tokens[index])
-
- tokens[index] = os.path.expanduser(tokens[index])
-
- # Restore the quotes
- if first_char in QUOTES:
- tokens[index] = first_char + tokens[index] + first_char
-
- expanded_command = ' '.join(tokens)
- proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True)
- proc.communicate()
-
- def complete_shell(self, text, line, begidx, endidx):
- """Handles tab completion of executable commands and local file system paths for the shell command
-
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :return: List[str] - a list of possible tab completions
- """
- index_dict = {1: self.shell_cmd_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete)
-
- def cmd_with_subs_completer(self, text, line, begidx, endidx):
- """
- This is a function provided for convenience to those who want an easy way to add
- tab completion to functions that implement subcommands. By setting this as the
- completer of the base command function, the correct completer for the chosen subcommand
- will be called.
-
- The use of this function requires assigning a completer function to the subcommand's parser
- Example:
- A command called print has a subcommands called 'names' that needs a tab completer
- When you create the parser for names, include the completer function in the parser's defaults.
-
- names_parser.set_defaults(func=print_names, completer=complete_print_names)
-
- To make sure the names completer gets called, set the completer for the print function
- in a similar fashion to what follows.
-
- complete_print = cmd2.Cmd.cmd_with_subs_completer
-
- When the subcommand's completer is called, this function will have stripped off all content from the
- beginning of the command line before the subcommand, meaning the line parameter always starts with the
- subcommand name and the index parameters reflect this change.
-
- For instance, the command "print names -d 2" becomes "names -d 2"
- begidx and endidx are incremented accordingly
-
- :param text: str - the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: str - the current input line with leading whitespace removed
- :param begidx: int - the beginning index of the prefix text
- :param endidx: int - the ending index of the prefix text
- :return: List[str] - a list of possible tab completions
- """
- # The command is the token at index 0 in the command line
- cmd_index = 0
-
- # The subcommand is the token at index 1 in the command line
- subcmd_index = 1
-
- # Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
- if tokens is None:
- return []
-
- matches = []
-
- # Get the index of the token being completed
- index = len(tokens) - 1
-
- # If the token being completed is past the subcommand name, then do subcommand specific tab-completion
- if index > subcmd_index:
-
- # Get the command name
- command = tokens[cmd_index]
-
- # Get the subcommand name
- subcommand = tokens[subcmd_index]
-
- # Find the offset into line where the subcommand name begins
- subcmd_start = 0
- for cur_index in range(0, subcmd_index + 1):
- cur_token = tokens[cur_index]
- subcmd_start = line.find(cur_token, subcmd_start)
-
- if cur_index != subcmd_index:
- subcmd_start += len(cur_token)
-
- # Strip off everything before subcommand name
- orig_line = line
- line = line[subcmd_start:]
-
- # Update the indexes
- diff = len(orig_line) - len(line)
- begidx -= diff
- endidx -= diff
-
- # Call the subcommand specific completer if it exists
- compfunc = self.get_subcommand_completer(command, subcommand)
- if compfunc is not None:
- matches = compfunc(self, text, line, begidx, endidx)
-
- return matches
-
- # noinspection PyBroadException
- def do_py(self, arg):
- """
- Invoke python command, shell, or script
-
- py <command>: Executes a Python command.
- py: Enters interactive Python mode.
- End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
- Non-python commands can be issued with ``cmd("your command")``.
- Run python code from external script files with ``run("script.py")``
- """
- if self._in_py:
- self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False)
- return
- self._in_py = True
-
- try:
- self.pystate['self'] = self
- arg = arg.strip()
-
- # Support the run command even if called prior to invoking an interactive interpreter
- def run(filename):
- """Run a Python script file in the interactive console.
-
- :param filename: str - filename of *.py script file to run
- """
- try:
- with open(filename) as f:
- interp.runcode(f.read())
- except IOError as e:
- self.perror(e)
-
- def onecmd_plus_hooks(cmd_plus_args):
- """Run a cmd2.Cmd command from a Python script or the interactive Python console.
-
- :param cmd_plus_args: str - command line including command and arguments to run
- :return: bool - True if cmdloop() should exit once leaving the interactive Python console
- """
- return self.onecmd_plus_hooks(cmd_plus_args + '\n')
-
- self.pystate['run'] = run
- self.pystate['cmd'] = onecmd_plus_hooks
-
- localvars = (self.locals_in_py and self.pystate) or {}
- interp = InteractiveConsole(locals=localvars)
- interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())')
-
- if arg:
- interp.runcode(arg)
- else:
- # noinspection PyShadowingBuiltins
- def quit():
- """Function callable from the interactive Python console to exit that environment"""
- raise EmbeddedConsoleExit
-
- self.pystate['quit'] = quit
- self.pystate['exit'] = quit
-
- keepstate = None
- try:
- cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
- keepstate = Statekeeper(sys, ('stdin', 'stdout'))
- sys.stdout = self.stdout
- sys.stdin = self.stdin
- interp.interact(banner="Python %s on %s\n%s\n(%s)\n%s" %
- (sys.version, sys.platform, cprt, self.__class__.__name__,
- self.do_py.__doc__))
- except EmbeddedConsoleExit:
- pass
- if keepstate is not None:
- keepstate.restore()
- except Exception:
- pass
- finally:
- self._in_py = False
- return self._should_quit
-
- @with_argument_list
- def do_pyscript(self, arglist):
- """\nRuns a python script file inside the console
-
- Usage: pyscript <script_path> [script_arguments]
-
-Console commands can be executed inside this script with cmd("your command")
-However, you cannot run nested "py" or "pyscript" commands from within this script
-Paths or arguments that contain spaces must be enclosed in quotes
-"""
- if not arglist:
- self.perror("pyscript command requires at least 1 argument ...", traceback_war=False)
- self.do_help('pyscript')
- return
-
- # Get the absolute path of the script
- script_path = os.path.expanduser(arglist[0])
-
- # Save current command line arguments
- orig_args = sys.argv
-
- # Overwrite sys.argv to allow the script to take command line arguments
- sys.argv = [script_path]
- sys.argv.extend(arglist[1:])
-
- # Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows
- self.do_py("run({!r})".format(script_path))
-
- # Restore command line arguments to original state
- sys.argv = orig_args
-
- # Enable tab-completion for pyscript command
- def complete_pyscript(self, text, line, begidx, endidx):
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
- # Only include the do_ipy() method if IPython is available on the system
- if ipython_available:
- # noinspection PyMethodMayBeStatic,PyUnusedLocal
- def do_ipy(self, arg):
- """Enters an interactive IPython shell.
-
- Run python code from external files with ``run filename.py``
- End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
- """
- banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...'
- exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0])
- embed(banner1=banner, exit_msg=exit_msg)
-
- history_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter)
- history_parser_group = history_parser.add_mutually_exclusive_group()
- history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items')
- history_parser_group.add_argument('-e', '--edit', action='store_true',
- help='edit and then run selected history items')
- history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines')
- history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file')
- history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file')
- _history_arg_help = """empty all history items
-a one history item by number
-a..b, a:b, a:, ..b items by indices (inclusive)
-[string] items containing string
-/regex/ items matching regular expression"""
- history_parser.add_argument('arg', nargs='?', help=_history_arg_help)
-
- @with_argparser(history_parser)
- def do_history(self, args):
- """View, run, edit, and save previously entered commands."""
- # If an argument was supplied, then retrieve partial contents of the history
- cowardly_refuse_to_run = False
- if args.arg:
- # If a character indicating a slice is present, retrieve
- # a slice of the history
- arg = args.arg
- if '..' in arg or ':' in arg:
- try:
- # Get a slice of history
- history = self.history.span(arg)
- except IndexError:
- history = self.history.get(arg)
- else:
- # Get item(s) from history by index or string search
- history = self.history.get(arg)
- else:
- # If no arg given, then retrieve the entire history
- cowardly_refuse_to_run = True
- # Get a copy of the history so it doesn't get mutated while we are using it
- history = self.history[:]
-
- if args.run:
- if cowardly_refuse_to_run:
- self.perror("Cowardly refusing to run all previously entered commands.", traceback_war=False)
- self.perror("If this is what you want to do, specify '1:' as the range of history.",
- traceback_war=False)
- else:
- for runme in history:
- self.pfeedback(runme)
- if runme:
- self.onecmd_plus_hooks(runme)
- elif args.edit:
- fd, fname = tempfile.mkstemp(suffix='.txt', text=True)
- with os.fdopen(fd, 'w') as fobj:
- for command in history:
- fobj.write('{}\n'.format(command))
- try:
- os.system('"{}" "{}"'.format(self.editor, fname))
- self.do_load(fname)
- except Exception:
- raise
- finally:
- os.remove(fname)
- elif args.output_file:
- try:
- with open(os.path.expanduser(args.output_file), 'w') as fobj:
- for command in history:
- fobj.write('{}\n'.format(command))
- plural = 's' if len(history) > 1 else ''
- self.pfeedback('{} command{} saved to {}'.format(len(history), plural, args.output_file))
- except Exception as e:
- self.perror('Saving {!r} - {}'.format(args.output_file, e), traceback_war=False)
- elif args.transcript:
- # Make sure echo is on so commands print to standard out
- saved_echo = self.echo
- self.echo = True
-
- # Redirect stdout to the transcript file
- saved_self_stdout = self.stdout
- self.stdout = open(args.transcript, 'w')
-
- # Run all of the commands in the history with output redirected to transcript and echo on
- self.runcmds_plus_hooks(history)
-
- # Restore stdout to its original state
- self.stdout.close()
- self.stdout = saved_self_stdout
-
- # Set echo back to its original state
- self.echo = saved_echo
-
- # Post-process the file to escape un-escaped "/" regex escapes
- with open(args.transcript, 'r') as fin:
- data = fin.read()
- post_processed_data = data.replace('/', '\/')
- with open(args.transcript, 'w') as fout:
- fout.write(post_processed_data)
-
- plural = 's' if len(history) > 1 else ''
- self.pfeedback('{} command{} and outputs saved to transcript file {!r}'.format(len(history), plural,
- args.transcript))
- else:
- # Display the history items retrieved
- for hi in history:
- if args.script:
- self.poutput(hi)
- else:
- self.poutput(hi.pr())
-
- @with_argument_list
- def do_edit(self, arglist):
- """Edit a file in a text editor.
-
-Usage: edit [file_path]
- Where:
- * file_path - path to a file to open in editor
-
-The editor used is determined by the ``editor`` settable parameter.
-"set editor (program-name)" to change or set the EDITOR environment variable.
-"""
- if not self.editor:
- raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.")
- filename = arglist[0] if arglist else ''
- if filename:
- os.system('"{}" "{}"'.format(self.editor, filename))
- else:
- os.system('"{}"'.format(self.editor))
-
- # Enable tab-completion for edit command
- def complete_edit(self, text, line, begidx, endidx):
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
- @property
- def _current_script_dir(self):
- """Accessor to get the current script directory from the _script_dir LIFO queue."""
- if self._script_dir:
- return self._script_dir[-1]
- else:
- return None
-
- @with_argument_list
- def do__relative_load(self, arglist):
- """Runs commands in script file that is encoded as either ASCII or UTF-8 text.
-
- Usage: _relative_load <file_path>
-
- optional argument:
- file_path a file path pointing to a script
-
-Script should contain one command per line, just like command would be typed in console.
-
-If this is called from within an already-running script, the filename will be interpreted
-relative to the already-running script's directory.
-
-NOTE: This command is intended to only be used within text file scripts.
- """
- # If arg is None or arg is an empty string this is an error
- if not arglist:
- self.perror('_relative_load command requires a file path:', traceback_war=False)
- return
-
- file_path = arglist[0].strip()
- # NOTE: Relative path is an absolute path, it is just relative to the current script directory
- relative_path = os.path.join(self._current_script_dir or '', file_path)
- self.do_load(relative_path)
-
- def do_eos(self, _):
- """Handles cleanup when a script has finished executing."""
- if self._script_dir:
- self._script_dir.pop()
-
- @with_argument_list
- def do_load(self, arglist):
- """Runs commands in script file that is encoded as either ASCII or UTF-8 text.
-
- Usage: load <file_path>
-
- * file_path - a file path pointing to a script
-
-Script should contain one command per line, just like command would be typed in console.
- """
- # If arg is None or arg is an empty string this is an error
- if not arglist:
- self.perror('load command requires a file path:', traceback_war=False)
- return
-
- file_path = arglist[0].strip()
- expanded_path = os.path.abspath(os.path.expanduser(file_path))
-
- # Make sure expanded_path points to a file
- if not os.path.isfile(expanded_path):
- self.perror('{} does not exist or is not a file'.format(expanded_path), traceback_war=False)
- return
-
- # Make sure the file is not empty
- if os.path.getsize(expanded_path) == 0:
- self.perror('{} is empty'.format(expanded_path), traceback_war=False)
- return
-
- # Make sure the file is ASCII or UTF-8 encoded text
- if not self.is_text_file(expanded_path):
- self.perror('{} is not an ASCII or UTF-8 encoded text file'.format(expanded_path), traceback_war=False)
- return
-
- try:
- # Read all lines of the script and insert into the head of the
- # command queue. Add an "end of script (eos)" command to cleanup the
- # self._script_dir list when done.
- with open(expanded_path, encoding='utf-8') as target:
- self.cmdqueue = target.read().splitlines() + ['eos'] + self.cmdqueue
- except IOError as e:
- self.perror('Problem accessing script from {}:\n{}'.format(expanded_path, e))
- return
-
- self._script_dir.append(os.path.dirname(expanded_path))
-
- # Enable tab-completion for load command
- def complete_load(self, text, line, begidx, endidx):
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
- @staticmethod
- def is_text_file(file_path):
- """
- Returns if a file contains only ASCII or UTF-8 encoded text
- :param file_path: path to the file being checked
- """
- expanded_path = os.path.abspath(os.path.expanduser(file_path.strip()))
- valid_text_file = False
-
- # Check if the file is ASCII
- try:
- with codecs.open(expanded_path, encoding='ascii', errors='strict') as f:
- # Make sure the file has at least one line of text
- # noinspection PyUnusedLocal
- if sum(1 for line in f) > 0:
- valid_text_file = True
- except IOError:
- pass
- except UnicodeDecodeError:
- # The file is not ASCII. Check if it is UTF-8.
- try:
- with codecs.open(expanded_path, encoding='utf-8', errors='strict') as f:
- # Make sure the file has at least one line of text
- # noinspection PyUnusedLocal
- if sum(1 for line in f) > 0:
- valid_text_file = True
- except IOError:
- pass
- except UnicodeDecodeError:
- # Not UTF-8
- pass
-
- return valid_text_file
-
- def run_transcript_tests(self, callargs):
- """Runs transcript tests for provided file(s).
-
- This is called when either -t is provided on the command line or the transcript_files argument is provided
- during construction of the cmd2.Cmd instance.
-
- :param callargs: List[str] - list of transcript test file names
- """
- class TestMyAppCase(Cmd2TestCase):
- cmdapp = self
-
- self.__class__.testfiles = callargs
- sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main()
- testcase = TestMyAppCase()
- runner = unittest.TextTestRunner()
- runner.run(testcase)
-
- def cmdloop(self, intro=None):
- """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
-
- _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with
- the following extra features provided by cmd2:
- - commands at invocation
- - transcript testing
- - intro banner
-
- :param intro: str - if provided this overrides self.intro and serves as the intro banner printed once at start
- """
- if self.allow_cli_args:
- parser = argparse.ArgumentParser()
- parser.add_argument('-t', '--test', action="store_true",
- help='Test against transcript(s) in FILE (wildcards OK)')
- callopts, callargs = parser.parse_known_args()
-
- # If transcript testing was called for, use other arguments as transcript files
- if callopts.test:
- self._transcript_files = callargs
-
- # If commands were supplied at invocation, then add them to the command queue
- if callargs:
- self.cmdqueue.extend(callargs)
-
- # Always run the preloop first
- self.preloop()
-
- # If transcript-based regression testing was requested, then do that instead of the main loop
- if self._transcript_files is not None:
- self.run_transcript_tests(self._transcript_files)
- else:
- # If an intro was supplied in the method call, allow it to override the default
- if intro is not None:
- self.intro = intro
-
- # Print the intro, if there is one, right after the preloop
- if self.intro is not None:
- self.poutput(str(self.intro) + "\n")
-
- # And then call _cmdloop() to enter the main loop
- self._cmdloop()
-
- # Run the postloop() no matter what
- self.postloop()
-
-
-# noinspection PyPep8Naming
-class ParserManager:
- """
- Class which encapsulates all of the pyparsing parser functionality for cmd2 in a single location.
- """
- def __init__(self, redirector, terminators, multilineCommands, legalChars, commentGrammars, commentInProgress,
- blankLinesAllowed, prefixParser, preparse, postparse, aliases, shortcuts):
- """Creates and uses parsers for user input according to app's parameters."""
-
- self.commentGrammars = commentGrammars
- self.preparse = preparse
- self.postparse = postparse
- self.aliases = aliases
- self.shortcuts = shortcuts
-
- self.main_parser = self._build_main_parser(redirector=redirector, terminators=terminators,
- multilineCommands=multilineCommands, legalChars=legalChars,
- commentInProgress=commentInProgress,
- blankLinesAllowed=blankLinesAllowed, prefixParser=prefixParser)
- self.input_source_parser = self._build_input_source_parser(legalChars=legalChars,
- commentInProgress=commentInProgress)
-
- def _build_main_parser(self, redirector, terminators, multilineCommands, legalChars, commentInProgress,
- blankLinesAllowed, prefixParser):
- """Builds a PyParsing parser for interpreting user commands."""
-
- # Build several parsing components that are eventually compiled into overall parser
- output_destination_parser = (pyparsing.Literal(redirector * 2) |
- (pyparsing.WordStart() + redirector) |
- pyparsing.Regex('[^=]' + redirector))('output')
-
- terminator_parser = pyparsing.Or(
- [(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in terminators])('terminator')
- string_end = pyparsing.stringEnd ^ '\nEOF'
- multilineCommand = pyparsing.Or(
- [pyparsing.Keyword(c, caseless=False) for c in multilineCommands])('multilineCommand')
- oneline_command = (~multilineCommand + pyparsing.Word(legalChars))('command')
- pipe = pyparsing.Keyword('|', identChars='|')
- do_not_parse = self.commentGrammars | commentInProgress | pyparsing.quotedString
- after_elements = \
- pyparsing.Optional(pipe + pyparsing.SkipTo(output_destination_parser ^ string_end,
- ignore=do_not_parse)('pipeTo')) + \
- pyparsing.Optional(output_destination_parser +
- pyparsing.SkipTo(string_end, ignore=do_not_parse).
- setParseAction(lambda x: strip_quotes(x[0].strip()))('outputTo'))
-
- multilineCommand.setParseAction(lambda x: x[0])
- oneline_command.setParseAction(lambda x: x[0])
-
- if blankLinesAllowed:
- blankLineTerminationParser = pyparsing.NoMatch
- else:
- blankLineTerminator = (pyparsing.lineEnd + pyparsing.lineEnd)('terminator')
- blankLineTerminator.setResultsName('terminator')
- blankLineTerminationParser = ((multilineCommand ^ oneline_command) +
- pyparsing.SkipTo(blankLineTerminator, ignore=do_not_parse).setParseAction(
- lambda x: x[0].strip())('args') + blankLineTerminator)('statement')
-
- multilineParser = (((multilineCommand ^ oneline_command) +
- pyparsing.SkipTo(terminator_parser,
- ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('args') +
- terminator_parser)('statement') +
- pyparsing.SkipTo(output_destination_parser ^ pipe ^ string_end,
- ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('suffix') +
- after_elements)
- multilineParser.ignore(commentInProgress)
-
- singleLineParser = ((oneline_command +
- pyparsing.SkipTo(terminator_parser ^ string_end ^ pipe ^ output_destination_parser,
- ignore=do_not_parse).setParseAction(
- lambda x: x[0].strip())('args'))('statement') +
- pyparsing.Optional(terminator_parser) + after_elements)
-
- blankLineTerminationParser = blankLineTerminationParser.setResultsName('statement')
-
- parser = prefixParser + (
- string_end |
- multilineParser |
- singleLineParser |
- blankLineTerminationParser |
- multilineCommand + pyparsing.SkipTo(string_end, ignore=do_not_parse)
- )
- parser.ignore(self.commentGrammars)
- return parser
-
- @staticmethod
- def _build_input_source_parser(legalChars, commentInProgress):
- """Builds a PyParsing parser for alternate user input sources (from file, pipe, etc.)"""
-
- input_mark = pyparsing.Literal('<')
- input_mark.setParseAction(lambda x: '')
-
- # Also allow spaces, slashes, and quotes
- file_name = pyparsing.Word(legalChars + ' /\\"\'')
-
- input_from = file_name('inputFrom')
- input_from.setParseAction(replace_with_file_contents)
- # a not-entirely-satisfactory way of distinguishing < as in "import from" from <
- # as in "lesser than"
- inputParser = input_mark + pyparsing.Optional(input_from) + pyparsing.Optional('>') + \
- pyparsing.Optional(file_name) + (pyparsing.stringEnd | '|')
- inputParser.ignore(commentInProgress)
- return inputParser
-
- def parsed(self, raw):
- """ This function is where the actual parsing of each line occurs.
-
- :param raw: str - the line of text as it was entered
- :return: ParsedString - custom subclass of str with extra attributes
- """
- if isinstance(raw, ParsedString):
- p = raw
- else:
- # preparse is an overridable hook; default makes no changes
- s = self.preparse(raw)
- s = self.input_source_parser.transformString(s.lstrip())
- s = self.commentGrammars.transformString(s)
-
- # Make a copy of aliases so we can edit it
- tmp_aliases = list(self.aliases.keys())
- keep_expanding = len(tmp_aliases) > 0
-
- # Expand aliases
- while keep_expanding:
- for cur_alias in tmp_aliases:
- keep_expanding = False
-
- if s == cur_alias or s.startswith(cur_alias + ' '):
- s = s.replace(cur_alias, self.aliases[cur_alias], 1)
-
- # Do not expand the same alias more than once
- tmp_aliases.remove(cur_alias)
- keep_expanding = len(tmp_aliases) > 0
- break
-
- # Expand command shortcut to its full command name
- for (shortcut, expansion) in self.shortcuts:
- if s.startswith(shortcut):
- # If the next character after the shortcut isn't a space, then insert one
- shortcut_len = len(shortcut)
- if len(s) == shortcut_len or s[shortcut_len] != ' ':
- expansion += ' '
-
- # Expand the shortcut
- s = s.replace(shortcut, expansion, 1)
- break
-
- try:
- result = self.main_parser.parseString(s)
- except pyparsing.ParseException:
- # If we have a parsing failure, treat it is an empty command and move to next prompt
- result = self.main_parser.parseString('')
- result['raw'] = raw
- result['command'] = result.multilineCommand or result.command
- result = self.postparse(result)
- p = ParsedString(result.args)
- p.parsed = result
- p.parser = self.parsed
- return p
-
-
-class HistoryItem(str):
- """Class used to represent an item in the History list.
-
- Thin wrapper around str class which adds a custom format for printing. It
- also keeps track of its index in the list as well as a lowercase
- representation of itself for convenience/efficiency.
-
- """
- listformat = '-------------------------[{}]\n{}\n'
-
- # noinspection PyUnusedLocal
- def __init__(self, instr):
- str.__init__(self)
- self.lowercase = self.lower()
- self.idx = None
-
- def pr(self):
- """Represent a HistoryItem in a pretty fashion suitable for printing.
-
- :return: str - pretty print string version of a HistoryItem
- """
- return self.listformat.format(self.idx, str(self).rstrip())
-
-
-class History(list):
- """ A list of HistoryItems that knows how to respond to user requests. """
-
- # noinspection PyMethodMayBeStatic
- def _zero_based_index(self, onebased):
- result = onebased
- if result > 0:
- result -= 1
- return result
-
- def _to_index(self, raw):
- if raw:
- result = self._zero_based_index(int(raw))
- else:
- result = None
- return result
-
- spanpattern = re.compile(r'^\s*(?P<start>-?\d+)?\s*(?P<separator>:|(\.{2,}))?\s*(?P<end>-?\d+)?\s*$')
-
- def span(self, raw):
- """Parses the input string search for a span pattern and if if found, returns a slice from the History list.
-
- :param raw: str - string potentially containing a span of the forms a..b, a:b, a:, ..b
- :return: List[HistoryItem] - slice from the History list
- """
- if raw.lower() in ('*', '-', 'all'):
- raw = ':'
- results = self.spanpattern.search(raw)
- if not results:
- raise IndexError
- if not results.group('separator'):
- return [self[self._to_index(results.group('start'))]]
- start = self._to_index(results.group('start')) or 0 # Ensure start is not None
- end = self._to_index(results.group('end'))
- reverse = False
- if end is not None:
- if end < start:
- (start, end) = (end, start)
- reverse = True
- end += 1
- result = self[start:end]
- if reverse:
- result.reverse()
- return result
-
- rangePattern = re.compile(r'^\s*(?P<start>[\d]+)?\s*-\s*(?P<end>[\d]+)?\s*$')
-
- def append(self, new):
- """Append a HistoryItem to end of the History list
-
- :param new: str - command line to convert to HistoryItem and add to the end of the History list
- """
- new = HistoryItem(new)
- list.append(self, new)
- new.idx = len(self)
-
- def get(self, getme=None):
- """Get an item or items from the History list using 1-based indexing.
-
- :param getme: int or str - item(s) to get - either an integer index or string to search for
- :return: List[str] - list of HistoryItems matching the retrieval criteria
- """
- if not getme:
- return self
- try:
- getme = int(getme)
- if getme < 0:
- return self[:(-1 * getme)]
- else:
- return [self[getme - 1]]
- except IndexError:
- return []
- except ValueError:
- range_result = self.rangePattern.search(getme)
- if range_result:
- start = range_result.group('start') or None
- end = range_result.group('start') or None
- if start:
- start = int(start) - 1
- if end:
- end = int(end)
- return self[start:end]
-
- # noinspection PyUnresolvedReferences
- getme = getme.strip()
-
- if getme.startswith(r'/') and getme.endswith(r'/'):
- finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE)
-
- def isin(hi):
- """Listcomp filter function for doing a regular expression search of History.
-
- :param hi: HistoryItem
- :return: bool - True if search matches
- """
- return finder.search(hi)
- else:
- def isin(hi):
- """Listcomp filter function for doing a case-insensitive string search of History.
-
- :param hi: HistoryItem
- :return: bool - True if search matches
- """
- return getme.lower() in hi.lowercase
- return [itm for itm in self if isin(itm)]
-
-
-def cast(current, new):
- """Tries to force a new value into the same type as the current when trying to set the value for a parameter.
-
- :param current: current value for the parameter, type varies
- :param new: str - new value
- :return: new value with same type as current, or the current value if there was an error casting
- """
- typ = type(current)
- if typ == bool:
- try:
- return bool(int(new))
- except (ValueError, TypeError):
- pass
- try:
- new = new.lower()
- except AttributeError:
- pass
- if (new == 'on') or (new[0] in ('y', 't')):
- return True
- if (new == 'off') or (new[0] in ('n', 'f')):
- return False
- else:
- try:
- return typ(new)
- except (ValueError, TypeError):
- pass
- print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new))
- return current
-
-
-class Statekeeper(object):
- """Class used to save and restore state during load and py commands as well as when redirecting output or pipes."""
- def __init__(self, obj, attribs):
- """Use the instance attributes as a generic key-value store to copy instance attributes from outer object.
-
- :param obj: instance of cmd2.Cmd derived class (your application instance)
- :param attribs: Tuple[str] - tuple of strings listing attributes of obj to save a copy of
- """
- self.obj = obj
- self.attribs = attribs
- if self.obj:
- self._save()
-
- def _save(self):
- """Create copies of attributes from self.obj inside this Statekeeper instance."""
- for attrib in self.attribs:
- setattr(self, attrib, getattr(self.obj, attrib))
-
- def restore(self):
- """Overwrite attributes in self.obj with the saved values stored in this Statekeeper instance."""
- if self.obj:
- for attrib in self.attribs:
- setattr(self.obj, attrib, getattr(self, attrib))
-
-
-class OutputTrap(object):
- """Instantiate an OutputTrap to divert/capture ALL stdout output. For use in transcript testing."""
-
- def __init__(self):
- self.contents = ''
-
- def write(self, txt):
- """Add text to the internal contents.
-
- :param txt: str
- """
- self.contents += txt
-
- def read(self):
- """Read from the internal contents and then clear them out.
-
- :return: str - text from the internal contents
- """
- result = self.contents
- self.contents = ''
- return result
-
-
-class Cmd2TestCase(unittest.TestCase):
- """Subclass this, setting CmdApp, to make a unittest.TestCase class
- that will execute the commands in a transcript file and expect the results shown.
- See example.py"""
- cmdapp = None
-
- def fetchTranscripts(self):
- self.transcripts = {}
- for fileset in self.cmdapp.testfiles:
- for fname in glob.glob(fileset):
- tfile = open(fname)
- self.transcripts[fname] = iter(tfile.readlines())
- tfile.close()
- if not len(self.transcripts):
- raise Exception("No test files found - nothing to test.")
-
- def setUp(self):
- if self.cmdapp:
- self.fetchTranscripts()
-
- # Trap stdout
- self._orig_stdout = self.cmdapp.stdout
- self.cmdapp.stdout = OutputTrap()
-
- def runTest(self): # was testall
- if self.cmdapp:
- its = sorted(self.transcripts.items())
- for (fname, transcript) in its:
- self._test_transcript(fname, transcript)
-
- def _test_transcript(self, fname, transcript):
- line_num = 0
- finished = False
- line = strip_ansi(next(transcript))
- line_num += 1
- while not finished:
- # Scroll forward to where actual commands begin
- while not line.startswith(self.cmdapp.visible_prompt):
- try:
- line = strip_ansi(next(transcript))
- except StopIteration:
- finished = True
- break
- line_num += 1
- command = [line[len(self.cmdapp.visible_prompt):]]
- line = next(transcript)
- # Read the entirety of a multi-line command
- while line.startswith(self.cmdapp.continuation_prompt):
- command.append(line[len(self.cmdapp.continuation_prompt):])
- try:
- line = next(transcript)
- except StopIteration:
- raise (StopIteration,
- 'Transcript broke off while reading command beginning at line {} with\n{}'.format(line_num,
- command[0])
- )
- line_num += 1
- command = ''.join(command)
- # Send the command into the application and capture the resulting output
- # TODO: Should we get the return value and act if stop == True?
- self.cmdapp.onecmd_plus_hooks(command)
- result = self.cmdapp.stdout.read()
- # Read the expected result from transcript
- if strip_ansi(line).startswith(self.cmdapp.visible_prompt):
- message = '\nFile {}, line {}\nCommand was:\n{}\nExpected: (nothing)\nGot:\n{}\n'.format(
- fname, line_num, command, result)
- self.assert_(not (result.strip()), message)
- continue
- expected = []
- while not strip_ansi(line).startswith(self.cmdapp.visible_prompt):
- expected.append(line)
- try:
- line = next(transcript)
- except StopIteration:
- finished = True
- break
- line_num += 1
- expected = ''.join(expected)
-
- # transform the expected text into a valid regular expression
- expected = self._transform_transcript_expected(expected)
- message = '\nFile {}, line {}\nCommand was:\n{}\nExpected:\n{}\nGot:\n{}\n'.format(
- fname, line_num, command, expected, result)
- self.assertTrue(re.match(expected, result, re.MULTILINE | re.DOTALL), message)
-
- def _transform_transcript_expected(self, s):
- """parse the string with slashed regexes into a valid regex
-
- Given a string like:
-
- Match a 10 digit phone number: /\d{3}-\d{3}-\d{4}/
-
- Turn it into a valid regular expression which matches the literal text
- of the string and the regular expression. We have to remove the slashes
- because they differentiate between plain text and a regular expression.
- Unless the slashes are escaped, in which case they are interpreted as
- plain text, or there is only one slash, which is treated as plain text
- also.
-
- Check the tests in tests/test_transcript.py to see all the edge
- cases.
- """
- regex = ''
- start = 0
-
- while True:
- (regex, first_slash_pos, start) = self._escaped_find(regex, s, start, False)
- if first_slash_pos == -1:
- # no more slashes, add the rest of the string and bail
- regex += re.escape(s[start:])
- break
- else:
- # there is a slash, add everything we have found so far
- # add stuff before the first slash as plain text
- regex += re.escape(s[start:first_slash_pos])
- start = first_slash_pos+1
- # and go find the next one
- (regex, second_slash_pos, start) = self._escaped_find(regex, s, start, True)
- if second_slash_pos > 0:
- # add everything between the slashes (but not the slashes)
- # as a regular expression
- regex += s[start:second_slash_pos]
- # and change where we start looking for slashed on the
- # turn through the loop
- start = second_slash_pos + 1
- else:
- # No closing slash, we have to add the first slash,
- # and the rest of the text
- regex += re.escape(s[start-1:])
- break
- return regex
-
- @staticmethod
- def _escaped_find(regex, s, start, in_regex):
- """
- Find the next slash in {s} after {start} that is not preceded by a backslash.
-
- If we find an escaped slash, add everything up to and including it to regex,
- updating {start}. {start} therefore serves two purposes, tells us where to start
- looking for the next thing, and also tells us where in {s} we have already
- added things to {regex}
-
- {in_regex} specifies whether we are currently searching in a regex, we behave
- differently if we are or if we aren't.
- """
-
- while True:
- pos = s.find('/', start)
- if pos == -1:
- # no match, return to caller
- break
- elif pos == 0:
- # slash at the beginning of the string, so it can't be
- # escaped. We found it.
- break
- else:
- # check if the slash is preceeded by a backslash
- if s[pos-1:pos] == '\\':
- # it is.
- if in_regex:
- # add everything up to the backslash as a
- # regular expression
- regex += s[start:pos-1]
- # skip the backslash, and add the slash
- regex += s[pos]
- else:
- # add everything up to the backslash as escaped
- # plain text
- regex += re.escape(s[start:pos-1])
- # and then add the slash as escaped
- # plain text
- regex += re.escape(s[pos])
- # update start to show we have handled everything
- # before it
- start = pos+1
- # and continue to look
- else:
- # slash is not escaped, this is what we are looking for
- break
- return regex, pos, start
-
- def tearDown(self):
- if self.cmdapp:
- # Restore stdout
- self.cmdapp.stdout = self._orig_stdout
-
-
-def namedtuple_with_two_defaults(typename, field_names, default_values=('', '')):
- """Wrapper around namedtuple which lets you treat the last value as optional.
-
- :param typename: str - type name for the Named tuple
- :param field_names: List[str] or space-separated string of field names
- :param default_values: (optional) 2-element tuple containing the default values for last 2 parameters in named tuple
- Defaults to an empty string for both of them
- :return: namedtuple type
- """
- T = collections.namedtuple(typename, field_names)
- # noinspection PyUnresolvedReferences
- T.__new__.__defaults__ = default_values
- return T
-
-
-class CmdResult(namedtuple_with_two_defaults('CmdResult', ['out', 'err', 'war'])):
- """Derive a class to store results from a named tuple so we can tweak dunder methods for convenience.
-
- This is provided as a convenience and an example for one possible way for end users to store results in
- the self._last_result attribute of cmd2.Cmd class instances. See the "python_scripting.py" example for how it can
- be used to enable conditional control flow.
-
- Named tuple attributes
- ----------------------
- out - this is intended to store normal output data from the command and can be of any type that makes sense
- err: str - (optional) this is intended to store an error message and it being non-empty indicates there was an error
- Defaults to an empty string
- war: str - (optional) this is intended to store a warning message which isn't quite an error, but of note
- Defaults to an empty string.
-
- NOTE: Named tuples are immutable. So the contents are there for access, not for modification.
- """
- def __bool__(self):
- """If err is an empty string, treat the result as a success; otherwise treat it as a failure."""
- return not self.err
-
-
-if __name__ == '__main__':
- # If run as the main application, simply start a bare-bones cmd2 application with only built-in functionality.
-
- # Set "use_ipython" to True to include the ipy command if IPython is installed, which supports advanced interactive
- # debugging of your application via introspection on self.
- app = Cmd(use_ipython=False)
- app.cmdloop()