diff options
Diffstat (limited to 'cmd2.py')
-rwxr-xr-x | cmd2.py | 4069 |
1 files changed, 0 insertions, 4069 deletions
diff --git a/cmd2.py b/cmd2.py deleted file mode 100755 index 54eff811..00000000 --- a/cmd2.py +++ /dev/null @@ -1,4069 +0,0 @@ -#!/usr/bin/env python -# coding=utf-8 -"""Variant on standard library's cmd with extra features. - -To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you -were using the standard library's cmd, while enjoying the extra features. - -Searchable command history (commands: "history") -Load commands from file, save to file, edit commands in file -Multi-line commands -Special-character shortcut commands (beyond cmd's "@" and "!") -Settable environment parameters -Parsing commands with `argparse` argument parsers (flags) -Redirection to file with >, >>; input from file with < -Easy transcript-based testing of applications (see examples/example.py) -Bash-style ``select`` available - -Note that redirection with > and | will only work if `self.poutput()` -is used in place of `print`. - -- Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com - -Git repository on GitHub at https://github.com/python-cmd2/cmd2 -""" -import argparse -import atexit -import cmd -import codecs -import collections -import copy -import datetime -import functools -import glob -import io -from io import StringIO -import os -import platform -import re -import shlex -import signal -import subprocess -import sys -import tempfile -import traceback -import unittest -from code import InteractiveConsole - -try: - from enum34 import Enum -except ImportError: - from enum import Enum - -import pyparsing -import pyperclip - -# Newer versions of pyperclip are released as a single file, but older versions had a more complicated structure -try: - from pyperclip.exceptions import PyperclipException -except ImportError: - # noinspection PyUnresolvedReferences - from pyperclip import PyperclipException - -# Collection is a container that is sizable and iterable -# It was introduced in Python 3.6. We will try to import it, otherwise use our implementation -try: - from collections.abc import Collection, Iterable -except ImportError: - from collections.abc import Sized, Iterable, Container - - # noinspection PyAbstractClass - class Collection(Sized, Iterable, Container): - - __slots__ = () - - # noinspection PyPep8Naming - @classmethod - def __subclasshook__(cls, C): - if cls is Collection: - if any("__len__" in B.__dict__ for B in C.__mro__) and \ - any("__iter__" in B.__dict__ for B in C.__mro__) and \ - any("__contains__" in B.__dict__ for B in C.__mro__): - return True - return NotImplemented - -# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout -if sys.version_info < (3, 5): - from contextlib2 import redirect_stdout, redirect_stderr -else: - from contextlib import redirect_stdout, redirect_stderr - -# Detect whether IPython is installed to determine if the built-in "ipy" command should be included -ipython_available = True -try: - # noinspection PyUnresolvedReferences,PyPackageRequirements - from IPython import embed -except ImportError: - ipython_available = False - -# Prefer statically linked gnureadline if available (for macOS compatibility due to issues with libedit) -try: - import gnureadline as readline -except ImportError: - # Try to import readline, but allow failure for convenience in Windows unit testing - # Note: If this actually fails, you should install readline on Linux or Mac or pyreadline on Windows - try: - # noinspection PyUnresolvedReferences - import readline - except ImportError: - pass - -# Check what implementation of readline we are using -class RlType(Enum): - GNU = 1 - PYREADLINE = 2 - NONE = 3 - -rl_type = RlType.NONE - -if 'pyreadline' in sys.modules: - rl_type = RlType.PYREADLINE - - # Save the original pyreadline display completion function since we need to override it and restore it - # noinspection PyProtectedMember - orig_pyreadline_display = readline.rl.mode._display_completions - -elif 'gnureadline' in sys.modules or 'readline' in sys.modules: - rl_type = RlType.GNU - - # We need wcswidth to calculate display width of tab completions - from wcwidth import wcswidth - - # Load the readline lib so we can make changes to it - import ctypes - readline_lib = ctypes.CDLL(readline.__file__) - - # Save address that rl_basic_quote_characters is pointing to since we need to override and restore it - rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters") - orig_rl_basic_quote_characters_addr = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value - -__version__ = '0.9.0' - -# Pyparsing enablePackrat() can greatly speed up parsing, but problems have been seen in Python 3 in the past -pyparsing.ParserElement.enablePackrat() - -# Override the default whitespace chars in Pyparsing so that newlines are not treated as whitespace -pyparsing.ParserElement.setDefaultWhitespaceChars(' \t') - - -# The next 2 variables and associated setter functions effect how arguments are parsed for decorated commands -# which use one of the decorators: @with_argument_list, @with_argparser, or @with_argparser_and_unknown_args -# The defaults are sane and maximize ease of use for new applications based on cmd2. - -# Use POSIX or Non-POSIX (Windows) rules for splitting a command-line string into a list of arguments via shlex.split() -POSIX_SHLEX = False - -# Strip outer quotes for convenience if POSIX_SHLEX = False -STRIP_QUOTES_FOR_NON_POSIX = True - -# Used for tab completion and word breaks. Do not change. -QUOTES = ['"', "'"] -REDIRECTION_CHARS = ['|', '<', '>'] - -# optional attribute, when tagged on a function, allows cmd2 to categorize commands -HELP_CATEGORY = 'help_category' -HELP_SUMMARY = 'help_summary' - - -def categorize(func, category): - """Categorize a function. - - The help command output will group this function under the specified category heading - - :param func: Union[Callable, Iterable] - function to categorize - :param category: str - category to put it in - """ - if isinstance(func, Iterable): - for item in func: - setattr(item, HELP_CATEGORY, category) - else: - setattr(func, HELP_CATEGORY, category) - - -def set_posix_shlex(val): - """ Allows user of cmd2 to choose between POSIX and non-POSIX splitting of args for decorated commands. - - :param val: bool - True => POSIX, False => Non-POSIX - """ - global POSIX_SHLEX - POSIX_SHLEX = val - - -def set_strip_quotes(val): - """ Allows user of cmd2 to choose whether to automatically strip outer-quotes when POSIX_SHLEX is False. - - :param val: bool - True => strip quotes on args for decorated commands if POSIX_SHLEX is False. - """ - global STRIP_QUOTES_FOR_NON_POSIX - STRIP_QUOTES_FOR_NON_POSIX = val - - -def _which(editor): - try: - editor_path = subprocess.check_output(['which', editor], stderr=subprocess.STDOUT).strip() - editor_path = editor_path.decode() - except subprocess.CalledProcessError: - editor_path = None - return editor_path - - -def strip_quotes(arg): - """ Strip outer quotes from a string. - - Applies to both single and double quotes. - - :param arg: str - string to strip outer quotes from - :return str - same string with potentially outer quotes stripped - """ - quote_chars = '"' + "'" - - if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in quote_chars: - arg = arg[1:-1] - return arg - - -def parse_quoted_string(cmdline): - """Parse a quoted string into a list of arguments.""" - if isinstance(cmdline, list): - # arguments are already a list, return the list we were passed - lexed_arglist = cmdline - else: - # Use shlex to split the command line into a list of arguments based on shell rules - lexed_arglist = shlex.split(cmdline, posix=POSIX_SHLEX) - # If not using POSIX shlex, make sure to strip off outer quotes for convenience - if not POSIX_SHLEX and STRIP_QUOTES_FOR_NON_POSIX: - temp_arglist = [] - for arg in lexed_arglist: - temp_arglist.append(strip_quotes(arg)) - lexed_arglist = temp_arglist - return lexed_arglist - - -def with_category(category): - """A decorator to apply a category to a command function""" - def cat_decorator(func): - categorize(func, category) - return func - return cat_decorator - - -def with_argument_list(func): - """A decorator to alter the arguments passed to a do_* cmd2 - method. Default passes a string of whatever the user typed. - With this decorator, the decorated method will receive a list - of arguments parsed from user input using shlex.split().""" - @functools.wraps(func) - def cmd_wrapper(self, cmdline): - lexed_arglist = parse_quoted_string(cmdline) - return func(self, lexed_arglist) - - cmd_wrapper.__doc__ = func.__doc__ - return cmd_wrapper - - -def with_argparser_and_unknown_args(argparser): - """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given - instance of argparse.ArgumentParser, but also returning unknown args as a list. - - :param argparser: argparse.ArgumentParser - given instance of ArgumentParser - :return: function that gets passed parsed args and a list of unknown args - """ - - # noinspection PyProtectedMember - def arg_decorator(func): - @functools.wraps(func) - def cmd_wrapper(instance, cmdline): - lexed_arglist = parse_quoted_string(cmdline) - args, unknown = argparser.parse_known_args(lexed_arglist) - return func(instance, args, unknown) - - # argparser defaults the program name to sys.argv[0] - # we want it to be the name of our command - argparser.prog = func.__name__[3:] - - # If the description has not been set, then use the method docstring if one exists - if argparser.description is None and func.__doc__: - argparser.description = func.__doc__ - - if func.__doc__: - setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__) - - cmd_wrapper.__doc__ = argparser.format_help() - - # Mark this function as having an argparse ArgumentParser (used by do_help) - cmd_wrapper.__dict__['has_parser'] = True - - # If there are subcommands, store their names in a list to support tab-completion of subcommand names - if argparser._subparsers is not None: - subcommand_names = argparser._subparsers._group_actions[0]._name_parser_map.keys() - cmd_wrapper.__dict__['subcommand_names'] = subcommand_names - - return cmd_wrapper - - return arg_decorator - - -def with_argparser(argparser): - """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments - with the given instance of argparse.ArgumentParser. - - :param argparser: argparse.ArgumentParser - given instance of ArgumentParser - :return: function that gets passed parsed args - """ - - # noinspection PyProtectedMember - def arg_decorator(func): - @functools.wraps(func) - def cmd_wrapper(instance, cmdline): - lexed_arglist = parse_quoted_string(cmdline) - args = argparser.parse_args(lexed_arglist) - return func(instance, args) - - # argparser defaults the program name to sys.argv[0] - # we want it to be the name of our command - argparser.prog = func.__name__[3:] - - # If the description has not been set, then use the method docstring if one exists - if argparser.description is None and func.__doc__: - argparser.description = func.__doc__ - - if func.__doc__: - setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__) - - cmd_wrapper.__doc__ = argparser.format_help() - - # Mark this function as having an argparse ArgumentParser (used by do_help) - cmd_wrapper.__dict__['has_parser'] = True - - # If there are subcommands, store their names in a list to support tab-completion of subcommand names - if argparser._subparsers is not None: - - # Key is subcommand name and value is completer function - subcommands = collections.OrderedDict() - - # Get all subcommands and check if they have completer functions - for name, parser in argparser._subparsers._group_actions[0]._name_parser_map.items(): - if 'completer' in parser._defaults: - completer = parser._defaults['completer'] - else: - completer = None - subcommands[name] = completer - - cmd_wrapper.__dict__['subcommands'] = subcommands - - return cmd_wrapper - - return arg_decorator - - -# Can we access the clipboard? Should always be true on Windows and Mac, but only sometimes on Linux -# noinspection PyUnresolvedReferences -try: - # Get the version of the pyperclip module as a float - pyperclip_ver = float('.'.join(pyperclip.__version__.split('.')[:2])) - - # The extraneous output bug in pyperclip on Linux using xclip was fixed in more recent versions of pyperclip - if sys.platform.startswith('linux') and pyperclip_ver < 1.6: - # Avoid extraneous output to stderr from xclip when clipboard is empty at cost of overwriting clipboard contents - pyperclip.copy('') - else: - # Try getting the contents of the clipboard - _ = pyperclip.paste() -except PyperclipException: - can_clip = False -else: - can_clip = True - - -def get_paste_buffer(): - """Get the contents of the clipboard / paste buffer. - - :return: str - contents of the clipboard - """ - pb_str = pyperclip.paste() - return pb_str - - -def write_to_paste_buffer(txt): - """Copy text to the clipboard / paste buffer. - - :param txt: str - text to copy to the clipboard - """ - pyperclip.copy(txt) - - -class ParsedString(str): - """Subclass of str which also stores a pyparsing.ParseResults object containing structured parse results.""" - # pyarsing.ParseResults - structured parse results, to provide multiple means of access to the parsed data - parsed = None - - # Function which did the parsing - parser = None - - def full_parsed_statement(self): - """Used to reconstruct the full parsed statement when a command isn't recognized.""" - new = ParsedString('%s %s' % (self.parsed.command, self.parsed.args)) - new.parsed = self.parsed - new.parser = self.parser - return new - - -def replace_with_file_contents(fname): - """Action to perform when successfully matching parse element definition for inputFrom parser. - - :param fname: str - filename - :return: str - contents of file "fname" - """ - try: - # Any outer quotes are not part of the filename - unquoted_file = strip_quotes(fname[0]) - with open(os.path.expanduser(unquoted_file)) as source_file: - result = source_file.read() - except IOError: - result = '< %s' % fname[0] # wasn't a file after all - - # TODO: IF pyparsing input parser logic gets fixed to support empty file, add support to get from paste buffer - return result - - -class EmbeddedConsoleExit(SystemExit): - """Custom exception class for use with the py command.""" - pass - - -class EmptyStatement(Exception): - """Custom exception class for handling behavior when the user just presses <Enter>.""" - pass - - -# Regular expression to match ANSI escape codes -ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m') - - -def strip_ansi(text): - """Strip ANSI escape codes from a string. - - :param text: str - a string which may contain ANSI escape codes - :return: str - the same string with any ANSI escape codes removed - """ - return ANSI_ESCAPE_RE.sub('', text) - - -def _pop_readline_history(clear_history=True): - """Returns a copy of readline's history and optionally clears it (default)""" - # noinspection PyArgumentList - history = [ - readline.get_history_item(i) - for i in range(1, 1 + readline.get_current_history_length()) - ] - if clear_history: - readline.clear_history() - return history - - -def _push_readline_history(history, clear_history=True): - """Restores readline's history and optionally clears it first (default)""" - if clear_history: - readline.clear_history() - for line in history: - readline.add_history(line) - - -def _complete_from_cmd(cmd_obj, text, line, begidx, endidx): - """Complete as though the user was typing inside cmd's cmdloop()""" - from itertools import takewhile - command_subcommand_params = line.split(None, 3) - - if len(command_subcommand_params) < (3 if text else 2): - n = len(command_subcommand_params[0]) - n += sum(1 for _ in takewhile(str.isspace, line[n:])) - return cmd_obj.completenames(text, line[n:], begidx - n, endidx - n) - - command, subcommand = command_subcommand_params[:2] - n = len(command) + sum(1 for _ in takewhile(str.isspace, line)) - cfun = getattr(cmd_obj, 'complete_' + subcommand, cmd_obj.complete) - return cfun(text, line[n:], begidx - n, endidx - n) - - -class AddSubmenu(object): - """Conveniently add a submenu (Cmd-like class) to a Cmd - - e.g. given "class SubMenu(Cmd): ..." then - - @AddSubmenu(SubMenu(), 'sub') - class MyCmd(cmd.Cmd): - .... - - will have the following effects: - 1. 'sub' will interactively enter the cmdloop of a SubMenu instance - 2. 'sub cmd args' will call do_cmd(args) in a SubMenu instance - 3. 'sub ... [TAB]' will have the same behavior as [TAB] in a SubMenu cmdloop - i.e., autocompletion works the way you think it should - 4. 'help sub [cmd]' will print SubMenu's help (calls its do_help()) - """ - - class _Nonexistent(object): - """ - Used to mark missing attributes. - Disable __dict__ creation since this class does nothing - """ - __slots__ = () # - - def __init__(self, - submenu, - command, - aliases=(), - reformat_prompt="{super_prompt}>> {sub_prompt}", - shared_attributes=None, - require_predefined_shares=True, - create_subclass=False, - preserve_shares=False, - persistent_history_file=None - ): - """Set up the class decorator - - submenu (Cmd): Instance of something cmd.Cmd-like - - command (str): The command the user types to access the SubMenu instance - - aliases (iterable): More commands that will behave like "command" - - reformat_prompt (str): Format str or None to disable - if it's a string, it should contain one or more of: - {super_prompt}: The current cmd's prompt - {command}: The command in the current cmd with which it was called - {sub_prompt}: The subordinate cmd's original prompt - the default is "{super_prompt}{command} {sub_prompt}" - - shared_attributes (dict): dict of the form {'subordinate_attr': 'parent_attr'} - the attributes are copied to the submenu at the last moment; the submenu's - attributes are backed up before this and restored afterward - - require_predefined_shares: The shared attributes above must be independently - defined in the subordinate Cmd (default: True) - - create_subclass: put the modifications in a subclass rather than modifying - the existing class (default: False) - """ - self.submenu = submenu - self.command = command - self.aliases = aliases - if persistent_history_file: - self.persistent_history_file = os.path.expanduser(persistent_history_file) - else: - self.persistent_history_file = None - - if reformat_prompt is not None and not isinstance(reformat_prompt, str): - raise ValueError("reformat_prompt should be either a format string or None") - self.reformat_prompt = reformat_prompt - - self.shared_attributes = {} if shared_attributes is None else shared_attributes - if require_predefined_shares: - for attr in self.shared_attributes.keys(): - if not hasattr(submenu, attr): - raise AttributeError("The shared attribute '{attr}' is not defined in {cmd}. Either define {attr} " - "in {cmd} or set require_predefined_shares=False." - .format(cmd=submenu.__class__.__name__, attr=attr)) - - self.create_subclass = create_subclass - self.preserve_shares = preserve_shares - - def _get_original_attributes(self): - return { - attr: getattr(self.submenu, attr, AddSubmenu._Nonexistent) - for attr in self.shared_attributes.keys() - } - - def _copy_in_shared_attrs(self, parent_cmd): - for sub_attr, par_attr in self.shared_attributes.items(): - setattr(self.submenu, sub_attr, getattr(parent_cmd, par_attr)) - - def _copy_out_shared_attrs(self, parent_cmd, original_attributes): - if self.preserve_shares: - for sub_attr, par_attr in self.shared_attributes.items(): - setattr(parent_cmd, par_attr, getattr(self.submenu, sub_attr)) - else: - for attr, value in original_attributes.items(): - if attr is not AddSubmenu._Nonexistent: - setattr(self.submenu, attr, value) - else: - delattr(self.submenu, attr) - - def __call__(self, cmd_obj): - """Creates a subclass of Cmd wherein the given submenu can be accessed via the given command""" - def enter_submenu(parent_cmd, line): - """ - This function will be bound to do_<submenu> and will change the scope of the CLI to that of the - submenu. - """ - submenu = self.submenu - original_attributes = self._get_original_attributes() - history = _pop_readline_history() - - if self.persistent_history_file: - try: - readline.read_history_file(self.persistent_history_file) - except FileNotFoundError: - pass - - try: - # copy over any shared attributes - self._copy_in_shared_attrs(parent_cmd) - - if line.parsed.args: - # Remove the menu argument and execute the command in the submenu - line = submenu.parser_manager.parsed(line.parsed.args) - submenu.precmd(line) - ret = submenu.onecmd(line) - submenu.postcmd(ret, line) - else: - if self.reformat_prompt is not None: - prompt = submenu.prompt - submenu.prompt = self.reformat_prompt.format( - super_prompt=parent_cmd.prompt, - command=self.command, - sub_prompt=prompt, - ) - submenu.cmdloop() - if self.reformat_prompt is not None: - # noinspection PyUnboundLocalVariable - self.submenu.prompt = prompt - finally: - # copy back original attributes - self._copy_out_shared_attrs(parent_cmd, original_attributes) - - # write submenu history - if self.persistent_history_file: - readline.write_history_file(self.persistent_history_file) - # reset main app history before exit - _push_readline_history(history) - - def complete_submenu(_self, text, line, begidx, endidx): - """ - This function will be bound to complete_<submenu> and will perform the complete commands of the submenu. - """ - submenu = self.submenu - original_attributes = self._get_original_attributes() - try: - # copy over any shared attributes - self._copy_in_shared_attrs(_self) - return _complete_from_cmd(submenu, text, line, begidx, endidx) - finally: - # copy back original attributes - self._copy_out_shared_attrs(_self, original_attributes) - - original_do_help = cmd_obj.do_help - original_complete_help = cmd_obj.complete_help - - def help_submenu(_self, line): - """ - This function will be bound to help_<submenu> and will call the help commands of the submenu. - """ - tokens = line.split(None, 1) - if tokens and (tokens[0] == self.command or tokens[0] in self.aliases): - self.submenu.do_help(tokens[1] if len(tokens) == 2 else '') - else: - original_do_help(_self, line) - - def _complete_submenu_help(_self, text, line, begidx, endidx): - """autocomplete to match help_submenu()'s behavior""" - tokens = line.split(None, 1) - if len(tokens) == 2 and ( - not (not tokens[1].startswith(self.command) and not any( - tokens[1].startswith(alias) for alias in self.aliases)) - ): - return self.submenu.complete_help( - text, - tokens[1], - begidx - line.index(tokens[1]), - endidx - line.index(tokens[1]), - ) - else: - return original_complete_help(_self, text, line, begidx, endidx) - - if self.create_subclass: - class _Cmd(cmd_obj): - do_help = help_submenu - complete_help = _complete_submenu_help - else: - _Cmd = cmd_obj - _Cmd.do_help = help_submenu - _Cmd.complete_help = _complete_submenu_help - - # Create bindings in the parent command to the submenus commands. - setattr(_Cmd, 'do_' + self.command, enter_submenu) - setattr(_Cmd, 'complete_' + self.command, complete_submenu) - - # Create additional bindings for aliases - for _alias in self.aliases: - setattr(_Cmd, 'do_' + _alias, enter_submenu) - setattr(_Cmd, 'complete_' + _alias, complete_submenu) - return _Cmd - - -class Cmd(cmd.Cmd): - """An easy but powerful framework for writing line-oriented command interpreters. - - Extends the Python Standard Library’s cmd package by adding a lot of useful features - to the out of the box configuration. - - Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes. - """ - # Attributes used to configure the ParserManager (all are not dynamically settable at runtime) - blankLinesAllowed = False - commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment]) - commentInProgress = pyparsing.Literal('/*') + pyparsing.SkipTo(pyparsing.stringEnd ^ '*/') - legalChars = u'!#$%.:?@_-' + pyparsing.alphanums + pyparsing.alphas8bit - multilineCommands = [] - prefixParser = pyparsing.Empty() - redirector = '>' # for sending output to file - shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} - aliases = dict() - terminators = [';'] # make sure your terminators are not in legalChars! - - # Attributes which are NOT dynamically settable at runtime - allow_cli_args = True # Should arguments passed on the command-line be processed as commands? - allow_redirection = True # Should output redirection and pipes be allowed - default_to_shell = False # Attempt to run unrecognized commands as shell commands - quit_on_sigint = False # Quit the loop on interrupt instead of just resetting prompt - reserved_words = [] - - # Attributes which ARE dynamically settable at runtime - colors = (platform.system() != 'Windows') - continuation_prompt = '> ' - debug = False - echo = False - editor = os.environ.get('EDITOR') - if not editor: - if sys.platform[:3] == 'win': - editor = 'notepad' - else: - # Favor command-line editors first so we don't leave the terminal to edit - for editor in ['vim', 'vi', 'emacs', 'nano', 'pico', 'gedit', 'kate', 'subl', 'geany', 'atom']: - if _which(editor): - break - feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing) - locals_in_py = True - quiet = False # Do not suppress nonessential output - timing = False # Prints elapsed time for each command - - # To make an attribute settable with the "do_set" command, add it to this ... - # This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key - settable = {'colors': 'Colorized output (*nix only)', - 'continuation_prompt': 'On 2nd+ line of input', - 'debug': 'Show full error stack on error', - 'echo': 'Echo command issued into output', - 'editor': 'Program used by ``edit``', - 'feedback_to_output': 'Include nonessentials in `|`, `>` results', - 'locals_in_py': 'Allow access to your application in py via self', - 'prompt': 'The prompt issued to solicit input', - 'quiet': "Don't print nonessential feedback", - 'timing': 'Report execution times'} - - def __init__(self, completekey='tab', stdin=None, stdout=None, persistent_history_file='', - persistent_history_length=1000, startup_script=None, use_ipython=False, transcript_files=None): - """An easy but powerful framework for writing line-oriented command interpreters, extends Python's cmd package. - - :param completekey: str - (optional) readline name of a completion key, default to Tab - :param stdin: (optional) alternate input file object, if not specified, sys.stdin is used - :param stdout: (optional) alternate output file object, if not specified, sys.stdout is used - :param persistent_history_file: str - (optional) file path to load a persistent readline history from - :param persistent_history_length: int - (optional) max number of lines which will be written to the history file - :param startup_script: str - (optional) file path to a a script to load and execute at startup - :param use_ipython: (optional) should the "ipy" command be included for an embedded IPython shell - :param transcript_files: str - (optional) allows running transcript tests when allow_cli_args is False - """ - # If use_ipython is False, make sure the do_ipy() method doesn't exit - if not use_ipython: - try: - del Cmd.do_ipy - except AttributeError: - pass - - # If persistent readline history is enabled, then read history from file and register to write to file at exit - if persistent_history_file: - persistent_history_file = os.path.expanduser(persistent_history_file) - try: - readline.read_history_file(persistent_history_file) - # default history len is -1 (infinite), which may grow unruly - readline.set_history_length(persistent_history_length) - except FileNotFoundError: - pass - atexit.register(readline.write_history_file, persistent_history_file) - - # Call super class constructor - super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) - - # Commands to exclude from the help menu and tab completion - self.hidden_commands = ['eof', 'eos', '_relative_load'] - - # Commands to exclude from the history command - self.exclude_from_history = '''history edit eof eos'''.split() - - self._finalize_app_parameters() - - self.initial_stdout = sys.stdout - self.history = History() - self.pystate = {} - self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')] - self.parser_manager = ParserManager(redirector=self.redirector, terminators=self.terminators, - multilineCommands=self.multilineCommands, - legalChars=self.legalChars, commentGrammars=self.commentGrammars, - commentInProgress=self.commentInProgress, - blankLinesAllowed=self.blankLinesAllowed, prefixParser=self.prefixParser, - preparse=self.preparse, postparse=self.postparse, aliases=self.aliases, - shortcuts=self.shortcuts) - self._transcript_files = transcript_files - - # Used to enable the ability for a Python script to quit the application - self._should_quit = False - - # True if running inside a Python script or interactive console, False otherwise - self._in_py = False - - # Stores results from the last command run to enable usage of results in a Python script or interactive console - # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience. - self._last_result = None - - # Used to save state during a redirection - self.kept_state = None - self.kept_sys = None - - # Codes used for exit conditions - self._STOP_AND_EXIT = True # cmd convention - - self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'}, - 'cyan': {True: '\x1b[36m', False: '\x1b[39m'}, - 'blue': {True: '\x1b[34m', False: '\x1b[39m'}, - 'red': {True: '\x1b[31m', False: '\x1b[39m'}, - 'magenta': {True: '\x1b[35m', False: '\x1b[39m'}, - 'green': {True: '\x1b[32m', False: '\x1b[39m'}, - 'underline': {True: '\x1b[4m', False: '\x1b[24m'}, - 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}} - - # Used load command to store the current script dir as a LIFO queue to support _relative_load command - self._script_dir = [] - - # Used when piping command output to a shell command - self.pipe_proc = None - - # Used by complete() for readline tab completion - self.completion_matches = [] - - # Used to keep track of whether we are redirecting or piping output - self.redirecting = False - - # If this string is non-empty, then this warning message will print if a broken pipe error occurs while printing - self.broken_pipe_warning = '' - - # If a startup script is provided, then add it in the queue to load - if startup_script is not None: - startup_script = os.path.expanduser(startup_script) - if os.path.exists(startup_script) and os.path.getsize(startup_script) > 0: - self.cmdqueue.append('load {}'.format(startup_script)) - - ############################################################################################################ - # The following variables are used by tab-completion functions. They are reset each time complete() is run - # using set_completion_defaults() and it is up to completer functions to set them before returning results. - ############################################################################################################ - - # If true and a single match is returned to complete(), then a space will be appended - # if the match appears at the end of the line - self.allow_appended_space = True - - # If true and a single match is returned to complete(), then a closing quote - # will be added if there is an unmatched opening quote - self.allow_closing_quote = True - - # Use this list if you are completing strings that contain a common delimiter and you only want to - # display the final portion of the matches as the tab-completion suggestions. The full matches - # still must be returned from your completer function. For an example, look at path_complete() - # which uses this to show only the basename of paths as the suggestions. delimiter_complete() also - # populates this list. - self.display_matches = [] - - # ----- Methods related to presenting output to the user ----- - - @property - def visible_prompt(self): - """Read-only property to get the visible prompt with any ANSI escape codes stripped. - - Used by transcript testing to make it easier and more reliable when users are doing things like coloring the - prompt using ANSI color codes. - - :return: str - prompt stripped of any ANSI escape codes - """ - return strip_ansi(self.prompt) - - def _finalize_app_parameters(self): - self.commentGrammars.ignore(pyparsing.quotedString).setParseAction(lambda x: '') - # noinspection PyUnresolvedReferences - self.shortcuts = sorted(self.shortcuts.items(), reverse=True) - - # Make sure settable parameters are sorted alphabetically by key - self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0])) - - def poutput(self, msg, end='\n'): - """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present. - - Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and - that process terminates before the cmd2 command is finished executing. - - :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK - :param end: str - string appended after the end of the message if not already present, default a newline - """ - if msg is not None and msg != '': - try: - msg_str = '{}'.format(msg) - self.stdout.write(msg_str) - if not msg_str.endswith(end): - self.stdout.write(end) - except BrokenPipeError: - # This occurs if a command's output is being piped to another process and that process closes before the - # command is finished. If you would like your application to print a warning message, then set the - # broken_pipe_warning attribute to the message you want printed. - if self.broken_pipe_warning: - sys.stderr.write(self.broken_pipe_warning) - - def perror(self, errmsg, exception_type=None, traceback_war=True): - """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists. - - :param errmsg: str - error message to print out - :param exception_type: str - (optional) type of exception which precipitated this error message - :param traceback_war: bool - (optional) if True, print a message to let user know they can enable debug - :return: - """ - if self.debug: - traceback.print_exc() - - if exception_type is None: - err = self.colorize("ERROR: {}\n".format(errmsg), 'red') - sys.stderr.write(err) - else: - err = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(exception_type, errmsg) - sys.stderr.write(self.colorize(err, 'red')) - - if traceback_war: - war = "To enable full traceback, run the following command: 'set debug true'\n" - sys.stderr.write(self.colorize(war, 'yellow')) - - def pfeedback(self, msg): - """For printing nonessential feedback. Can be silenced with `quiet`. - Inclusion in redirected output is controlled by `feedback_to_output`.""" - if not self.quiet: - if self.feedback_to_output: - self.poutput(msg) - else: - sys.stderr.write("{}\n".format(msg)) - - def ppaged(self, msg, end='\n'): - """Print output using a pager if it would go off screen and stdout isn't currently being redirected. - - Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when - stdout or stdin are not a fully functional terminal. - - :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK - :param end: str - string appended after the end of the message if not already present, default a newline - """ - if msg is not None and msg != '': - try: - msg_str = '{}'.format(msg) - if not msg_str.endswith(end): - msg_str += end - - # Attempt to detect if we are not running within a fully functional terminal. - # Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect. - functional_terminal = False - - if self.stdin.isatty() and self.stdout.isatty(): - if sys.platform.startswith('win') or os.environ.get('TERM') is not None: - functional_terminal = True - - # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) - # Also only attempt to use a pager if actually running in a real fully functional terminal - if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir: - - if sys.platform.startswith('win'): - pager_cmd = 'more' - else: - # Here is the meaning of the various flags we are using with the less command: - # -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped - # -R causes ANSI "color" escape sequences to be output in raw form (i.e. colors are displayed) - # -X disables sending the termcap initialization and deinitialization strings to the terminal - # -F causes less to automatically exit if the entire file can be displayed on the first screen - pager_cmd = 'less -SRXF' - self.pipe_proc = subprocess.Popen(pager_cmd, shell=True, stdin=subprocess.PIPE) - try: - self.pipe_proc.stdin.write(msg_str.encode('utf-8', 'replace')) - self.pipe_proc.stdin.close() - except (IOError, KeyboardInterrupt): - pass - - # Less doesn't respect ^C, but catches it for its own UI purposes (aborting search etc. inside less) - while True: - try: - self.pipe_proc.wait() - except KeyboardInterrupt: - pass - else: - break - self.pipe_proc = None - else: - self.stdout.write(msg_str) - except BrokenPipeError: - # This occurs if a command's output is being piped to another process and that process closes before the - # command is finished. If you would like your application to print a warning message, then set the - # broken_pipe_warning attribute to the message you want printed. - if self.broken_pipe_warning: - sys.stderr.write(self.broken_pipe_warning) - - def colorize(self, val, color): - """Given a string (``val``), returns that string wrapped in UNIX-style - special characters that turn on (and then off) text color and style. - If the ``colors`` environment parameter is ``False``, or the application - is running on Windows, will return ``val`` unchanged. - ``color`` should be one of the supported strings (or styles): - red/blue/green/cyan/magenta, bold, underline""" - if self.colors and (self.stdout == self.initial_stdout): - return self._colorcodes[color][True] + val + self._colorcodes[color][False] - return val - - def get_subcommands(self, command): - """ - Returns a list of a command's subcommand names if they exist - :param command: the command we are querying - :return: A subcommand list or None - """ - - subcommand_names = None - - # Check if is a valid command - funcname = self._func_named(command) - - if funcname: - # Check to see if this function was decorated with an argparse ArgumentParser - func = getattr(self, funcname) - subcommands = func.__dict__.get('subcommands', None) - if subcommands is not None: - subcommand_names = subcommands.keys() - - return subcommand_names - - def get_subcommand_completer(self, command, subcommand): - """ - Returns a subcommand's tab completion function if one exists - :param command: command which owns the subcommand - :param subcommand: the subcommand we are querying - :return: A completer or None - """ - - completer = None - - # Check if is a valid command - funcname = self._func_named(command) - - if funcname: - # Check to see if this function was decorated with an argparse ArgumentParser - func = getattr(self, funcname) - subcommands = func.__dict__.get('subcommands', None) - if subcommands is not None: - completer = subcommands[subcommand] - - return completer - - # ----- Methods related to tab completion ----- - - def set_completion_defaults(self): - """ - Resets tab completion settings - Needs to be called each time readline runs tab completion - """ - self.allow_appended_space = True - self.allow_closing_quote = True - self.display_matches = [] - - def tokens_for_completion(self, line, begidx, endidx): - """ - Used by tab completion functions to get all tokens through the one being completed - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :return: A 2 item tuple where the items are - On Success - tokens: list of unquoted tokens - this is generally the list needed for tab completion functions - raw_tokens: list of tokens with any quotes preserved - this can be used to know if a token was quoted or is missing a closing quote - - Both lists are guaranteed to have at least 1 item - The last item in both lists is the token being tab completed - - On Failure - Both items are None - """ - unclosed_quote = '' - quotes_to_try = copy.copy(QUOTES) - - tmp_line = line[:endidx] - tmp_endidx = endidx - - # Parse the line into tokens - while True: - try: - # Use non-POSIX parsing to keep the quotes around the tokens - initial_tokens = shlex.split(tmp_line[:tmp_endidx], posix=False) - - # If the cursor is at an empty token outside of a quoted string, - # then that is the token being completed. Add it to the list. - if not unclosed_quote and begidx == tmp_endidx: - initial_tokens.append('') - break - except ValueError: - # ValueError can be caused by missing closing quote - if not quotes_to_try: - # Since we have no more quotes to try, something else - # is causing the parsing error. Return None since - # this means the line is malformed. - return None, None - - # Add a closing quote and try to parse again - unclosed_quote = quotes_to_try[0] - quotes_to_try = quotes_to_try[1:] - - tmp_line = line[:endidx] - tmp_line += unclosed_quote - tmp_endidx = endidx + 1 - - if self.allow_redirection: - - # Since redirection is enabled, we need to treat redirection characters (|, <, >) - # as word breaks when they are in unquoted strings. Go through each token - # and further split them on these characters. Each run of redirect characters - # is treated as a single token. - raw_tokens = [] - - for cur_initial_token in initial_tokens: - - # Save tokens up to 1 character in length or quoted tokens. No need to parse these. - if len(cur_initial_token) <= 1 or cur_initial_token[0] in QUOTES: - raw_tokens.append(cur_initial_token) - continue - - # Iterate over each character in this token - cur_index = 0 - cur_char = cur_initial_token[cur_index] - - # Keep track of the token we are building - cur_raw_token = '' - - while True: - if cur_char not in REDIRECTION_CHARS: - - # Keep appending to cur_raw_token until we hit a redirect char - while cur_char not in REDIRECTION_CHARS: - cur_raw_token += cur_char - cur_index += 1 - if cur_index < len(cur_initial_token): - cur_char = cur_initial_token[cur_index] - else: - break - - else: - redirect_char = cur_char - - # Keep appending to cur_raw_token until we hit something other than redirect_char - while cur_char == redirect_char: - cur_raw_token += cur_char - cur_index += 1 - if cur_index < len(cur_initial_token): - cur_char = cur_initial_token[cur_index] - else: - break - - # Save the current token - raw_tokens.append(cur_raw_token) - cur_raw_token = '' - - # Check if we've viewed all characters - if cur_index >= len(cur_initial_token): - break - else: - raw_tokens = initial_tokens - - # Save the unquoted tokens - tokens = [strip_quotes(cur_token) for cur_token in raw_tokens] - - # If the token being completed had an unclosed quote, we need - # to remove the closing quote that was added in order for it - # to match what was on the command line. - if unclosed_quote: - raw_tokens[-1] = raw_tokens[-1][:-1] - - return tokens, raw_tokens - - # noinspection PyUnusedLocal - @staticmethod - def basic_complete(text, line, begidx, endidx, match_against): - """ - Performs tab completion against a list - - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param match_against: Collection - the list being matched against - :return: List[str] - a list of possible tab completions - """ - return [cur_match for cur_match in match_against if cur_match.startswith(text)] - - def delimiter_complete(self, text, line, begidx, endidx, match_against, delimiter): - """ - Performs tab completion against a list but each match is split on a delimiter and only - the portion of the match being tab completed is shown as the completion suggestions. - This is useful if you match against strings that are hierarchical in nature and have a - common delimiter. - - An easy way to illustrate this concept is path completion since paths are just directories/files - delimited by a slash. If you are tab completing items in /home/user you don't get the following - as suggestions: - - /home/user/file.txt /home/user/program.c - /home/user/maps/ /home/user/cmd2.py - - Instead you are shown: - - file.txt program.c - maps/ cmd2.py - - For a large set of data, this can be visually more pleasing and easier to search. - - Another example would be strings formatted with the following syntax: company::department::name - In this case the delimiter would be :: and the user could easily narrow down what they are looking - for if they were only shown suggestions in the category they are at in the string. - - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param match_against: Collection - the list being matched against - :param delimiter: str - what delimits each portion of the matches (ex: paths are delimited by a slash) - :return: List[str] - a list of possible tab completions - """ - matches = self.basic_complete(text, line, begidx, endidx, match_against) - - # Display only the portion of the match that's being completed based on delimiter - if matches: - - # Get the common beginning for the matches - common_prefix = os.path.commonprefix(matches) - prefix_tokens = common_prefix.split(delimiter) - - # Calculate what portion of the match we are completing - display_token_index = 0 - if prefix_tokens: - display_token_index = len(prefix_tokens) - 1 - - # Get this portion for each match and store them in self.display_matches - for cur_match in matches: - match_tokens = cur_match.split(delimiter) - display_token = match_tokens[display_token_index] - - if not display_token: - display_token = delimiter - self.display_matches.append(display_token) - - return matches - - def flag_based_complete(self, text, line, begidx, endidx, flag_dict, all_else=None): - """ - Tab completes based on a particular flag preceding the token being completed - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param flag_dict: dict - dictionary whose structure is the following: - keys - flags (ex: -c, --create) that result in tab completion for the next - argument in the command line - values - there are two types of values - 1. iterable list of strings to match against (dictionaries, lists, etc.) - 2. function that performs tab completion (ex: path_complete) - :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded - by a flag in flag_dict - :return: List[str] - a list of possible tab completions - """ - # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) - if tokens is None: - return [] - - completions_matches = [] - match_against = all_else - - # Must have at least 2 args for a flag to precede the token being completed - if len(tokens) > 1: - flag = tokens[-2] - if flag in flag_dict: - match_against = flag_dict[flag] - - # Perform tab completion using a Collection - if isinstance(match_against, Collection): - completions_matches = self.basic_complete(text, line, begidx, endidx, match_against) - - # Perform tab completion using a function - elif callable(match_against): - completions_matches = match_against(text, line, begidx, endidx) - - return completions_matches - - def index_based_complete(self, text, line, begidx, endidx, index_dict, all_else=None): - """ - Tab completes based on a fixed position in the input string - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param index_dict: dict - dictionary whose structure is the following: - keys - 0-based token indexes into command line that determine which tokens - perform tab completion - values - there are two types of values - 1. iterable list of strings to match against (dictionaries, lists, etc.) - 2. function that performs tab completion (ex: path_complete) - :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an - index in index_dict - :return: List[str] - a list of possible tab completions - """ - # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) - if tokens is None: - return [] - - matches = [] - - # Get the index of the token being completed - index = len(tokens) - 1 - - # Check if token is at an index in the dictionary - if index in index_dict: - match_against = index_dict[index] - else: - match_against = all_else - - # Perform tab completion using a Collection - if isinstance(match_against, Collection): - matches = self.basic_complete(text, line, begidx, endidx, match_against) - - # Perform tab completion using a function - elif callable(match_against): - matches = match_against(text, line, begidx, endidx) - - return matches - - # noinspection PyUnusedLocal - def path_complete(self, text, line, begidx, endidx, dir_exe_only=False, dir_only=False): - """Performs completion of local file system paths - - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param dir_exe_only: bool - only return directories and executables, not non-executable files - :param dir_only: bool - only return directories - :return: List[str] - a list of possible tab completions - """ - - # Used to complete ~ and ~user strings - def complete_users(): - - # We are returning ~user strings that resolve to directories, - # so don't append a space or quote in the case of a single result. - self.allow_appended_space = False - self.allow_closing_quote = False - - users = [] - - # Windows lacks the pwd module so we can't get a list of users. - # Instead we will add a slash once the user enters text that - # resolves to an existing home directory. - if sys.platform.startswith('win'): - expanded_path = os.path.expanduser(text) - if os.path.isdir(expanded_path): - users.append(text + os.path.sep) - else: - import pwd - - # Iterate through a list of users from the password database - for cur_pw in pwd.getpwall(): - - # Check if the user has an existing home dir - if os.path.isdir(cur_pw.pw_dir): - - # Add a ~ to the user to match against text - cur_user = '~' + cur_pw.pw_name - if cur_user.startswith(text): - if add_trailing_sep_if_dir: - cur_user += os.path.sep - users.append(cur_user) - - return users - - # Determine if a trailing separator should be appended to directory completions - add_trailing_sep_if_dir = False - if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep): - add_trailing_sep_if_dir = True - - # Used to replace cwd in the final results - cwd = os.getcwd() - cwd_added = False - - # Used to replace expanded user path in final result - orig_tilde_path = '' - expanded_tilde_path = '' - - # If the search text is blank, then search in the CWD for * - if not text: - search_str = os.path.join(os.getcwd(), '*') - cwd_added = True - else: - # Purposely don't match any path containing wildcards - what we are doing is complicated enough! - wildcards = ['*', '?'] - for wildcard in wildcards: - if wildcard in text: - return [] - - # Start the search string - search_str = text + '*' - - # Handle tilde expansion and completion - if text.startswith('~'): - sep_index = text.find(os.path.sep, 1) - - # If there is no slash, then the user is still completing the user after the tilde - if sep_index == -1: - return complete_users() - - # Otherwise expand the user dir - else: - search_str = os.path.expanduser(search_str) - - # Get what we need to restore the original tilde path later - orig_tilde_path = text[:sep_index] - expanded_tilde_path = os.path.expanduser(orig_tilde_path) - - # If the search text does not have a directory, then use the cwd - elif not os.path.dirname(text): - search_str = os.path.join(os.getcwd(), search_str) - cwd_added = True - - # Find all matching path completions - matches = glob.glob(search_str) - - # Filter based on type - if dir_exe_only: - matches = [c for c in matches if os.path.isdir(c) or os.access(c, os.X_OK)] - elif dir_only: - matches = [c for c in matches if os.path.isdir(c)] - - # Don't append a space or closing quote to directory - if len(matches) == 1 and os.path.isdir(matches[0]): - self.allow_appended_space = False - self.allow_closing_quote = False - - # Build display_matches and add a slash to directories - for index, cur_match in enumerate(matches): - - # Display only the basename of this path in the tab-completion suggestions - self.display_matches.append(os.path.basename(cur_match)) - - # Add a separator after directories if the next character isn't already a separator - if os.path.isdir(cur_match) and add_trailing_sep_if_dir: - matches[index] += os.path.sep - self.display_matches[index] += os.path.sep - - # Remove cwd if it was added to match the text readline expects - if cwd_added: - matches = [cur_path.replace(cwd + os.path.sep, '', 1) for cur_path in matches] - - # Restore the tilde string if we expanded one to match the text readline expects - if expanded_tilde_path: - matches = [cur_path.replace(expanded_tilde_path, orig_tilde_path, 1) for cur_path in matches] - - return matches - - @staticmethod - def get_exes_in_path(starts_with): - """ - Returns names of executables in a user's path - :param starts_with: str - what the exes should start with. leave blank for all exes in path. - :return: List[str] - a list of matching exe names - """ - # Purposely don't match any executable containing wildcards - wildcards = ['*', '?'] - for wildcard in wildcards: - if wildcard in starts_with: - return [] - - # Get a list of every directory in the PATH environment variable and ignore symbolic links - paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)] - - # Use a set to store exe names since there can be duplicates - exes_set = set() - - # Find every executable file in the user's path that matches the pattern - for path in paths: - full_path = os.path.join(path, starts_with) - matches = [f for f in glob.glob(full_path + '*') if os.path.isfile(f) and os.access(f, os.X_OK)] - - for match in matches: - exes_set.add(os.path.basename(match)) - - return list(exes_set) - - def shell_cmd_complete(self, text, line, begidx, endidx, complete_blank=False): - """Performs completion of executables either in a user's path or a given path - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param complete_blank: bool - If True, then a blank will complete all shell commands in a user's path - If False, then no completion is performed - Defaults to False to match Bash shell behavior - :return: List[str] - a list of possible tab completions - """ - # Don't tab complete anything if no shell command has been started - if not complete_blank and not text: - return [] - - # If there are no path characters in the search text, then do shell command completion in the user's path - if not text.startswith('~') and os.path.sep not in text: - return self.get_exes_in_path(text) - - # Otherwise look for executables in the given path - else: - return self.path_complete(text, line, begidx, endidx, dir_exe_only=True) - - def _redirect_complete(self, text, line, begidx, endidx, compfunc): - """ - Called by complete() as the first tab completion function for all commands - It determines if it should tab complete for redirection (|, <, >, >>) or use the - completer function for the current command - - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param compfunc: Callable - the completer function for the current command - this will be called if we aren't completing for redirection - :return: List[str] - a list of possible tab completions - """ - if self.allow_redirection: - - # Get all tokens through the one being completed. We want the raw tokens - # so we can tell if redirection strings are quoted and ignore them. - _, raw_tokens = self.tokens_for_completion(line, begidx, endidx) - if raw_tokens is None: - return [] - - if len(raw_tokens) > 1: - - # Build a list of all redirection tokens - all_redirects = REDIRECTION_CHARS + ['>>'] - - # Check if there are redirection strings prior to the token being completed - seen_pipe = False - has_redirection = False - - for cur_token in raw_tokens[:-1]: - if cur_token in all_redirects: - has_redirection = True - - if cur_token == '|': - seen_pipe = True - - # Get token prior to the one being completed - prior_token = raw_tokens[-2] - - # If a pipe is right before the token being completed, complete a shell command as the piped process - if prior_token == '|': - return self.shell_cmd_complete(text, line, begidx, endidx) - - # Otherwise do path completion either as files to redirectors or arguments to the piped process - elif prior_token in all_redirects or seen_pipe: - return self.path_complete(text, line, begidx, endidx) - - # If there were redirection strings anywhere on the command line, then we - # are no longer tab completing for the current command - elif has_redirection: - return [] - - # Call the command's completer function - return compfunc(text, line, begidx, endidx) - - @staticmethod - def _pad_matches_to_display(matches_to_display): - """ - Adds padding to the matches being displayed as tab completion suggestions. - The default padding of readline/pyreadine is small and not visually appealing - especially if matches have spaces. It appears very squished together. - - :param matches_to_display: the matches being padded - :return: the padded matches and length of padding that was added - """ - if rl_type == RlType.GNU: - # Add 2 to the padding of 2 that readline uses for a total of 4. - padding = 2 * ' ' - - elif rl_type == RlType.PYREADLINE: - # Add 3 to the padding of 1 that pyreadline uses for a total of 4. - padding = 3 * ' ' - - else: - return matches_to_display, 0 - - return [cur_match + padding for cur_match in matches_to_display], len(padding) - - def _display_matches_gnu_readline(self, substitution, matches, longest_match_length): - """ - Prints a match list using GNU readline's rl_display_match_list() - This exists to print self.display_matches if it has data. Otherwise matches prints. - - :param substitution: str - the substitution written to the command line - :param matches: list[str] - the tab completion matches to display - :param longest_match_length: int - longest printed length of the matches - """ - if rl_type == RlType.GNU: - - # Check if we should show display_matches - if self.display_matches: - matches_to_display = self.display_matches - - # Recalculate longest_match_length for display_matches - longest_match_length = 0 - - for cur_match in matches_to_display: - cur_length = wcswidth(cur_match) - if cur_length > longest_match_length: - longest_match_length = cur_length - else: - matches_to_display = matches - - # Add padding for visual appeal - matches_to_display, padding_length = self._pad_matches_to_display(matches_to_display) - longest_match_length += padding_length - - # We will use readline's display function (rl_display_match_list()), so we - # need to encode our string as bytes to place in a C array. - encoded_substitution = bytes(substitution, encoding='utf-8') - encoded_matches = [bytes(cur_match, encoding='utf-8') for cur_match in matches_to_display] - - # rl_display_match_list() expects matches to be in argv format where - # substitution is the first element, followed by the matches, and then a NULL. - # noinspection PyCallingNonCallable,PyTypeChecker - strings_array = (ctypes.c_char_p * (1 + len(encoded_matches) + 1))() - - # Copy in the encoded strings and add a NULL to the end - strings_array[0] = encoded_substitution - strings_array[1:-1] = encoded_matches - strings_array[-1] = None - - # Call readline's display function - # rl_display_match_list(strings_array, number of completion matches, longest match length) - readline_lib.rl_display_match_list(strings_array, len(encoded_matches), longest_match_length) - - # rl_forced_update_display() is the proper way to redraw the prompt and line, but we - # have to use ctypes to do it since Python's readline API does not wrap the function - readline_lib.rl_forced_update_display() - - # Since we updated the display, readline asks that rl_display_fixed be set for efficiency - display_fixed = ctypes.c_int.in_dll(readline_lib, "rl_display_fixed") - display_fixed.value = 1 - - def _display_matches_pyreadline(self, matches): - """ - Prints a match list using pyreadline's _display_completions() - This exists to print self.display_matches if it has data. Otherwise matches prints. - - :param matches: list[str] - the tab completion matches to display - """ - if rl_type == RlType.PYREADLINE: - - # Check if we should show display_matches - if self.display_matches: - matches_to_display = self.display_matches - else: - matches_to_display = matches - - # Add padding for visual appeal - matches_to_display, _ = self._pad_matches_to_display(matches_to_display) - - # Display the matches - orig_pyreadline_display(matches_to_display) - - # ----- Methods which override stuff in cmd ----- - - def complete(self, text, state): - """Override of command method which returns the next possible completion for 'text'. - - If a command has not been entered, then complete against command list. - Otherwise try to call complete_<command> to get list of completions. - - This method gets called directly by readline because it is set as the tab-completion function. - - This completer function is called as complete(text, state), for state in 0, 1, 2, …, until it returns a - non-string value. It should return the next possible completion starting with text. - - :param text: str - the current word that user is typing - :param state: int - non-negative integer - """ - if state == 0: - unclosed_quote = '' - self.set_completion_defaults() - - # lstrip the original line - orig_line = readline.get_line_buffer() - line = orig_line.lstrip() - stripped = len(orig_line) - len(line) - - # Calculate new indexes for the stripped line. If the cursor is at a position before the end of a - # line of spaces, then the following math could result in negative indexes. Enforce a max of 0. - begidx = max(readline.get_begidx() - stripped, 0) - endidx = max(readline.get_endidx() - stripped, 0) - - # Shortcuts are not word break characters when tab completing. Therefore shortcuts become part - # of the text variable if there isn't a word break, like a space, after it. We need to remove it - # from text and update the indexes. This only applies if we are at the the beginning of the line. - shortcut_to_restore = '' - if begidx == 0: - for (shortcut, expansion) in self.shortcuts: - if text.startswith(shortcut): - # Save the shortcut to restore later - shortcut_to_restore = shortcut - - # Adjust text and where it begins - text = text[len(shortcut_to_restore):] - begidx += len(shortcut_to_restore) - break - - # If begidx is greater than 0, then we are no longer completing the command - if begidx > 0: - - # Parse the command line - command, args, expanded_line = self.parseline(line) - - # We overwrote line with a properly formatted but fully stripped version - # Restore the end spaces since line is only supposed to be lstripped when - # passed to completer functions according to Python docs - rstripped_len = len(line) - len(line.rstrip()) - expanded_line += ' ' * rstripped_len - - # Fix the index values if expanded_line has a different size than line - if len(expanded_line) != len(line): - diff = len(expanded_line) - len(line) - begidx += diff - endidx += diff - - # Overwrite line to pass into completers - line = expanded_line - - # Get all tokens through the one being completed - tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx) - - # Either had a parsing error or are trying to complete the command token - # The latter can happen if default_to_shell is True and parseline() allowed - # assumed something like " or ' was a command. - if tokens is None or len(tokens) == 1: - self.completion_matches = [] - return None - - # Text we need to remove from completions later - text_to_remove = '' - - # Get the token being completed with any opening quote preserved - raw_completion_token = raw_tokens[-1] - - # Check if the token being completed has an opening quote - if raw_completion_token and raw_completion_token[0] in QUOTES: - - # Since the token is still being completed, we know the opening quote is unclosed - unclosed_quote = raw_completion_token[0] - - # readline still performs word breaks after a quote. Therefore something like quoted search - # text with a space would have resulted in begidx pointing to the middle of the token we - # we want to complete. Figure out where that token actually begins and save the beginning - # portion of it that was not part of the text readline gave us. We will remove it from the - # completions later since readline expects them to start with the original text. - actual_begidx = line[:endidx].rfind(tokens[-1]) - - if actual_begidx != begidx: - text_to_remove = line[actual_begidx:begidx] - - # Adjust text and where it begins so the completer routines - # get unbroken search text to complete on. - text = text_to_remove + text - begidx = actual_begidx - - # Check if a valid command was entered - if command in self.get_all_commands(): - # Get the completer function for this command - try: - compfunc = getattr(self, 'complete_' + command) - except AttributeError: - compfunc = self.completedefault - - subcommands = self.get_subcommands(command) - if subcommands is not None: - # Since there are subcommands, then try completing those if the cursor is in - # the token at index 1, otherwise default to using compfunc - index_dict = {1: subcommands} - compfunc = functools.partial(self.index_based_complete, - index_dict=index_dict, - all_else=compfunc) - - # A valid command was not entered - else: - # Check if this command should be run as a shell command - if self.default_to_shell and command in self.get_exes_in_path(command): - compfunc = self.path_complete - else: - compfunc = self.completedefault - - # Attempt tab completion for redirection first, and if that isn't occurring, - # call the completer function for the current command - self.completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc) - - if self.completion_matches: - - # Eliminate duplicates - matches_set = set(self.completion_matches) - self.completion_matches = list(matches_set) - - display_matches_set = set(self.display_matches) - self.display_matches = list(display_matches_set) - - # Check if display_matches has been used. If so, then matches - # on delimited strings like paths was done. - if self.display_matches: - matches_delimited = True - else: - matches_delimited = False - - # Since self.display_matches is empty, set it to self.completion_matches - # before we alter them. That way the suggestions will reflect how we parsed - # the token being completed and not how readline did. - self.display_matches = copy.copy(self.completion_matches) - - # Check if we need to add an opening quote - if not unclosed_quote: - - add_quote = False - - # This is the tab completion text that will appear on the command line. - common_prefix = os.path.commonprefix(self.completion_matches) - - if matches_delimited: - # Check if any portion of the display matches appears in the tab completion - display_prefix = os.path.commonprefix(self.display_matches) - - # For delimited matches, we check what appears before the display - # matches (common_prefix) as well as the display matches themselves. - if (' ' in common_prefix) or (display_prefix and ' ' in ''.join(self.display_matches)): - add_quote = True - - # If there is a tab completion and any match has a space, then add an opening quote - elif common_prefix and ' ' in ''.join(self.completion_matches): - add_quote = True - - if add_quote: - # Figure out what kind of quote to add and save it as the unclosed_quote - if '"' in ''.join(self.completion_matches): - unclosed_quote = "'" - else: - unclosed_quote = '"' - - self.completion_matches = [unclosed_quote + match for match in self.completion_matches] - - # Check if we need to remove text from the beginning of tab completions - elif text_to_remove: - self.completion_matches = \ - [m.replace(text_to_remove, '', 1) for m in self.completion_matches] - - # Check if we need to restore a shortcut in the tab completions - # so it doesn't get erased from the command line - if shortcut_to_restore: - self.completion_matches = \ - [shortcut_to_restore + match for match in self.completion_matches] - - else: - # Complete token against aliases and command names - alias_names = set(self.aliases.keys()) - visible_commands = set(self.get_visible_commands()) - strs_to_match = list(alias_names | visible_commands) - self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) - - # Handle single result - if len(self.completion_matches) == 1: - str_to_append = '' - - # Add a closing quote if needed and allowed - if self.allow_closing_quote and unclosed_quote: - str_to_append += unclosed_quote - - # If we are at the end of the line, then add a space if allowed - if self.allow_appended_space and endidx == len(line): - str_to_append += ' ' - - self.completion_matches[0] += str_to_append - - # Otherwise sort matches - elif self.completion_matches: - self.completion_matches.sort() - self.display_matches.sort() - - try: - return self.completion_matches[state] - except IndexError: - return None - - def get_all_commands(self): - """ - Returns a list of all commands - """ - return [cur_name[3:] for cur_name in self.get_names() if cur_name.startswith('do_')] - - def get_visible_commands(self): - """ - Returns a list of commands that have not been hidden - """ - commands = self.get_all_commands() - - # Remove the hidden commands - for name in self.hidden_commands: - if name in commands: - commands.remove(name) - - return commands - - def get_help_topics(self): - """ Returns a list of help topics """ - return [name[5:] for name in self.get_names() if name.startswith('help_')] - - def complete_help(self, text, line, begidx, endidx): - """ - Override of parent class method to handle tab completing subcommands and not showing hidden commands - Returns a list of possible tab completions - """ - - # The command is the token at index 1 in the command line - cmd_index = 1 - - # The subcommand is the token at index 2 in the command line - subcmd_index = 2 - - # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) - if tokens is None: - return [] - - matches = [] - - # Get the index of the token being completed - index = len(tokens) - 1 - - # Check if we are completing a command or help topic - if index == cmd_index: - - # Complete token against topics and visible commands - topics = set(self.get_help_topics()) - visible_commands = set(self.get_visible_commands()) - strs_to_match = list(topics | visible_commands) - matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) - - # Check if we are completing a subcommand - elif index == subcmd_index: - - # Match subcommands if any exist - command = tokens[cmd_index] - matches = self.basic_complete(text, line, begidx, endidx, self.get_subcommands(command)) - - return matches - - # noinspection PyUnusedLocal - def sigint_handler(self, signum, frame): - """Signal handler for SIGINTs which typically come from Ctrl-C events. - - If you need custom SIGINT behavior, then override this function. - - :param signum: int - signal number - :param frame - """ - - # Save copy of pipe_proc since it could theoretically change while this is running - pipe_proc = self.pipe_proc - - if pipe_proc is not None: - pipe_proc.terminate() - - # Re-raise a KeyboardInterrupt so other parts of the code can catch it - raise KeyboardInterrupt("Got a keyboard interrupt") - - def preloop(self): - """"Hook method executed once when the cmdloop() method is called.""" - - # Register a default SIGINT signal handler for Ctrl+C - signal.signal(signal.SIGINT, self.sigint_handler) - - def precmd(self, statement): - """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. - - :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance - :return: ParsedString - a potentially modified version of the input ParsedString statement - """ - return statement - - # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd ----- - - # noinspection PyMethodMayBeStatic - def preparse(self, raw): - """Hook method executed just before the command line is interpreted, but after the input prompt is generated. - - :param raw: str - raw command line input - :return: str - potentially modified raw command line input - """ - return raw - - # noinspection PyMethodMayBeStatic - def postparse(self, parse_result): - """Hook that runs immediately after parsing the command-line but before ``parsed()`` returns a ParsedString. - - :param parse_result: pyparsing.ParseResults - parsing results output by the pyparsing parser - :return: pyparsing.ParseResults - potentially modified ParseResults object - """ - return parse_result - - # noinspection PyMethodMayBeStatic - def postparsing_precmd(self, statement): - """This runs after parsing the command-line, but before anything else; even before adding cmd to history. - - NOTE: This runs before precmd() and prior to any potential output redirection or piping. - - If you wish to fatally fail this command and exit the application entirely, set stop = True. - - If you wish to just fail this command you can do so by raising an exception: - - - raise EmptyStatement - will silently fail and do nothing - - raise <AnyOtherException> - will fail and print an error message - - :param statement: - the parsed command-line statement - :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement - """ - stop = False - return stop, statement - - # noinspection PyMethodMayBeStatic - def postparsing_postcmd(self, stop): - """This runs after everything else, including after postcmd(). - - It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due - to notifications from a background thread, then this is the method you want to override to do it. - - :param stop: bool - True implies the entire application should exit. - :return: bool - True implies the entire application should exit. - """ - if not sys.platform.startswith('win'): - # Fix those annoying problems that occur with terminal programs like "less" when you pipe to them - if self.stdin.isatty(): - proc = subprocess.Popen(shlex.split('stty sane')) - proc.communicate() - return stop - - def parseline(self, line): - """Parse the line into a command name and a string containing the arguments. - - NOTE: This is an override of a parent class method. It is only used by other parent class methods. But - we do need to override it here so that the additional shortcuts present in cmd2 get properly expanded for - purposes of tab completion. - - Used for command tab completion. Returns a tuple containing (command, args, line). - 'command' and 'args' may be None if the line couldn't be parsed. - - :param line: str - line read by readline - :return: (str, str, str) - tuple containing (command, args, line) - """ - line = line.strip() - - if not line: - # Deal with empty line or all whitespace line - return None, None, line - - # Make a copy of aliases so we can edit it - tmp_aliases = list(self.aliases.keys()) - keep_expanding = len(tmp_aliases) > 0 - - # Expand aliases - while keep_expanding: - for cur_alias in tmp_aliases: - keep_expanding = False - - if line == cur_alias or line.startswith(cur_alias + ' '): - line = line.replace(cur_alias, self.aliases[cur_alias], 1) - - # Do not expand the same alias more than once - tmp_aliases.remove(cur_alias) - keep_expanding = len(tmp_aliases) > 0 - break - - # Expand command shortcut to its full command name - for (shortcut, expansion) in self.shortcuts: - if line.startswith(shortcut): - # If the next character after the shortcut isn't a space, then insert one - shortcut_len = len(shortcut) - if len(line) == shortcut_len or line[shortcut_len] != ' ': - expansion += ' ' - - # Expand the shortcut - line = line.replace(shortcut, expansion, 1) - break - - i, n = 0, len(line) - - # If we are allowing shell commands, then allow any character in the command - if self.default_to_shell: - while i < n and line[i] != ' ': - i += 1 - - # Otherwise only allow those in identchars - else: - while i < n and line[i] in self.identchars: - i += 1 - - command, arg = line[:i], line[i:].strip() - - return command, arg, line - - def onecmd_plus_hooks(self, line): - """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks. - - :param line: str - line of text read from input - :return: bool - True if cmdloop() should exit, False otherwise - """ - stop = 0 - try: - statement = self._complete_statement(line) - (stop, statement) = self.postparsing_precmd(statement) - if stop: - return self.postparsing_postcmd(stop) - - try: - if self.allow_redirection: - self._redirect_output(statement) - timestart = datetime.datetime.now() - statement = self.precmd(statement) - stop = self.onecmd(statement) - stop = self.postcmd(stop, statement) - if self.timing: - self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) - finally: - if self.allow_redirection: - self._restore_output(statement) - except EmptyStatement: - pass - except ValueError as ex: - # If shlex.split failed on syntax, let user know whats going on - self.perror("Invalid syntax: {}".format(ex), traceback_war=False) - except Exception as ex: - self.perror(ex, type(ex).__name__) - finally: - return self.postparsing_postcmd(stop) - - def runcmds_plus_hooks(self, cmds): - """Convenience method to run multiple commands by onecmd_plus_hooks. - - This method adds the given cmds to the command queue and processes the - queue until completion or an error causes it to abort. Scripts that are - loaded will have their commands added to the queue. Scripts may even - load other scripts recursively. This means, however, that you should not - use this method if there is a running cmdloop or some other event-loop. - This method is only intended to be used in "one-off" scenarios. - - NOTE: You may need this method even if you only have one command. If - that command is a load, then you will need this command to fully process - all the subsequent commands that are loaded from the script file. This - is an improvement over onecmd_plus_hooks, which expects to be used - inside of a command loop which does the processing of loaded commands. - - Example: cmd_obj.runcmds_plus_hooks(['load myscript.txt']) - - :param cmds: list - Command strings suitable for onecmd_plus_hooks. - :return: bool - True implies the entire application should exit. - - """ - stop = False - self.cmdqueue = list(cmds) + self.cmdqueue - try: - while self.cmdqueue and not stop: - line = self.cmdqueue.pop(0) - if self.echo and line != 'eos': - self.poutput('{}{}'.format(self.prompt, line)) - - stop = self.onecmd_plus_hooks(line) - finally: - # Clear out the command queue and script directory stack, just in - # case we hit an error and they were not completed. - self.cmdqueue = [] - self._script_dir = [] - # NOTE: placing this return here inside the finally block will - # swallow exceptions. This is consistent with what is done in - # onecmd_plus_hooks and _cmdloop, although it may not be - # necessary/desired here. - return stop - - def _complete_statement(self, line): - """Keep accepting lines of input until the command is complete.""" - if not line or (not pyparsing.Or(self.commentGrammars).setParseAction(lambda x: '').transformString(line)): - raise EmptyStatement() - statement = self.parser_manager.parsed(line) - while statement.parsed.multilineCommand and (statement.parsed.terminator == ''): - statement = '%s\n%s' % (statement.parsed.raw, - self.pseudo_raw_input(self.continuation_prompt)) - statement = self.parser_manager.parsed(statement) - if not statement.parsed.command: - raise EmptyStatement() - return statement - - def _redirect_output(self, statement): - """Handles output redirection for >, >>, and |. - - :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance - """ - if statement.parsed.pipeTo: - self.kept_state = Statekeeper(self, ('stdout',)) - - # Create a pipe with read and write sides - read_fd, write_fd = os.pipe() - - # Open each side of the pipe and set stdout accordingly - # noinspection PyTypeChecker - self.stdout = io.open(write_fd, 'w') - self.redirecting = True - # noinspection PyTypeChecker - subproc_stdin = io.open(read_fd, 'r') - - # We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True. - try: - self.pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeTo), stdin=subproc_stdin) - except Exception as ex: - # Restore stdout to what it was and close the pipe - self.stdout.close() - subproc_stdin.close() - self.pipe_proc = None - self.kept_state.restore() - self.kept_state = None - self.redirecting = False - - # Re-raise the exception - raise ex - elif statement.parsed.output: - if (not statement.parsed.outputTo) and (not can_clip): - raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable') - self.kept_state = Statekeeper(self, ('stdout',)) - self.kept_sys = Statekeeper(sys, ('stdout',)) - self.redirecting = True - if statement.parsed.outputTo: - mode = 'w' - if statement.parsed.output == 2 * self.redirector: - mode = 'a' - sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode) - else: - sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+") - if statement.parsed.output == '>>': - self.poutput(get_paste_buffer()) - - def _restore_output(self, statement): - """Handles restoring state after output redirection as well as the actual pipe operation if present. - - :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance - """ - # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state - if self.kept_state is not None: - # If we redirected output to the clipboard - if statement.parsed.output and not statement.parsed.outputTo: - self.stdout.seek(0) - write_to_paste_buffer(self.stdout.read()) - - try: - # Close the file or pipe that stdout was redirected to - self.stdout.close() - except BrokenPipeError: - pass - finally: - # Restore self.stdout - self.kept_state.restore() - self.kept_state = None - - # If we were piping output to a shell command, then close the subprocess the shell command was running in - if self.pipe_proc is not None: - self.pipe_proc.communicate() - self.pipe_proc = None - - # Restore sys.stdout if need be - if self.kept_sys is not None: - self.kept_sys.restore() - self.kept_sys = None - - self.redirecting = False - - def _func_named(self, arg): - """Gets the method name associated with a given command. - - :param arg: str - command to look up method name which implements it - :return: str - method name which implements the given command - """ - result = None - target = 'do_' + arg - if target in dir(self): - result = target - return result - - def onecmd(self, line): - """ This executes the actual do_* method for a command. - - If the command provided doesn't exist, then it executes _default() instead. - - :param line: ParsedString - subclass of string including the pyparsing ParseResults - :return: bool - a flag indicating whether the interpretation of commands should stop - """ - statement = self.parser_manager.parsed(line) - funcname = self._func_named(statement.parsed.command) - if not funcname: - return self.default(statement) - - # Since we have a valid command store it in the history - if statement.parsed.command not in self.exclude_from_history: - self.history.append(statement.parsed.raw) - - try: - func = getattr(self, funcname) - except AttributeError: - return self.default(statement) - - stop = func(statement) - return stop - - def default(self, statement): - """Executed when the command given isn't a recognized command implemented by a do_* method. - - :param statement: ParsedString - subclass of string including the pyparsing ParseResults - :return: - """ - arg = statement.full_parsed_statement() - if self.default_to_shell: - result = os.system(arg) - # If os.system() succeeded, then don't print warning about unknown command - if not result: - return - - # Print out a message stating this is an unknown command - self.poutput('*** Unknown syntax: {}\n'.format(arg)) - - @staticmethod - def _surround_ansi_escapes(prompt, start="\x01", end="\x02"): - """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. - - :param prompt: str - original prompt - :param start: str - start code to tell GNU Readline about beginning of invisible characters - :param end: str - end code to tell GNU Readline about end of invisible characters - :return: str - prompt safe to pass to GNU Readline - """ - # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline - if sys.platform == "win32": - return prompt - - escaped = False - result = "" - - for c in prompt: - if c == "\x1b" and not escaped: - result += start + c - escaped = True - elif c.isalpha() and escaped: - result += c + end - escaped = False - else: - result += c - - return result - - def pseudo_raw_input(self, prompt): - """ - began life as a copy of cmd's cmdloop; like raw_input but - - - accounts for changed stdin, stdout - - if input is a pipe (instead of a tty), look at self.echo - to decide whether to print the prompt and the input - """ - - # Deal with the vagaries of readline and ANSI escape codes - safe_prompt = self._surround_ansi_escapes(prompt) - - if self.use_rawinput: - try: - if sys.stdin.isatty(): - line = input(safe_prompt) - else: - line = input() - if self.echo: - sys.stdout.write('{}{}\n'.format(safe_prompt, line)) - except EOFError: - line = 'eof' - else: - if self.stdin.isatty(): - # on a tty, print the prompt first, then read the line - self.poutput(safe_prompt, end='') - self.stdout.flush() - line = self.stdin.readline() - if len(line) == 0: - line = 'eof' - else: - # we are reading from a pipe, read the line to see if there is - # anything there, if so, then decide whether to print the - # prompt or not - line = self.stdin.readline() - if len(line): - # we read something, output the prompt and the something - if self.echo: - self.poutput('{}{}'.format(safe_prompt, line)) - else: - line = 'eof' - return line.strip() - - def _cmdloop(self): - """Repeatedly issue a prompt, accept input, parse an initial prefix - off the received input, and dispatch to action methods, passing them - the remainder of the line as argument. - - This serves the same role as cmd.cmdloop(). - - :return: bool - True implies the entire application should exit. - """ - # An almost perfect copy from Cmd; however, the pseudo_raw_input portion - # has been split out so that it can be called separately - if self.use_rawinput and self.completekey: - - # Set up readline for our tab completion needs - if rl_type == RlType.GNU: - readline.set_completion_display_matches_hook(self._display_matches_gnu_readline) - - # Set GNU readline's rl_basic_quote_characters to NULL so it won't automatically add a closing quote - # We don't need to worry about setting rl_completion_suppress_quote since we never declared - # rl_completer_quote_characters. - rl_basic_quote_characters.value = None - - elif rl_type == RlType.PYREADLINE: - readline.rl.mode._display_completions = self._display_matches_pyreadline - - try: - self.old_completer = readline.get_completer() - self.old_delims = readline.get_completer_delims() - readline.set_completer(self.complete) - - # Break words on whitespace and quotes when tab completing - completer_delims = " \t\n" + ''.join(QUOTES) - - if self.allow_redirection: - # If redirection is allowed, then break words on those characters too - completer_delims += ''.join(REDIRECTION_CHARS) - - readline.set_completer_delims(completer_delims) - - # Enable tab completion - readline.parse_and_bind(self.completekey + ": complete") - except NameError: - pass - stop = None - try: - while not stop: - if self.cmdqueue: - # Run command out of cmdqueue if nonempty (populated by load command or commands at invocation) - line = self.cmdqueue.pop(0) - - if self.echo and line != 'eos': - self.poutput('{}{}'.format(self.prompt, line)) - else: - # Otherwise, read a command from stdin - if not self.quit_on_sigint: - try: - line = self.pseudo_raw_input(self.prompt) - except KeyboardInterrupt: - self.poutput('^C') - line = '' - else: - line = self.pseudo_raw_input(self.prompt) - - # Run the command along with all associated pre and post hooks - stop = self.onecmd_plus_hooks(line) - finally: - if self.use_rawinput and self.completekey: - - # Restore what we changed in readline - try: - readline.set_completer(self.old_completer) - readline.set_completer_delims(self.old_delims) - except NameError: - pass - - if rl_type == RlType.GNU: - readline.set_completion_display_matches_hook(None) - rl_basic_quote_characters.value = orig_rl_basic_quote_characters_addr - - elif rl_type == RlType.PYREADLINE: - readline.rl.mode._display_completions = orig_pyreadline_display - - self.cmdqueue.clear() - self._script_dir.clear() - - return stop - - @with_argument_list - def do_alias(self, arglist): - """Define or display aliases - -Usage: Usage: alias [name] | [<name> <value>] - Where: - name - name of the alias being looked up, added, or replaced - value - what the alias will be resolved to (if adding or replacing) - this can contain spaces and does not need to be quoted - - Without arguments, 'alias' prints a list of all aliases in a reusable form which - can be outputted to a startup_script to preserve aliases across sessions. - - With one argument, 'alias' shows the value of the specified alias. - Example: alias ls (Prints the value of the alias called 'ls' if it exists) - - With two or more arguments, 'alias' creates or replaces an alias. - - Example: alias ls !ls -lF - - If you want to use redirection or pipes in the alias, then either quote the tokens with these - characters or quote the entire alias value. - - Examples: - alias save_results print_results ">" out.txt - alias save_results print_results "> out.txt" - alias save_results "print_results > out.txt" -""" - # If no args were given, then print a list of current aliases - if not arglist: - for cur_alias in self.aliases: - self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias])) - - # The user is looking up an alias - elif len(arglist) == 1: - name = arglist[0] - if name in self.aliases: - self.poutput("alias {} {}".format(name, self.aliases[name])) - else: - self.perror("Alias {!r} not found".format(name), traceback_war=False) - - # The user is creating an alias - else: - name = arglist[0] - value = ' '.join(arglist[1:]) - - # Check for a valid name - for cur_char in name: - if cur_char not in self.identchars: - self.perror("Alias names can only contain the following characters: {}".format(self.identchars), - traceback_war=False) - return - - # Set the alias - self.aliases[name] = value - self.poutput("Alias {!r} created".format(name)) - - def complete_alias(self, text, line, begidx, endidx): - """ Tab completion for alias """ - index_dict = \ - { - 1: self.aliases, - 2: self.get_visible_commands() - } - return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) - - @with_argument_list - def do_unalias(self, arglist): - """Unsets aliases - -Usage: Usage: unalias [-a] name [name ...] - Where: - name - name of the alias being unset - - Options: - -a remove all alias definitions -""" - if not arglist: - self.do_help('unalias') - - if '-a' in arglist: - self.aliases.clear() - self.poutput("All aliases cleared") - - else: - # Get rid of duplicates - arglist = list(set(arglist)) - - for cur_arg in arglist: - if cur_arg in self.aliases: - del self.aliases[cur_arg] - self.poutput("Alias {!r} cleared".format(cur_arg)) - else: - self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False) - - def complete_unalias(self, text, line, begidx, endidx): - """ Tab completion for unalias """ - return self.basic_complete(text, line, begidx, endidx, self.aliases) - - @with_argument_list - def do_help(self, arglist): - """List available commands with "help" or detailed help with "help cmd".""" - if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')): - verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v') - self._help_menu(verbose) - else: - # Getting help for a specific command - funcname = self._func_named(arglist[0]) - if funcname: - # Check to see if this function was decorated with an argparse ArgumentParser - func = getattr(self, funcname) - if func.__dict__.get('has_parser', False): - # Function has an argparser, so get help based on all the arguments in case there are sub-commands - new_arglist = arglist[1:] - new_arglist.append('-h') - - # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout - with redirect_stdout(self.stdout): - with redirect_stderr(self.stdout): - func(new_arglist) - else: - # No special behavior needed, delegate to cmd base class do_help() - cmd.Cmd.do_help(self, funcname[3:]) - else: - # This could be a help topic - cmd.Cmd.do_help(self, arglist[0]) - - def _help_menu(self, verbose=False): - """Show a list of commands which help can be displayed for. - """ - # Get a sorted list of help topics - help_topics = self.get_help_topics() - help_topics.sort() - - # Get a sorted list of visible command names - visible_commands = self.get_visible_commands() - visible_commands.sort() - - cmds_doc = [] - cmds_undoc = [] - cmds_cats = {} - - for command in visible_commands: - if command in help_topics or getattr(self, self._func_named(command)).__doc__: - if command in help_topics: - help_topics.remove(command) - if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY): - category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY) - cmds_cats.setdefault(category, []) - cmds_cats[category].append(command) - else: - cmds_doc.append(command) - else: - cmds_undoc.append(command) - - if len(cmds_cats) == 0: - # No categories found, fall back to standard behavior - self.poutput("{}\n".format(str(self.doc_leader))) - self._print_topics(self.doc_header, cmds_doc, verbose) - else: - # Categories found, Organize all commands by category - self.poutput('{}\n'.format(str(self.doc_leader))) - self.poutput('{}\n\n'.format(str(self.doc_header))) - for category in sorted(cmds_cats.keys()): - self._print_topics(category, cmds_cats[category], verbose) - self._print_topics('Other', cmds_doc, verbose) - - self.print_topics(self.misc_header, help_topics, 15, 80) - self.print_topics(self.undoc_header, cmds_undoc, 15, 80) - - def _print_topics(self, header, cmds, verbose): - """Customized version of print_topics that can switch between verbose or traditional output""" - if cmds: - if not verbose: - self.print_topics(header, cmds, 15, 80) - else: - self.stdout.write('{}\n'.format(str(header))) - widest = 0 - # measure the commands - for command in cmds: - width = len(command) - if width > widest: - widest = width - # add a 4-space pad - widest += 4 - if widest < 20: - widest = 20 - - if self.ruler: - self.stdout.write('{:{ruler}<{width}}\n'.format('', ruler=self.ruler, width=80)) - - for command in cmds: - # Try to get the documentation string - try: - # first see if there's a help function implemented - func = getattr(self, 'help_' + command) - except AttributeError: - # Couldn't find a help function - try: - # Now see if help_summary has been set - doc = getattr(self, self._func_named(command)).help_summary - except AttributeError: - # Last, try to directly access the function's doc-string - doc = getattr(self, self._func_named(command)).__doc__ - else: - # we found the help function - result = StringIO() - # try to redirect system stdout - with redirect_stdout(result): - # save our internal stdout - stdout_orig = self.stdout - try: - # redirect our internal stdout - self.stdout = result - func() - finally: - # restore internal stdout - self.stdout = stdout_orig - doc = result.getvalue() - - # Attempt to locate the first documentation block - doc_block = [] - found_first = False - for doc_line in doc.splitlines(): - str(doc_line).strip() - if len(doc_line.strip()) > 0: - doc_block.append(doc_line.strip()) - found_first = True - else: - if found_first: - break - - for doc_line in doc_block: - self.stdout.write('{: <{col_width}}{doc}\n'.format(command, - col_width=widest, - doc=doc_line)) - command = '' - self.stdout.write("\n") - - def do_shortcuts(self, _): - """Lists shortcuts (aliases) available.""" - result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) - self.poutput("Shortcuts for other commands:\n{}\n".format(result)) - - def do_eof(self, _): - """Called when <Ctrl>-D is pressed.""" - # End of script should not exit app, but <Ctrl>-D should. - print('') # Required for clearing line when exiting submenu - return self._STOP_AND_EXIT - - def do_quit(self, _): - """Exits this application.""" - self._should_quit = True - return self._STOP_AND_EXIT - - def select(self, opts, prompt='Your choice? '): - """Presents a numbered menu to the user. Modelled after - the bash shell's SELECT. Returns the item chosen. - - Argument ``opts`` can be: - - | a single string -> will be split into one-word options - | a list of strings -> will be offered as options - | a list of tuples -> interpreted as (value, text), so - that the return value can differ from - the text advertised to the user """ - local_opts = opts - if isinstance(opts, str): - local_opts = list(zip(opts.split(), opts.split())) - fulloptions = [] - for opt in local_opts: - if isinstance(opt, str): - fulloptions.append((opt, opt)) - else: - try: - fulloptions.append((opt[0], opt[1])) - except IndexError: - fulloptions.append((opt[0], opt[0])) - for (idx, (value, text)) in enumerate(fulloptions): - self.poutput(' %2d. %s\n' % (idx + 1, text)) - while True: - response = input(prompt) - hlen = readline.get_current_history_length() - if hlen >= 1 and response != '': - readline.remove_history_item(hlen - 1) - try: - response = int(response) - result = fulloptions[response - 1][0] - break - except (ValueError, IndexError): - self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:\n".format(response, - len(fulloptions))) - return result - - def cmdenvironment(self): - """Get a summary report of read-only settings which the user cannot modify at runtime. - - :return: str - summary report of read-only settings which the user cannot modify at runtime - """ - read_only_settings = """ - Commands may be terminated with: {} - Arguments at invocation allowed: {} - Output redirection and pipes allowed: {} - Parsing of command arguments: - Shell lexer mode for command argument splitting: {} - Strip Quotes after splitting arguments: {} - """.format(str(self.terminators), self.allow_cli_args, self.allow_redirection, - "POSIX" if POSIX_SHLEX else "non-POSIX", - "True" if STRIP_QUOTES_FOR_NON_POSIX and not POSIX_SHLEX else "False") - return read_only_settings - - def show(self, args, parameter): - param = '' - if parameter: - param = parameter.strip().lower() - result = {} - maxlen = 0 - for p in self.settable: - if (not param) or p.startswith(param): - result[p] = '%s: %s' % (p, str(getattr(self, p))) - maxlen = max(maxlen, len(result[p])) - if result: - for p in sorted(result): - if args.long: - self.poutput('{} # {}'.format(result[p].ljust(maxlen), self.settable[p])) - else: - self.poutput(result[p]) - - # If user has requested to see all settings, also show read-only settings - if args.all: - self.poutput('\nRead only settings:{}'.format(self.cmdenvironment())) - else: - raise LookupError("Parameter '%s' not supported (type 'show' for list of parameters)." % param) - - set_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) - set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well') - set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter') - set_parser.add_argument('settable', nargs='*', help='[param_name] [value]') - - @with_argparser(set_parser) - def do_set(self, args): - """Sets a settable parameter or shows current settings of parameters. - - Accepts abbreviated parameter names so long as there is no ambiguity. - Call without arguments for a list of settable parameters with their values. - """ - try: - param_name, val = args.settable - val = val.strip() - param_name = param_name.strip().lower() - if param_name not in self.settable: - hits = [p for p in self.settable if p.startswith(param_name)] - if len(hits) == 1: - param_name = hits[0] - else: - return self.show(args, param_name) - current_val = getattr(self, param_name) - if (val[0] == val[-1]) and val[0] in ("'", '"'): - val = val[1:-1] - else: - val = cast(current_val, val) - setattr(self, param_name, val) - self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val)) - if current_val != val: - try: - onchange_hook = getattr(self, '_onchange_%s' % param_name) - onchange_hook(old=current_val, new=val) - except AttributeError: - pass - except (ValueError, AttributeError): - param = '' - if args.settable: - param = args.settable[0] - self.show(args, param) - - def do_shell(self, command): - """Execute a command as if at the OS prompt. - - Usage: shell <command> [arguments]""" - - try: - # Use non-POSIX parsing to keep the quotes around the tokens - tokens = shlex.split(command, posix=False) - except ValueError as err: - self.perror(err, traceback_war=False) - return - - # Support expanding ~ in quoted paths - for index, _ in enumerate(tokens): - if tokens[index]: - # Check if the token is quoted. Since shlex.split() passed, there isn't - # an unclosed quote, so we only need to check the first character. - first_char = tokens[index][0] - if first_char in QUOTES: - tokens[index] = strip_quotes(tokens[index]) - - tokens[index] = os.path.expanduser(tokens[index]) - - # Restore the quotes - if first_char in QUOTES: - tokens[index] = first_char + tokens[index] + first_char - - expanded_command = ' '.join(tokens) - proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True) - proc.communicate() - - def complete_shell(self, text, line, begidx, endidx): - """Handles tab completion of executable commands and local file system paths for the shell command - - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :return: List[str] - a list of possible tab completions - """ - index_dict = {1: self.shell_cmd_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) - - def cmd_with_subs_completer(self, text, line, begidx, endidx): - """ - This is a function provided for convenience to those who want an easy way to add - tab completion to functions that implement subcommands. By setting this as the - completer of the base command function, the correct completer for the chosen subcommand - will be called. - - The use of this function requires assigning a completer function to the subcommand's parser - Example: - A command called print has a subcommands called 'names' that needs a tab completer - When you create the parser for names, include the completer function in the parser's defaults. - - names_parser.set_defaults(func=print_names, completer=complete_print_names) - - To make sure the names completer gets called, set the completer for the print function - in a similar fashion to what follows. - - complete_print = cmd2.Cmd.cmd_with_subs_completer - - When the subcommand's completer is called, this function will have stripped off all content from the - beginning of the command line before the subcommand, meaning the line parameter always starts with the - subcommand name and the index parameters reflect this change. - - For instance, the command "print names -d 2" becomes "names -d 2" - begidx and endidx are incremented accordingly - - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :return: List[str] - a list of possible tab completions - """ - # The command is the token at index 0 in the command line - cmd_index = 0 - - # The subcommand is the token at index 1 in the command line - subcmd_index = 1 - - # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) - if tokens is None: - return [] - - matches = [] - - # Get the index of the token being completed - index = len(tokens) - 1 - - # If the token being completed is past the subcommand name, then do subcommand specific tab-completion - if index > subcmd_index: - - # Get the command name - command = tokens[cmd_index] - - # Get the subcommand name - subcommand = tokens[subcmd_index] - - # Find the offset into line where the subcommand name begins - subcmd_start = 0 - for cur_index in range(0, subcmd_index + 1): - cur_token = tokens[cur_index] - subcmd_start = line.find(cur_token, subcmd_start) - - if cur_index != subcmd_index: - subcmd_start += len(cur_token) - - # Strip off everything before subcommand name - orig_line = line - line = line[subcmd_start:] - - # Update the indexes - diff = len(orig_line) - len(line) - begidx -= diff - endidx -= diff - - # Call the subcommand specific completer if it exists - compfunc = self.get_subcommand_completer(command, subcommand) - if compfunc is not None: - matches = compfunc(self, text, line, begidx, endidx) - - return matches - - # noinspection PyBroadException - def do_py(self, arg): - """ - Invoke python command, shell, or script - - py <command>: Executes a Python command. - py: Enters interactive Python mode. - End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - Non-python commands can be issued with ``cmd("your command")``. - Run python code from external script files with ``run("script.py")`` - """ - if self._in_py: - self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False) - return - self._in_py = True - - try: - self.pystate['self'] = self - arg = arg.strip() - - # Support the run command even if called prior to invoking an interactive interpreter - def run(filename): - """Run a Python script file in the interactive console. - - :param filename: str - filename of *.py script file to run - """ - try: - with open(filename) as f: - interp.runcode(f.read()) - except IOError as e: - self.perror(e) - - def onecmd_plus_hooks(cmd_plus_args): - """Run a cmd2.Cmd command from a Python script or the interactive Python console. - - :param cmd_plus_args: str - command line including command and arguments to run - :return: bool - True if cmdloop() should exit once leaving the interactive Python console - """ - return self.onecmd_plus_hooks(cmd_plus_args + '\n') - - self.pystate['run'] = run - self.pystate['cmd'] = onecmd_plus_hooks - - localvars = (self.locals_in_py and self.pystate) or {} - interp = InteractiveConsole(locals=localvars) - interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') - - if arg: - interp.runcode(arg) - else: - # noinspection PyShadowingBuiltins - def quit(): - """Function callable from the interactive Python console to exit that environment""" - raise EmbeddedConsoleExit - - self.pystate['quit'] = quit - self.pystate['exit'] = quit - - keepstate = None - try: - cprt = 'Type "help", "copyright", "credits" or "license" for more information.' - keepstate = Statekeeper(sys, ('stdin', 'stdout')) - sys.stdout = self.stdout - sys.stdin = self.stdin - interp.interact(banner="Python %s on %s\n%s\n(%s)\n%s" % - (sys.version, sys.platform, cprt, self.__class__.__name__, - self.do_py.__doc__)) - except EmbeddedConsoleExit: - pass - if keepstate is not None: - keepstate.restore() - except Exception: - pass - finally: - self._in_py = False - return self._should_quit - - @with_argument_list - def do_pyscript(self, arglist): - """\nRuns a python script file inside the console - - Usage: pyscript <script_path> [script_arguments] - -Console commands can be executed inside this script with cmd("your command") -However, you cannot run nested "py" or "pyscript" commands from within this script -Paths or arguments that contain spaces must be enclosed in quotes -""" - if not arglist: - self.perror("pyscript command requires at least 1 argument ...", traceback_war=False) - self.do_help('pyscript') - return - - # Get the absolute path of the script - script_path = os.path.expanduser(arglist[0]) - - # Save current command line arguments - orig_args = sys.argv - - # Overwrite sys.argv to allow the script to take command line arguments - sys.argv = [script_path] - sys.argv.extend(arglist[1:]) - - # Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows - self.do_py("run({!r})".format(script_path)) - - # Restore command line arguments to original state - sys.argv = orig_args - - # Enable tab-completion for pyscript command - def complete_pyscript(self, text, line, begidx, endidx): - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) - - # Only include the do_ipy() method if IPython is available on the system - if ipython_available: - # noinspection PyMethodMayBeStatic,PyUnusedLocal - def do_ipy(self, arg): - """Enters an interactive IPython shell. - - Run python code from external files with ``run filename.py`` - End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - """ - banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...' - exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) - embed(banner1=banner, exit_msg=exit_msg) - - history_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) - history_parser_group = history_parser.add_mutually_exclusive_group() - history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items') - history_parser_group.add_argument('-e', '--edit', action='store_true', - help='edit and then run selected history items') - history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines') - history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file') - history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file') - _history_arg_help = """empty all history items -a one history item by number -a..b, a:b, a:, ..b items by indices (inclusive) -[string] items containing string -/regex/ items matching regular expression""" - history_parser.add_argument('arg', nargs='?', help=_history_arg_help) - - @with_argparser(history_parser) - def do_history(self, args): - """View, run, edit, and save previously entered commands.""" - # If an argument was supplied, then retrieve partial contents of the history - cowardly_refuse_to_run = False - if args.arg: - # If a character indicating a slice is present, retrieve - # a slice of the history - arg = args.arg - if '..' in arg or ':' in arg: - try: - # Get a slice of history - history = self.history.span(arg) - except IndexError: - history = self.history.get(arg) - else: - # Get item(s) from history by index or string search - history = self.history.get(arg) - else: - # If no arg given, then retrieve the entire history - cowardly_refuse_to_run = True - # Get a copy of the history so it doesn't get mutated while we are using it - history = self.history[:] - - if args.run: - if cowardly_refuse_to_run: - self.perror("Cowardly refusing to run all previously entered commands.", traceback_war=False) - self.perror("If this is what you want to do, specify '1:' as the range of history.", - traceback_war=False) - else: - for runme in history: - self.pfeedback(runme) - if runme: - self.onecmd_plus_hooks(runme) - elif args.edit: - fd, fname = tempfile.mkstemp(suffix='.txt', text=True) - with os.fdopen(fd, 'w') as fobj: - for command in history: - fobj.write('{}\n'.format(command)) - try: - os.system('"{}" "{}"'.format(self.editor, fname)) - self.do_load(fname) - except Exception: - raise - finally: - os.remove(fname) - elif args.output_file: - try: - with open(os.path.expanduser(args.output_file), 'w') as fobj: - for command in history: - fobj.write('{}\n'.format(command)) - plural = 's' if len(history) > 1 else '' - self.pfeedback('{} command{} saved to {}'.format(len(history), plural, args.output_file)) - except Exception as e: - self.perror('Saving {!r} - {}'.format(args.output_file, e), traceback_war=False) - elif args.transcript: - # Make sure echo is on so commands print to standard out - saved_echo = self.echo - self.echo = True - - # Redirect stdout to the transcript file - saved_self_stdout = self.stdout - self.stdout = open(args.transcript, 'w') - - # Run all of the commands in the history with output redirected to transcript and echo on - self.runcmds_plus_hooks(history) - - # Restore stdout to its original state - self.stdout.close() - self.stdout = saved_self_stdout - - # Set echo back to its original state - self.echo = saved_echo - - # Post-process the file to escape un-escaped "/" regex escapes - with open(args.transcript, 'r') as fin: - data = fin.read() - post_processed_data = data.replace('/', '\/') - with open(args.transcript, 'w') as fout: - fout.write(post_processed_data) - - plural = 's' if len(history) > 1 else '' - self.pfeedback('{} command{} and outputs saved to transcript file {!r}'.format(len(history), plural, - args.transcript)) - else: - # Display the history items retrieved - for hi in history: - if args.script: - self.poutput(hi) - else: - self.poutput(hi.pr()) - - @with_argument_list - def do_edit(self, arglist): - """Edit a file in a text editor. - -Usage: edit [file_path] - Where: - * file_path - path to a file to open in editor - -The editor used is determined by the ``editor`` settable parameter. -"set editor (program-name)" to change or set the EDITOR environment variable. -""" - if not self.editor: - raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") - filename = arglist[0] if arglist else '' - if filename: - os.system('"{}" "{}"'.format(self.editor, filename)) - else: - os.system('"{}"'.format(self.editor)) - - # Enable tab-completion for edit command - def complete_edit(self, text, line, begidx, endidx): - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) - - @property - def _current_script_dir(self): - """Accessor to get the current script directory from the _script_dir LIFO queue.""" - if self._script_dir: - return self._script_dir[-1] - else: - return None - - @with_argument_list - def do__relative_load(self, arglist): - """Runs commands in script file that is encoded as either ASCII or UTF-8 text. - - Usage: _relative_load <file_path> - - optional argument: - file_path a file path pointing to a script - -Script should contain one command per line, just like command would be typed in console. - -If this is called from within an already-running script, the filename will be interpreted -relative to the already-running script's directory. - -NOTE: This command is intended to only be used within text file scripts. - """ - # If arg is None or arg is an empty string this is an error - if not arglist: - self.perror('_relative_load command requires a file path:', traceback_war=False) - return - - file_path = arglist[0].strip() - # NOTE: Relative path is an absolute path, it is just relative to the current script directory - relative_path = os.path.join(self._current_script_dir or '', file_path) - self.do_load(relative_path) - - def do_eos(self, _): - """Handles cleanup when a script has finished executing.""" - if self._script_dir: - self._script_dir.pop() - - @with_argument_list - def do_load(self, arglist): - """Runs commands in script file that is encoded as either ASCII or UTF-8 text. - - Usage: load <file_path> - - * file_path - a file path pointing to a script - -Script should contain one command per line, just like command would be typed in console. - """ - # If arg is None or arg is an empty string this is an error - if not arglist: - self.perror('load command requires a file path:', traceback_war=False) - return - - file_path = arglist[0].strip() - expanded_path = os.path.abspath(os.path.expanduser(file_path)) - - # Make sure expanded_path points to a file - if not os.path.isfile(expanded_path): - self.perror('{} does not exist or is not a file'.format(expanded_path), traceback_war=False) - return - - # Make sure the file is not empty - if os.path.getsize(expanded_path) == 0: - self.perror('{} is empty'.format(expanded_path), traceback_war=False) - return - - # Make sure the file is ASCII or UTF-8 encoded text - if not self.is_text_file(expanded_path): - self.perror('{} is not an ASCII or UTF-8 encoded text file'.format(expanded_path), traceback_war=False) - return - - try: - # Read all lines of the script and insert into the head of the - # command queue. Add an "end of script (eos)" command to cleanup the - # self._script_dir list when done. - with open(expanded_path, encoding='utf-8') as target: - self.cmdqueue = target.read().splitlines() + ['eos'] + self.cmdqueue - except IOError as e: - self.perror('Problem accessing script from {}:\n{}'.format(expanded_path, e)) - return - - self._script_dir.append(os.path.dirname(expanded_path)) - - # Enable tab-completion for load command - def complete_load(self, text, line, begidx, endidx): - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) - - @staticmethod - def is_text_file(file_path): - """ - Returns if a file contains only ASCII or UTF-8 encoded text - :param file_path: path to the file being checked - """ - expanded_path = os.path.abspath(os.path.expanduser(file_path.strip())) - valid_text_file = False - - # Check if the file is ASCII - try: - with codecs.open(expanded_path, encoding='ascii', errors='strict') as f: - # Make sure the file has at least one line of text - # noinspection PyUnusedLocal - if sum(1 for line in f) > 0: - valid_text_file = True - except IOError: - pass - except UnicodeDecodeError: - # The file is not ASCII. Check if it is UTF-8. - try: - with codecs.open(expanded_path, encoding='utf-8', errors='strict') as f: - # Make sure the file has at least one line of text - # noinspection PyUnusedLocal - if sum(1 for line in f) > 0: - valid_text_file = True - except IOError: - pass - except UnicodeDecodeError: - # Not UTF-8 - pass - - return valid_text_file - - def run_transcript_tests(self, callargs): - """Runs transcript tests for provided file(s). - - This is called when either -t is provided on the command line or the transcript_files argument is provided - during construction of the cmd2.Cmd instance. - - :param callargs: List[str] - list of transcript test file names - """ - class TestMyAppCase(Cmd2TestCase): - cmdapp = self - - self.__class__.testfiles = callargs - sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() - testcase = TestMyAppCase() - runner = unittest.TextTestRunner() - runner.run(testcase) - - def cmdloop(self, intro=None): - """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. - - _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with - the following extra features provided by cmd2: - - commands at invocation - - transcript testing - - intro banner - - :param intro: str - if provided this overrides self.intro and serves as the intro banner printed once at start - """ - if self.allow_cli_args: - parser = argparse.ArgumentParser() - parser.add_argument('-t', '--test', action="store_true", - help='Test against transcript(s) in FILE (wildcards OK)') - callopts, callargs = parser.parse_known_args() - - # If transcript testing was called for, use other arguments as transcript files - if callopts.test: - self._transcript_files = callargs - - # If commands were supplied at invocation, then add them to the command queue - if callargs: - self.cmdqueue.extend(callargs) - - # Always run the preloop first - self.preloop() - - # If transcript-based regression testing was requested, then do that instead of the main loop - if self._transcript_files is not None: - self.run_transcript_tests(self._transcript_files) - else: - # If an intro was supplied in the method call, allow it to override the default - if intro is not None: - self.intro = intro - - # Print the intro, if there is one, right after the preloop - if self.intro is not None: - self.poutput(str(self.intro) + "\n") - - # And then call _cmdloop() to enter the main loop - self._cmdloop() - - # Run the postloop() no matter what - self.postloop() - - -# noinspection PyPep8Naming -class ParserManager: - """ - Class which encapsulates all of the pyparsing parser functionality for cmd2 in a single location. - """ - def __init__(self, redirector, terminators, multilineCommands, legalChars, commentGrammars, commentInProgress, - blankLinesAllowed, prefixParser, preparse, postparse, aliases, shortcuts): - """Creates and uses parsers for user input according to app's parameters.""" - - self.commentGrammars = commentGrammars - self.preparse = preparse - self.postparse = postparse - self.aliases = aliases - self.shortcuts = shortcuts - - self.main_parser = self._build_main_parser(redirector=redirector, terminators=terminators, - multilineCommands=multilineCommands, legalChars=legalChars, - commentInProgress=commentInProgress, - blankLinesAllowed=blankLinesAllowed, prefixParser=prefixParser) - self.input_source_parser = self._build_input_source_parser(legalChars=legalChars, - commentInProgress=commentInProgress) - - def _build_main_parser(self, redirector, terminators, multilineCommands, legalChars, commentInProgress, - blankLinesAllowed, prefixParser): - """Builds a PyParsing parser for interpreting user commands.""" - - # Build several parsing components that are eventually compiled into overall parser - output_destination_parser = (pyparsing.Literal(redirector * 2) | - (pyparsing.WordStart() + redirector) | - pyparsing.Regex('[^=]' + redirector))('output') - - terminator_parser = pyparsing.Or( - [(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in terminators])('terminator') - string_end = pyparsing.stringEnd ^ '\nEOF' - multilineCommand = pyparsing.Or( - [pyparsing.Keyword(c, caseless=False) for c in multilineCommands])('multilineCommand') - oneline_command = (~multilineCommand + pyparsing.Word(legalChars))('command') - pipe = pyparsing.Keyword('|', identChars='|') - do_not_parse = self.commentGrammars | commentInProgress | pyparsing.quotedString - after_elements = \ - pyparsing.Optional(pipe + pyparsing.SkipTo(output_destination_parser ^ string_end, - ignore=do_not_parse)('pipeTo')) + \ - pyparsing.Optional(output_destination_parser + - pyparsing.SkipTo(string_end, ignore=do_not_parse). - setParseAction(lambda x: strip_quotes(x[0].strip()))('outputTo')) - - multilineCommand.setParseAction(lambda x: x[0]) - oneline_command.setParseAction(lambda x: x[0]) - - if blankLinesAllowed: - blankLineTerminationParser = pyparsing.NoMatch - else: - blankLineTerminator = (pyparsing.lineEnd + pyparsing.lineEnd)('terminator') - blankLineTerminator.setResultsName('terminator') - blankLineTerminationParser = ((multilineCommand ^ oneline_command) + - pyparsing.SkipTo(blankLineTerminator, ignore=do_not_parse).setParseAction( - lambda x: x[0].strip())('args') + blankLineTerminator)('statement') - - multilineParser = (((multilineCommand ^ oneline_command) + - pyparsing.SkipTo(terminator_parser, - ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('args') + - terminator_parser)('statement') + - pyparsing.SkipTo(output_destination_parser ^ pipe ^ string_end, - ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('suffix') + - after_elements) - multilineParser.ignore(commentInProgress) - - singleLineParser = ((oneline_command + - pyparsing.SkipTo(terminator_parser ^ string_end ^ pipe ^ output_destination_parser, - ignore=do_not_parse).setParseAction( - lambda x: x[0].strip())('args'))('statement') + - pyparsing.Optional(terminator_parser) + after_elements) - - blankLineTerminationParser = blankLineTerminationParser.setResultsName('statement') - - parser = prefixParser + ( - string_end | - multilineParser | - singleLineParser | - blankLineTerminationParser | - multilineCommand + pyparsing.SkipTo(string_end, ignore=do_not_parse) - ) - parser.ignore(self.commentGrammars) - return parser - - @staticmethod - def _build_input_source_parser(legalChars, commentInProgress): - """Builds a PyParsing parser for alternate user input sources (from file, pipe, etc.)""" - - input_mark = pyparsing.Literal('<') - input_mark.setParseAction(lambda x: '') - - # Also allow spaces, slashes, and quotes - file_name = pyparsing.Word(legalChars + ' /\\"\'') - - input_from = file_name('inputFrom') - input_from.setParseAction(replace_with_file_contents) - # a not-entirely-satisfactory way of distinguishing < as in "import from" from < - # as in "lesser than" - inputParser = input_mark + pyparsing.Optional(input_from) + pyparsing.Optional('>') + \ - pyparsing.Optional(file_name) + (pyparsing.stringEnd | '|') - inputParser.ignore(commentInProgress) - return inputParser - - def parsed(self, raw): - """ This function is where the actual parsing of each line occurs. - - :param raw: str - the line of text as it was entered - :return: ParsedString - custom subclass of str with extra attributes - """ - if isinstance(raw, ParsedString): - p = raw - else: - # preparse is an overridable hook; default makes no changes - s = self.preparse(raw) - s = self.input_source_parser.transformString(s.lstrip()) - s = self.commentGrammars.transformString(s) - - # Make a copy of aliases so we can edit it - tmp_aliases = list(self.aliases.keys()) - keep_expanding = len(tmp_aliases) > 0 - - # Expand aliases - while keep_expanding: - for cur_alias in tmp_aliases: - keep_expanding = False - - if s == cur_alias or s.startswith(cur_alias + ' '): - s = s.replace(cur_alias, self.aliases[cur_alias], 1) - - # Do not expand the same alias more than once - tmp_aliases.remove(cur_alias) - keep_expanding = len(tmp_aliases) > 0 - break - - # Expand command shortcut to its full command name - for (shortcut, expansion) in self.shortcuts: - if s.startswith(shortcut): - # If the next character after the shortcut isn't a space, then insert one - shortcut_len = len(shortcut) - if len(s) == shortcut_len or s[shortcut_len] != ' ': - expansion += ' ' - - # Expand the shortcut - s = s.replace(shortcut, expansion, 1) - break - - try: - result = self.main_parser.parseString(s) - except pyparsing.ParseException: - # If we have a parsing failure, treat it is an empty command and move to next prompt - result = self.main_parser.parseString('') - result['raw'] = raw - result['command'] = result.multilineCommand or result.command - result = self.postparse(result) - p = ParsedString(result.args) - p.parsed = result - p.parser = self.parsed - return p - - -class HistoryItem(str): - """Class used to represent an item in the History list. - - Thin wrapper around str class which adds a custom format for printing. It - also keeps track of its index in the list as well as a lowercase - representation of itself for convenience/efficiency. - - """ - listformat = '-------------------------[{}]\n{}\n' - - # noinspection PyUnusedLocal - def __init__(self, instr): - str.__init__(self) - self.lowercase = self.lower() - self.idx = None - - def pr(self): - """Represent a HistoryItem in a pretty fashion suitable for printing. - - :return: str - pretty print string version of a HistoryItem - """ - return self.listformat.format(self.idx, str(self).rstrip()) - - -class History(list): - """ A list of HistoryItems that knows how to respond to user requests. """ - - # noinspection PyMethodMayBeStatic - def _zero_based_index(self, onebased): - result = onebased - if result > 0: - result -= 1 - return result - - def _to_index(self, raw): - if raw: - result = self._zero_based_index(int(raw)) - else: - result = None - return result - - spanpattern = re.compile(r'^\s*(?P<start>-?\d+)?\s*(?P<separator>:|(\.{2,}))?\s*(?P<end>-?\d+)?\s*$') - - def span(self, raw): - """Parses the input string search for a span pattern and if if found, returns a slice from the History list. - - :param raw: str - string potentially containing a span of the forms a..b, a:b, a:, ..b - :return: List[HistoryItem] - slice from the History list - """ - if raw.lower() in ('*', '-', 'all'): - raw = ':' - results = self.spanpattern.search(raw) - if not results: - raise IndexError - if not results.group('separator'): - return [self[self._to_index(results.group('start'))]] - start = self._to_index(results.group('start')) or 0 # Ensure start is not None - end = self._to_index(results.group('end')) - reverse = False - if end is not None: - if end < start: - (start, end) = (end, start) - reverse = True - end += 1 - result = self[start:end] - if reverse: - result.reverse() - return result - - rangePattern = re.compile(r'^\s*(?P<start>[\d]+)?\s*-\s*(?P<end>[\d]+)?\s*$') - - def append(self, new): - """Append a HistoryItem to end of the History list - - :param new: str - command line to convert to HistoryItem and add to the end of the History list - """ - new = HistoryItem(new) - list.append(self, new) - new.idx = len(self) - - def get(self, getme=None): - """Get an item or items from the History list using 1-based indexing. - - :param getme: int or str - item(s) to get - either an integer index or string to search for - :return: List[str] - list of HistoryItems matching the retrieval criteria - """ - if not getme: - return self - try: - getme = int(getme) - if getme < 0: - return self[:(-1 * getme)] - else: - return [self[getme - 1]] - except IndexError: - return [] - except ValueError: - range_result = self.rangePattern.search(getme) - if range_result: - start = range_result.group('start') or None - end = range_result.group('start') or None - if start: - start = int(start) - 1 - if end: - end = int(end) - return self[start:end] - - # noinspection PyUnresolvedReferences - getme = getme.strip() - - if getme.startswith(r'/') and getme.endswith(r'/'): - finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE) - - def isin(hi): - """Listcomp filter function for doing a regular expression search of History. - - :param hi: HistoryItem - :return: bool - True if search matches - """ - return finder.search(hi) - else: - def isin(hi): - """Listcomp filter function for doing a case-insensitive string search of History. - - :param hi: HistoryItem - :return: bool - True if search matches - """ - return getme.lower() in hi.lowercase - return [itm for itm in self if isin(itm)] - - -def cast(current, new): - """Tries to force a new value into the same type as the current when trying to set the value for a parameter. - - :param current: current value for the parameter, type varies - :param new: str - new value - :return: new value with same type as current, or the current value if there was an error casting - """ - typ = type(current) - if typ == bool: - try: - return bool(int(new)) - except (ValueError, TypeError): - pass - try: - new = new.lower() - except AttributeError: - pass - if (new == 'on') or (new[0] in ('y', 't')): - return True - if (new == 'off') or (new[0] in ('n', 'f')): - return False - else: - try: - return typ(new) - except (ValueError, TypeError): - pass - print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new)) - return current - - -class Statekeeper(object): - """Class used to save and restore state during load and py commands as well as when redirecting output or pipes.""" - def __init__(self, obj, attribs): - """Use the instance attributes as a generic key-value store to copy instance attributes from outer object. - - :param obj: instance of cmd2.Cmd derived class (your application instance) - :param attribs: Tuple[str] - tuple of strings listing attributes of obj to save a copy of - """ - self.obj = obj - self.attribs = attribs - if self.obj: - self._save() - - def _save(self): - """Create copies of attributes from self.obj inside this Statekeeper instance.""" - for attrib in self.attribs: - setattr(self, attrib, getattr(self.obj, attrib)) - - def restore(self): - """Overwrite attributes in self.obj with the saved values stored in this Statekeeper instance.""" - if self.obj: - for attrib in self.attribs: - setattr(self.obj, attrib, getattr(self, attrib)) - - -class OutputTrap(object): - """Instantiate an OutputTrap to divert/capture ALL stdout output. For use in transcript testing.""" - - def __init__(self): - self.contents = '' - - def write(self, txt): - """Add text to the internal contents. - - :param txt: str - """ - self.contents += txt - - def read(self): - """Read from the internal contents and then clear them out. - - :return: str - text from the internal contents - """ - result = self.contents - self.contents = '' - return result - - -class Cmd2TestCase(unittest.TestCase): - """Subclass this, setting CmdApp, to make a unittest.TestCase class - that will execute the commands in a transcript file and expect the results shown. - See example.py""" - cmdapp = None - - def fetchTranscripts(self): - self.transcripts = {} - for fileset in self.cmdapp.testfiles: - for fname in glob.glob(fileset): - tfile = open(fname) - self.transcripts[fname] = iter(tfile.readlines()) - tfile.close() - if not len(self.transcripts): - raise Exception("No test files found - nothing to test.") - - def setUp(self): - if self.cmdapp: - self.fetchTranscripts() - - # Trap stdout - self._orig_stdout = self.cmdapp.stdout - self.cmdapp.stdout = OutputTrap() - - def runTest(self): # was testall - if self.cmdapp: - its = sorted(self.transcripts.items()) - for (fname, transcript) in its: - self._test_transcript(fname, transcript) - - def _test_transcript(self, fname, transcript): - line_num = 0 - finished = False - line = strip_ansi(next(transcript)) - line_num += 1 - while not finished: - # Scroll forward to where actual commands begin - while not line.startswith(self.cmdapp.visible_prompt): - try: - line = strip_ansi(next(transcript)) - except StopIteration: - finished = True - break - line_num += 1 - command = [line[len(self.cmdapp.visible_prompt):]] - line = next(transcript) - # Read the entirety of a multi-line command - while line.startswith(self.cmdapp.continuation_prompt): - command.append(line[len(self.cmdapp.continuation_prompt):]) - try: - line = next(transcript) - except StopIteration: - raise (StopIteration, - 'Transcript broke off while reading command beginning at line {} with\n{}'.format(line_num, - command[0]) - ) - line_num += 1 - command = ''.join(command) - # Send the command into the application and capture the resulting output - # TODO: Should we get the return value and act if stop == True? - self.cmdapp.onecmd_plus_hooks(command) - result = self.cmdapp.stdout.read() - # Read the expected result from transcript - if strip_ansi(line).startswith(self.cmdapp.visible_prompt): - message = '\nFile {}, line {}\nCommand was:\n{}\nExpected: (nothing)\nGot:\n{}\n'.format( - fname, line_num, command, result) - self.assert_(not (result.strip()), message) - continue - expected = [] - while not strip_ansi(line).startswith(self.cmdapp.visible_prompt): - expected.append(line) - try: - line = next(transcript) - except StopIteration: - finished = True - break - line_num += 1 - expected = ''.join(expected) - - # transform the expected text into a valid regular expression - expected = self._transform_transcript_expected(expected) - message = '\nFile {}, line {}\nCommand was:\n{}\nExpected:\n{}\nGot:\n{}\n'.format( - fname, line_num, command, expected, result) - self.assertTrue(re.match(expected, result, re.MULTILINE | re.DOTALL), message) - - def _transform_transcript_expected(self, s): - """parse the string with slashed regexes into a valid regex - - Given a string like: - - Match a 10 digit phone number: /\d{3}-\d{3}-\d{4}/ - - Turn it into a valid regular expression which matches the literal text - of the string and the regular expression. We have to remove the slashes - because they differentiate between plain text and a regular expression. - Unless the slashes are escaped, in which case they are interpreted as - plain text, or there is only one slash, which is treated as plain text - also. - - Check the tests in tests/test_transcript.py to see all the edge - cases. - """ - regex = '' - start = 0 - - while True: - (regex, first_slash_pos, start) = self._escaped_find(regex, s, start, False) - if first_slash_pos == -1: - # no more slashes, add the rest of the string and bail - regex += re.escape(s[start:]) - break - else: - # there is a slash, add everything we have found so far - # add stuff before the first slash as plain text - regex += re.escape(s[start:first_slash_pos]) - start = first_slash_pos+1 - # and go find the next one - (regex, second_slash_pos, start) = self._escaped_find(regex, s, start, True) - if second_slash_pos > 0: - # add everything between the slashes (but not the slashes) - # as a regular expression - regex += s[start:second_slash_pos] - # and change where we start looking for slashed on the - # turn through the loop - start = second_slash_pos + 1 - else: - # No closing slash, we have to add the first slash, - # and the rest of the text - regex += re.escape(s[start-1:]) - break - return regex - - @staticmethod - def _escaped_find(regex, s, start, in_regex): - """ - Find the next slash in {s} after {start} that is not preceded by a backslash. - - If we find an escaped slash, add everything up to and including it to regex, - updating {start}. {start} therefore serves two purposes, tells us where to start - looking for the next thing, and also tells us where in {s} we have already - added things to {regex} - - {in_regex} specifies whether we are currently searching in a regex, we behave - differently if we are or if we aren't. - """ - - while True: - pos = s.find('/', start) - if pos == -1: - # no match, return to caller - break - elif pos == 0: - # slash at the beginning of the string, so it can't be - # escaped. We found it. - break - else: - # check if the slash is preceeded by a backslash - if s[pos-1:pos] == '\\': - # it is. - if in_regex: - # add everything up to the backslash as a - # regular expression - regex += s[start:pos-1] - # skip the backslash, and add the slash - regex += s[pos] - else: - # add everything up to the backslash as escaped - # plain text - regex += re.escape(s[start:pos-1]) - # and then add the slash as escaped - # plain text - regex += re.escape(s[pos]) - # update start to show we have handled everything - # before it - start = pos+1 - # and continue to look - else: - # slash is not escaped, this is what we are looking for - break - return regex, pos, start - - def tearDown(self): - if self.cmdapp: - # Restore stdout - self.cmdapp.stdout = self._orig_stdout - - -def namedtuple_with_two_defaults(typename, field_names, default_values=('', '')): - """Wrapper around namedtuple which lets you treat the last value as optional. - - :param typename: str - type name for the Named tuple - :param field_names: List[str] or space-separated string of field names - :param default_values: (optional) 2-element tuple containing the default values for last 2 parameters in named tuple - Defaults to an empty string for both of them - :return: namedtuple type - """ - T = collections.namedtuple(typename, field_names) - # noinspection PyUnresolvedReferences - T.__new__.__defaults__ = default_values - return T - - -class CmdResult(namedtuple_with_two_defaults('CmdResult', ['out', 'err', 'war'])): - """Derive a class to store results from a named tuple so we can tweak dunder methods for convenience. - - This is provided as a convenience and an example for one possible way for end users to store results in - the self._last_result attribute of cmd2.Cmd class instances. See the "python_scripting.py" example for how it can - be used to enable conditional control flow. - - Named tuple attributes - ---------------------- - out - this is intended to store normal output data from the command and can be of any type that makes sense - err: str - (optional) this is intended to store an error message and it being non-empty indicates there was an error - Defaults to an empty string - war: str - (optional) this is intended to store a warning message which isn't quite an error, but of note - Defaults to an empty string. - - NOTE: Named tuples are immutable. So the contents are there for access, not for modification. - """ - def __bool__(self): - """If err is an empty string, treat the result as a success; otherwise treat it as a failure.""" - return not self.err - - -if __name__ == '__main__': - # If run as the main application, simply start a bare-bones cmd2 application with only built-in functionality. - - # Set "use_ipython" to True to include the ipy command if IPython is installed, which supports advanced interactive - # debugging of your application via introspection on self. - app = Cmd(use_ipython=False) - app.cmdloop() |