#!/usr/bin/env python # coding=utf-8 """Variant on standard library's cmd with extra features. To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you were using the standard library's cmd, while enjoying the extra features. Searchable command history (commands: "history") Load commands from file, save to file, edit commands in file Multi-line commands Special-character shortcut commands (beyond cmd's "@" and "!") Settable environment parameters Parsing commands with `argparse` argument parsers (flags) Redirection to file with >, >>; input from file with < Easy transcript-based testing of applications (see examples/example.py) Bash-style ``select`` available Note that redirection with > and | will only work if `self.poutput()` is used in place of `print`. - Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com Git repository on GitHub at https://github.com/python-cmd2/cmd2 """ import argparse import atexit import cmd import codecs import collections import copy import datetime import functools import glob import io from io import StringIO import os import platform import re import shlex import signal import subprocess import sys import tempfile import traceback import unittest from code import InteractiveConsole import pyparsing import pyperclip # Set up readline from .rl_utils import rl_force_redisplay, readline, rl_type, RlType if rl_type == RlType.PYREADLINE: # Save the original pyreadline display completion function since we need to override it and restore it # noinspection PyProtectedMember orig_pyreadline_display = readline.rl.mode._display_completions elif rl_type == RlType.GNU: # We need wcswidth to calculate display width of tab completions from wcwidth import wcswidth # Get the readline lib so we can make changes to it import ctypes from .rl_utils import readline_lib # Save address that rl_basic_quote_characters is pointing to since we need to override and restore it rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters") orig_rl_basic_quote_characters_addr = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value # Newer versions of pyperclip are released as a single file, but older versions had a more complicated structure try: from pyperclip.exceptions import PyperclipException except ImportError: # noinspection PyUnresolvedReferences from pyperclip import PyperclipException # Collection is a container that is sizable and iterable # It was introduced in Python 3.6. We will try to import it, otherwise use our implementation try: from collections.abc import Collection, Iterable except ImportError: from collections.abc import Sized, Iterable, Container # noinspection PyAbstractClass class Collection(Sized, Iterable, Container): __slots__ = () # noinspection PyPep8Naming @classmethod def __subclasshook__(cls, C): if cls is Collection: if any("__len__" in B.__dict__ for B in C.__mro__) and \ any("__iter__" in B.__dict__ for B in C.__mro__) and \ any("__contains__" in B.__dict__ for B in C.__mro__): return True return NotImplemented # Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout if sys.version_info < (3, 5): from contextlib2 import redirect_stdout, redirect_stderr else: from contextlib import redirect_stdout, redirect_stderr # Detect whether IPython is installed to determine if the built-in "ipy" command should be included ipython_available = True try: # noinspection PyUnresolvedReferences,PyPackageRequirements from IPython import embed except ImportError: ipython_available = False __version__ = '0.9.0' # Pyparsing enablePackrat() can greatly speed up parsing, but problems have been seen in Python 3 in the past pyparsing.ParserElement.enablePackrat() # Override the default whitespace chars in Pyparsing so that newlines are not treated as whitespace pyparsing.ParserElement.setDefaultWhitespaceChars(' \t') # The next 2 variables and associated setter functions effect how arguments are parsed for decorated commands # which use one of the decorators: @with_argument_list, @with_argparser, or @with_argparser_and_unknown_args # The defaults are sane and maximize ease of use for new applications based on cmd2. # Use POSIX or Non-POSIX (Windows) rules for splitting a command-line string into a list of arguments via shlex.split() POSIX_SHLEX = False # Strip outer quotes for convenience if POSIX_SHLEX = False STRIP_QUOTES_FOR_NON_POSIX = True # Used for tab completion and word breaks. Do not change. QUOTES = ['"', "'"] REDIRECTION_CHARS = ['|', '<', '>'] # optional attribute, when tagged on a function, allows cmd2 to categorize commands HELP_CATEGORY = 'help_category' HELP_SUMMARY = 'help_summary' def categorize(func, category): """Categorize a function. The help command output will group this function under the specified category heading :param func: Union[Callable, Iterable] - function to categorize :param category: str - category to put it in """ if isinstance(func, Iterable): for item in func: setattr(item, HELP_CATEGORY, category) else: setattr(func, HELP_CATEGORY, category) def set_posix_shlex(val): """ Allows user of cmd2 to choose between POSIX and non-POSIX splitting of args for decorated commands. :param val: bool - True => POSIX, False => Non-POSIX """ global POSIX_SHLEX POSIX_SHLEX = val def set_strip_quotes(val): """ Allows user of cmd2 to choose whether to automatically strip outer-quotes when POSIX_SHLEX is False. :param val: bool - True => strip quotes on args for decorated commands if POSIX_SHLEX is False. """ global STRIP_QUOTES_FOR_NON_POSIX STRIP_QUOTES_FOR_NON_POSIX = val def _which(editor): try: editor_path = subprocess.check_output(['which', editor], stderr=subprocess.STDOUT).strip() editor_path = editor_path.decode() except subprocess.CalledProcessError: editor_path = None return editor_path def strip_quotes(arg): """ Strip outer quotes from a string. Applies to both single and double quotes. :param arg: str - string to strip outer quotes from :return str - same string with potentially outer quotes stripped """ quote_chars = '"' + "'" if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in quote_chars: arg = arg[1:-1] return arg def parse_quoted_string(cmdline): """Parse a quoted string into a list of arguments.""" if isinstance(cmdline, list): # arguments are already a list, return the list we were passed lexed_arglist = cmdline else: # Use shlex to split the command line into a list of arguments based on shell rules lexed_arglist = shlex.split(cmdline, posix=POSIX_SHLEX) # If not using POSIX shlex, make sure to strip off outer quotes for convenience if not POSIX_SHLEX and STRIP_QUOTES_FOR_NON_POSIX: temp_arglist = [] for arg in lexed_arglist: temp_arglist.append(strip_quotes(arg)) lexed_arglist = temp_arglist return lexed_arglist def with_category(category): """A decorator to apply a category to a command function""" def cat_decorator(func): categorize(func, category) return func return cat_decorator def with_argument_list(func): """A decorator to alter the arguments passed to a do_* cmd2 method. Default passes a string of whatever the user typed. With this decorator, the decorated method will receive a list of arguments parsed from user input using shlex.split().""" @functools.wraps(func) def cmd_wrapper(self, cmdline): lexed_arglist = parse_quoted_string(cmdline) return func(self, lexed_arglist) cmd_wrapper.__doc__ = func.__doc__ return cmd_wrapper def with_argparser_and_unknown_args(argparser): """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given instance of argparse.ArgumentParser, but also returning unknown args as a list. :param argparser: argparse.ArgumentParser - given instance of ArgumentParser :return: function that gets passed parsed args and a list of unknown args """ # noinspection PyProtectedMember def arg_decorator(func): @functools.wraps(func) def cmd_wrapper(instance, cmdline): lexed_arglist = parse_quoted_string(cmdline) args, unknown = argparser.parse_known_args(lexed_arglist) return func(instance, args, unknown) # argparser defaults the program name to sys.argv[0] # we want it to be the name of our command argparser.prog = func.__name__[3:] # If the description has not been set, then use the method docstring if one exists if argparser.description is None and func.__doc__: argparser.description = func.__doc__ if func.__doc__: setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__) cmd_wrapper.__doc__ = argparser.format_help() # Mark this function as having an argparse ArgumentParser (used by do_help) cmd_wrapper.__dict__['has_parser'] = True # If there are subcommands, store their names in a list to support tab-completion of subcommand names if argparser._subparsers is not None: # Key is subcommand name and value is completer function subcommands = collections.OrderedDict() # Get all subcommands and check if they have completer functions for name, parser in argparser._subparsers._group_actions[0]._name_parser_map.items(): if 'completer' in parser._defaults: completer = parser._defaults['completer'] else: completer = None subcommands[name] = completer cmd_wrapper.__dict__['subcommands'] = subcommands return cmd_wrapper return arg_decorator def with_argparser(argparser): """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given instance of argparse.ArgumentParser. :param argparser: argparse.ArgumentParser - given instance of ArgumentParser :return: function that gets passed parsed args """ # noinspection PyProtectedMember def arg_decorator(func): @functools.wraps(func) def cmd_wrapper(instance, cmdline): lexed_arglist = parse_quoted_string(cmdline) args = argparser.parse_args(lexed_arglist) return func(instance, args) # argparser defaults the program name to sys.argv[0] # we want it to be the name of our command argparser.prog = func.__name__[3:] # If the description has not been set, then use the method docstring if one exists if argparser.description is None and func.__doc__: argparser.description = func.__doc__ if func.__doc__: setattr(cmd_wrapper, HELP_SUMMARY, func.__doc__) cmd_wrapper.__doc__ = argparser.format_help() # Mark this function as having an argparse ArgumentParser (used by do_help) cmd_wrapper.__dict__['has_parser'] = True # If there are subcommands, store their names in a list to support tab-completion of subcommand names if argparser._subparsers is not None: # Key is subcommand name and value is completer function subcommands = collections.OrderedDict() # Get all subcommands and check if they have completer functions for name, parser in argparser._subparsers._group_actions[0]._name_parser_map.items(): if 'completer' in parser._defaults: completer = parser._defaults['completer'] else: completer = None subcommands[name] = completer cmd_wrapper.__dict__['subcommands'] = subcommands return cmd_wrapper return arg_decorator # Can we access the clipboard? Should always be true on Windows and Mac, but only sometimes on Linux # noinspection PyUnresolvedReferences try: # Get the version of the pyperclip module as a float pyperclip_ver = float('.'.join(pyperclip.__version__.split('.')[:2])) # The extraneous output bug in pyperclip on Linux using xclip was fixed in more recent versions of pyperclip if sys.platform.startswith('linux') and pyperclip_ver < 1.6: # Avoid extraneous output to stderr from xclip when clipboard is empty at cost of overwriting clipboard contents pyperclip.copy('') else: # Try getting the contents of the clipboard _ = pyperclip.paste() except PyperclipException: can_clip = False else: can_clip = True def disable_clip(): """ Allows user of cmd2 to manually disable clipboard cut-and-paste functionality.""" global can_clip can_clip = False def get_paste_buffer(): """Get the contents of the clipboard / paste buffer. :return: str - contents of the clipboard """ pb_str = pyperclip.paste() return pb_str def write_to_paste_buffer(txt): """Copy text to the clipboard / paste buffer. :param txt: str - text to copy to the clipboard """ pyperclip.copy(txt) class ParsedString(str): """Subclass of str which also stores a pyparsing.ParseResults object containing structured parse results.""" # pyarsing.ParseResults - structured parse results, to provide multiple means of access to the parsed data parsed = None # Function which did the parsing parser = None def full_parsed_statement(self): """Used to reconstruct the full parsed statement when a command isn't recognized.""" new = ParsedString('%s %s' % (self.parsed.command, self.parsed.args)) new.parsed = self.parsed new.parser = self.parser return new def replace_with_file_contents(fname): """Action to perform when successfully matching parse element definition for inputFrom parser. :param fname: str - filename :return: str - contents of file "fname" """ try: # Any outer quotes are not part of the filename unquoted_file = strip_quotes(fname[0]) with open(os.path.expanduser(unquoted_file)) as source_file: result = source_file.read() except IOError: result = '< %s' % fname[0] # wasn't a file after all # TODO: IF pyparsing input parser logic gets fixed to support empty file, add support to get from paste buffer return result class EmbeddedConsoleExit(SystemExit): """Custom exception class for use with the py command.""" pass class EmptyStatement(Exception): """Custom exception class for handling behavior when the user just presses .""" pass # Regular expression to match ANSI escape codes ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m') def strip_ansi(text): """Strip ANSI escape codes from a string. :param text: str - a string which may contain ANSI escape codes :return: str - the same string with any ANSI escape codes removed """ return ANSI_ESCAPE_RE.sub('', text) def _pop_readline_history(clear_history=True): """Returns a copy of readline's history and optionally clears it (default)""" # noinspection PyArgumentList history = [ readline.get_history_item(i) for i in range(1, 1 + readline.get_current_history_length()) ] if clear_history: readline.clear_history() return history def _push_readline_history(history, clear_history=True): """Restores readline's history and optionally clears it first (default)""" if clear_history: readline.clear_history() for line in history: readline.add_history(line) def _complete_from_cmd(cmd_obj, text, line, begidx, endidx): """Complete as though the user was typing inside cmd's cmdloop()""" from itertools import takewhile command_subcommand_params = line.split(None, 3) if len(command_subcommand_params) < (3 if text else 2): n = len(command_subcommand_params[0]) n += sum(1 for _ in takewhile(str.isspace, line[n:])) return cmd_obj.completenames(text, line[n:], begidx - n, endidx - n) command, subcommand = command_subcommand_params[:2] n = len(command) + sum(1 for _ in takewhile(str.isspace, line)) cfun = getattr(cmd_obj, 'complete_' + subcommand, cmd_obj.complete) return cfun(text, line[n:], begidx - n, endidx - n) class AddSubmenu(object): """Conveniently add a submenu (Cmd-like class) to a Cmd e.g. given "class SubMenu(Cmd): ..." then @AddSubmenu(SubMenu(), 'sub') class MyCmd(cmd.Cmd): .... will have the following effects: 1. 'sub' will interactively enter the cmdloop of a SubMenu instance 2. 'sub cmd args' will call do_cmd(args) in a SubMenu instance 3. 'sub ... [TAB]' will have the same behavior as [TAB] in a SubMenu cmdloop i.e., autocompletion works the way you think it should 4. 'help sub [cmd]' will print SubMenu's help (calls its do_help()) """ class _Nonexistent(object): """ Used to mark missing attributes. Disable __dict__ creation since this class does nothing """ __slots__ = () # def __init__(self, submenu, command, aliases=(), reformat_prompt="{super_prompt}>> {sub_prompt}", shared_attributes=None, require_predefined_shares=True, create_subclass=False, preserve_shares=False, persistent_history_file=None ): """Set up the class decorator submenu (Cmd): Instance of something cmd.Cmd-like command (str): The command the user types to access the SubMenu instance aliases (iterable): More commands that will behave like "command" reformat_prompt (str): Format str or None to disable if it's a string, it should contain one or more of: {super_prompt}: The current cmd's prompt {command}: The command in the current cmd with which it was called {sub_prompt}: The subordinate cmd's original prompt the default is "{super_prompt}{command} {sub_prompt}" shared_attributes (dict): dict of the form {'subordinate_attr': 'parent_attr'} the attributes are copied to the submenu at the last moment; the submenu's attributes are backed up before this and restored afterward require_predefined_shares: The shared attributes above must be independently defined in the subordinate Cmd (default: True) create_subclass: put the modifications in a subclass rather than modifying the existing class (default: False) """ self.submenu = submenu self.command = command self.aliases = aliases if persistent_history_file: self.persistent_history_file = os.path.expanduser(persistent_history_file) else: self.persistent_history_file = None if reformat_prompt is not None and not isinstance(reformat_prompt, str): raise ValueError("reformat_prompt should be either a format string or None") self.reformat_prompt = reformat_prompt self.shared_attributes = {} if shared_attributes is None else shared_attributes if require_predefined_shares: for attr in self.shared_attributes.keys(): if not hasattr(submenu, attr): raise AttributeError("The shared attribute '{attr}' is not defined in {cmd}. Either define {attr} " "in {cmd} or set require_predefined_shares=False." .format(cmd=submenu.__class__.__name__, attr=attr)) self.create_subclass = create_subclass self.preserve_shares = preserve_shares def _get_original_attributes(self): return { attr: getattr(self.submenu, attr, AddSubmenu._Nonexistent) for attr in self.shared_attributes.keys() } def _copy_in_shared_attrs(self, parent_cmd): for sub_attr, par_attr in self.shared_attributes.items(): setattr(self.submenu, sub_attr, getattr(parent_cmd, par_attr)) def _copy_out_shared_attrs(self, parent_cmd, original_attributes): if self.preserve_shares: for sub_attr, par_attr in self.shared_attributes.items(): setattr(parent_cmd, par_attr, getattr(self.submenu, sub_attr)) else: for attr, value in original_attributes.items(): if attr is not AddSubmenu._Nonexistent: setattr(self.submenu, attr, value) else: delattr(self.submenu, attr) def __call__(self, cmd_obj): """Creates a subclass of Cmd wherein the given submenu can be accessed via the given command""" def enter_submenu(parent_cmd, line): """ This function will be bound to do_ and will change the scope of the CLI to that of the submenu. """ submenu = self.submenu original_attributes = self._get_original_attributes() history = _pop_readline_history() if self.persistent_history_file: try: readline.read_history_file(self.persistent_history_file) except FileNotFoundError: pass try: # copy over any shared attributes self._copy_in_shared_attrs(parent_cmd) if line.parsed.args: # Remove the menu argument and execute the command in the submenu line = submenu.parser_manager.parsed(line.parsed.args) submenu.precmd(line) ret = submenu.onecmd(line) submenu.postcmd(ret, line) else: if self.reformat_prompt is not None: prompt = submenu.prompt submenu.prompt = self.reformat_prompt.format( super_prompt=parent_cmd.prompt, command=self.command, sub_prompt=prompt, ) submenu.cmdloop() if self.reformat_prompt is not None: # noinspection PyUnboundLocalVariable self.submenu.prompt = prompt finally: # copy back original attributes self._copy_out_shared_attrs(parent_cmd, original_attributes) # write submenu history if self.persistent_history_file: readline.write_history_file(self.persistent_history_file) # reset main app history before exit _push_readline_history(history) def complete_submenu(_self, text, line, begidx, endidx): """ This function will be bound to complete_ and will perform the complete commands of the submenu. """ submenu = self.submenu original_attributes = self._get_original_attributes() try: # copy over any shared attributes self._copy_in_shared_attrs(_self) return _complete_from_cmd(submenu, text, line, begidx, endidx) finally: # copy back original attributes self._copy_out_shared_attrs(_self, original_attributes) original_do_help = cmd_obj.do_help original_complete_help = cmd_obj.complete_help def help_submenu(_self, line): """ This function will be bound to help_ and will call the help commands of the submenu. """ tokens = line.split(None, 1) if tokens and (tokens[0] == self.command or tokens[0] in self.aliases): self.submenu.do_help(tokens[1] if len(tokens) == 2 else '') else: original_do_help(_self, line) def _complete_submenu_help(_self, text, line, begidx, endidx): """autocomplete to match help_submenu()'s behavior""" tokens = line.split(None, 1) if len(tokens) == 2 and ( not (not tokens[1].startswith(self.command) and not any( tokens[1].startswith(alias) for alias in self.aliases)) ): return self.submenu.complete_help( text, tokens[1], begidx - line.index(tokens[1]), endidx - line.index(tokens[1]), ) else: return original_complete_help(_self, text, line, begidx, endidx) if self.create_subclass: class _Cmd(cmd_obj): do_help = help_submenu complete_help = _complete_submenu_help else: _Cmd = cmd_obj _Cmd.do_help = help_submenu _Cmd.complete_help = _complete_submenu_help # Create bindings in the parent command to the submenus commands. setattr(_Cmd, 'do_' + self.command, enter_submenu) setattr(_Cmd, 'complete_' + self.command, complete_submenu) # Create additional bindings for aliases for _alias in self.aliases: setattr(_Cmd, 'do_' + _alias, enter_submenu) setattr(_Cmd, 'complete_' + _alias, complete_submenu) return _Cmd class Cmd(cmd.Cmd): """An easy but powerful framework for writing line-oriented command interpreters. Extends the Python Standard Library’s cmd package by adding a lot of useful features to the out of the box configuration. Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes. """ # Attributes used to configure the ParserManager (all are not dynamically settable at runtime) blankLinesAllowed = False commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment]) commentInProgress = pyparsing.Literal('/*') + pyparsing.SkipTo(pyparsing.stringEnd ^ '*/') legalChars = u'!#$%.:?@_-' + pyparsing.alphanums + pyparsing.alphas8bit multilineCommands = [] prefixParser = pyparsing.Empty() redirector = '>' # for sending output to file shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} aliases = dict() terminators = [';'] # make sure your terminators are not in legalChars! # Attributes which are NOT dynamically settable at runtime allow_cli_args = True # Should arguments passed on the command-line be processed as commands? allow_redirection = True # Should output redirection and pipes be allowed default_to_shell = False # Attempt to run unrecognized commands as shell commands quit_on_sigint = False # Quit the loop on interrupt instead of just resetting prompt reserved_words = [] # Attributes which ARE dynamically settable at runtime colors = (platform.system() != 'Windows') continuation_prompt = '> ' debug = False echo = False editor = os.environ.get('EDITOR') if not editor: if sys.platform[:3] == 'win': editor = 'notepad' else: # Favor command-line editors first so we don't leave the terminal to edit for editor in ['vim', 'vi', 'emacs', 'nano', 'pico', 'gedit', 'kate', 'subl', 'geany', 'atom']: if _which(editor): break feedback_to_output = False # Do not include nonessentials in >, | output by default (things like timing) locals_in_py = True quiet = False # Do not suppress nonessential output timing = False # Prints elapsed time for each command # To make an attribute settable with the "do_set" command, add it to this ... # This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key settable = {'colors': 'Colorized output (*nix only)', 'continuation_prompt': 'On 2nd+ line of input', 'debug': 'Show full error stack on error', 'echo': 'Echo command issued into output', 'editor': 'Program used by ``edit``', 'feedback_to_output': 'Include nonessentials in `|`, `>` results', 'locals_in_py': 'Allow access to your application in py via self', 'prompt': 'The prompt issued to solicit input', 'quiet': "Don't print nonessential feedback", 'timing': 'Report execution times'} def __init__(self, completekey='tab', stdin=None, stdout=None, persistent_history_file='', persistent_history_length=1000, startup_script=None, use_ipython=False, transcript_files=None): """An easy but powerful framework for writing line-oriented command interpreters, extends Python's cmd package. :param completekey: str - (optional) readline name of a completion key, default to Tab :param stdin: (optional) alternate input file object, if not specified, sys.stdin is used :param stdout: (optional) alternate output file object, if not specified, sys.stdout is used :param persistent_history_file: str - (optional) file path to load a persistent readline history from :param persistent_history_length: int - (optional) max number of lines which will be written to the history file :param startup_script: str - (optional) file path to a a script to load and execute at startup :param use_ipython: (optional) should the "ipy" command be included for an embedded IPython shell :param transcript_files: str - (optional) allows running transcript tests when allow_cli_args is False """ # If use_ipython is False, make sure the do_ipy() method doesn't exit if not use_ipython: try: del Cmd.do_ipy except AttributeError: pass # If persistent readline history is enabled, then read history from file and register to write to file at exit if persistent_history_file: persistent_history_file = os.path.expanduser(persistent_history_file) try: readline.read_history_file(persistent_history_file) # default history len is -1 (infinite), which may grow unruly readline.set_history_length(persistent_history_length) except FileNotFoundError: pass atexit.register(readline.write_history_file, persistent_history_file) # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) # Commands to exclude from the help menu and tab completion self.hidden_commands = ['eof', 'eos', '_relative_load'] # Commands to exclude from the history command self.exclude_from_history = '''history edit eof eos'''.split() self._finalize_app_parameters() self.initial_stdout = sys.stdout self.history = History() self.pystate = {} self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')] self.parser_manager = ParserManager(redirector=self.redirector, terminators=self.terminators, multilineCommands=self.multilineCommands, legalChars=self.legalChars, commentGrammars=self.commentGrammars, commentInProgress=self.commentInProgress, blankLinesAllowed=self.blankLinesAllowed, prefixParser=self.prefixParser, preparse=self.preparse, postparse=self.postparse, aliases=self.aliases, shortcuts=self.shortcuts) self._transcript_files = transcript_files # Used to enable the ability for a Python script to quit the application self._should_quit = False # True if running inside a Python script or interactive console, False otherwise self._in_py = False # Stores results from the last command run to enable usage of results in a Python script or interactive console # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience. self._last_result = None # Used to save state during a redirection self.kept_state = None self.kept_sys = None # Codes used for exit conditions self._STOP_AND_EXIT = True # cmd convention self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'}, 'cyan': {True: '\x1b[36m', False: '\x1b[39m'}, 'blue': {True: '\x1b[34m', False: '\x1b[39m'}, 'red': {True: '\x1b[31m', False: '\x1b[39m'}, 'magenta': {True: '\x1b[35m', False: '\x1b[39m'}, 'green': {True: '\x1b[32m', False: '\x1b[39m'}, 'underline': {True: '\x1b[4m', False: '\x1b[24m'}, 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}} # Used load command to store the current script dir as a LIFO queue to support _relative_load command self._script_dir = [] # Used when piping command output to a shell command self.pipe_proc = None # Used by complete() for readline tab completion self.completion_matches = [] # Used to keep track of whether we are redirecting or piping output self.redirecting = False # If this string is non-empty, then this warning message will print if a broken pipe error occurs while printing self.broken_pipe_warning = '' # If a startup script is provided, then add it in the queue to load if startup_script is not None: startup_script = os.path.expanduser(startup_script) if os.path.exists(startup_script) and os.path.getsize(startup_script) > 0: self.cmdqueue.append('load {}'.format(startup_script)) ############################################################################################################ # The following variables are used by tab-completion functions. They are reset each time complete() is run # using set_completion_defaults() and it is up to completer functions to set them before returning results. ############################################################################################################ # If true and a single match is returned to complete(), then a space will be appended # if the match appears at the end of the line self.allow_appended_space = True # If true and a single match is returned to complete(), then a closing quote # will be added if there is an unmatched opening quote self.allow_closing_quote = True # Use this list if you are completing strings that contain a common delimiter and you only want to # display the final portion of the matches as the tab-completion suggestions. The full matches # still must be returned from your completer function. For an example, look at path_complete() # which uses this to show only the basename of paths as the suggestions. delimiter_complete() also # populates this list. self.display_matches = [] # ----- Methods related to presenting output to the user ----- @property def visible_prompt(self): """Read-only property to get the visible prompt with any ANSI escape codes stripped. Used by transcript testing to make it easier and more reliable when users are doing things like coloring the prompt using ANSI color codes. :return: str - prompt stripped of any ANSI escape codes """ return strip_ansi(self.prompt) def _finalize_app_parameters(self): self.commentGrammars.ignore(pyparsing.quotedString).setParseAction(lambda x: '') # noinspection PyUnresolvedReferences self.shortcuts = sorted(self.shortcuts.items(), reverse=True) # Make sure settable parameters are sorted alphabetically by key self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0])) def poutput(self, msg, end='\n'): """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present. Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and that process terminates before the cmd2 command is finished executing. :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK :param end: str - string appended after the end of the message if not already present, default a newline """ if msg is not None and msg != '': try: msg_str = '{}'.format(msg) self.stdout.write(msg_str) if not msg_str.endswith(end): self.stdout.write(end) except BrokenPipeError: # This occurs if a command's output is being piped to another process and that process closes before the # command is finished. If you would like your application to print a warning message, then set the # broken_pipe_warning attribute to the message you want printed. if self.broken_pipe_warning: sys.stderr.write(self.broken_pipe_warning) def perror(self, errmsg, exception_type=None, traceback_war=True): """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists. :param errmsg: str - error message to print out :param exception_type: str - (optional) type of exception which precipitated this error message :param traceback_war: bool - (optional) if True, print a message to let user know they can enable debug :return: """ if self.debug: traceback.print_exc() if exception_type is None: err = self.colorize("ERROR: {}\n".format(errmsg), 'red') sys.stderr.write(err) else: err = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(exception_type, errmsg) sys.stderr.write(self.colorize(err, 'red')) if traceback_war: war = "To enable full traceback, run the following command: 'set debug true'\n" sys.stderr.write(self.colorize(war, 'yellow')) def pfeedback(self, msg): """For printing nonessential feedback. Can be silenced with `quiet`. Inclusion in redirected output is controlled by `feedback_to_output`.""" if not self.quiet: if self.feedback_to_output: self.poutput(msg) else: sys.stderr.write("{}\n".format(msg)) def ppaged(self, msg, end='\n'): """Print output using a pager if it would go off screen and stdout isn't currently being redirected. Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when stdout or stdin are not a fully functional terminal. :param msg: str - message to print to current stdout - anything convertible to a str with '{}'.format() is OK :param end: str - string appended after the end of the message if not already present, default a newline """ if msg is not None and msg != '': try: msg_str = '{}'.format(msg) if not msg_str.endswith(end): msg_str += end # Attempt to detect if we are not running within a fully functional terminal. # Don't try to use the pager when being run by a continuous integration system like Jenkins + pexpect. functional_terminal = False if self.stdin.isatty() and self.stdout.isatty(): if sys.platform.startswith('win') or os.environ.get('TERM') is not None: functional_terminal = True # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) # Also only attempt to use a pager if actually running in a real fully functional terminal if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir: if sys.platform.startswith('win'): pager_cmd = 'more' else: # Here is the meaning of the various flags we are using with the less command: # -S causes lines longer than the screen width to be chopped (truncated) rather than wrapped # -R causes ANSI "color" escape sequences to be output in raw form (i.e. colors are displayed) # -X disables sending the termcap initialization and deinitialization strings to the terminal # -F causes less to automatically exit if the entire file can be displayed on the first screen pager_cmd = 'less -SRXF' self.pipe_proc = subprocess.Popen(pager_cmd, shell=True, stdin=subprocess.PIPE) try: self.pipe_proc.stdin.write(msg_str.encode('utf-8', 'replace')) self.pipe_proc.stdin.close() except (IOError, KeyboardInterrupt): pass # Less doesn't respect ^C, but catches it for its own UI purposes (aborting search etc. inside less) while True: try: self.pipe_proc.wait() except KeyboardInterrupt: pass else: break self.pipe_proc = None else: self.stdout.write(msg_str) except BrokenPipeError: # This occurs if a command's output is being piped to another process and that process closes before the # command is finished. If you would like your application to print a warning message, then set the # broken_pipe_warning attribute to the message you want printed. if self.broken_pipe_warning: sys.stderr.write(self.broken_pipe_warning) def colorize(self, val, color): """Given a string (``val``), returns that string wrapped in UNIX-style special characters that turn on (and then off) text color and style. If the ``colors`` environment parameter is ``False``, or the application is running on Windows, will return ``val`` unchanged. ``color`` should be one of the supported strings (or styles): red/blue/green/cyan/magenta, bold, underline""" if self.colors and (self.stdout == self.initial_stdout): return self._colorcodes[color][True] + val + self._colorcodes[color][False] return val def get_subcommands(self, command): """ Returns a list of a command's subcommand names if they exist :param command: the command we are querying :return: A subcommand list or None """ subcommand_names = None # Check if is a valid command funcname = self._func_named(command) if funcname: # Check to see if this function was decorated with an argparse ArgumentParser func = getattr(self, funcname) subcommands = func.__dict__.get('subcommands', None) if subcommands is not None: subcommand_names = subcommands.keys() return subcommand_names def get_subcommand_completer(self, command, subcommand): """ Returns a subcommand's tab completion function if one exists :param command: command which owns the subcommand :param subcommand: the subcommand we are querying :return: A completer or None """ completer = None # Check if is a valid command funcname = self._func_named(command) if funcname: # Check to see if this function was decorated with an argparse ArgumentParser func = getattr(self, funcname) subcommands = func.__dict__.get('subcommands', None) if subcommands is not None: completer = subcommands[subcommand] return completer # ----- Methods related to tab completion ----- def set_completion_defaults(self): """ Resets tab completion settings Needs to be called each time readline runs tab completion """ self.allow_appended_space = True self.allow_closing_quote = True self.display_matches = [] def tokens_for_completion(self, line, begidx, endidx): """ Used by tab completion functions to get all tokens through the one being completed :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :return: A 2 item tuple where the items are On Success tokens: list of unquoted tokens this is generally the list needed for tab completion functions raw_tokens: list of tokens with any quotes preserved this can be used to know if a token was quoted or is missing a closing quote Both lists are guaranteed to have at least 1 item The last item in both lists is the token being tab completed On Failure Both items are None """ unclosed_quote = '' quotes_to_try = copy.copy(QUOTES) tmp_line = line[:endidx] tmp_endidx = endidx # Parse the line into tokens while True: try: # Use non-POSIX parsing to keep the quotes around the tokens initial_tokens = shlex.split(tmp_line[:tmp_endidx], posix=False) # If the cursor is at an empty token outside of a quoted string, # then that is the token being completed. Add it to the list. if not unclosed_quote and begidx == tmp_endidx: initial_tokens.append('') break except ValueError: # ValueError can be caused by missing closing quote if not quotes_to_try: # Since we have no more quotes to try, something else # is causing the parsing error. Return None since # this means the line is malformed. return None, None # Add a closing quote and try to parse again unclosed_quote = quotes_to_try[0] quotes_to_try = quotes_to_try[1:] tmp_line = line[:endidx] tmp_line += unclosed_quote tmp_endidx = endidx + 1 if self.allow_redirection: # Since redirection is enabled, we need to treat redirection characters (|, <, >) # as word breaks when they are in unquoted strings. Go through each token # and further split them on these characters. Each run of redirect characters # is treated as a single token. raw_tokens = [] for cur_initial_token in initial_tokens: # Save tokens up to 1 character in length or quoted tokens. No need to parse these. if len(cur_initial_token) <= 1 or cur_initial_token[0] in QUOTES: raw_tokens.append(cur_initial_token) continue # Iterate over each character in this token cur_index = 0 cur_char = cur_initial_token[cur_index] # Keep track of the token we are building cur_raw_token = '' while True: if cur_char not in REDIRECTION_CHARS: # Keep appending to cur_raw_token until we hit a redirect char while cur_char not in REDIRECTION_CHARS: cur_raw_token += cur_char cur_index += 1 if cur_index < len(cur_initial_token): cur_char = cur_initial_token[cur_index] else: break else: redirect_char = cur_char # Keep appending to cur_raw_token until we hit something other than redirect_char while cur_char == redirect_char: cur_raw_token += cur_char cur_index += 1 if cur_index < len(cur_initial_token): cur_char = cur_initial_token[cur_index] else: break # Save the current token raw_tokens.append(cur_raw_token) cur_raw_token = '' # Check if we've viewed all characters if cur_index >= len(cur_initial_token): break else: raw_tokens = initial_tokens # Save the unquoted tokens tokens = [strip_quotes(cur_token) for cur_token in raw_tokens] # If the token being completed had an unclosed quote, we need # to remove the closing quote that was added in order for it # to match what was on the command line. if unclosed_quote: raw_tokens[-1] = raw_tokens[-1][:-1] return tokens, raw_tokens # noinspection PyUnusedLocal @staticmethod def basic_complete(text, line, begidx, endidx, match_against): """ Performs tab completion against a list :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param match_against: Collection - the list being matched against :return: List[str] - a list of possible tab completions """ return [cur_match for cur_match in match_against if cur_match.startswith(text)] def delimiter_complete(self, text, line, begidx, endidx, match_against, delimiter): """ Performs tab completion against a list but each match is split on a delimiter and only the portion of the match being tab completed is shown as the completion suggestions. This is useful if you match against strings that are hierarchical in nature and have a common delimiter. An easy way to illustrate this concept is path completion since paths are just directories/files delimited by a slash. If you are tab completing items in /home/user you don't get the following as suggestions: /home/user/file.txt /home/user/program.c /home/user/maps/ /home/user/cmd2.py Instead you are shown: file.txt program.c maps/ cmd2.py For a large set of data, this can be visually more pleasing and easier to search. Another example would be strings formatted with the following syntax: company::department::name In this case the delimiter would be :: and the user could easily narrow down what they are looking for if they were only shown suggestions in the category they are at in the string. :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param match_against: Collection - the list being matched against :param delimiter: str - what delimits each portion of the matches (ex: paths are delimited by a slash) :return: List[str] - a list of possible tab completions """ matches = self.basic_complete(text, line, begidx, endidx, match_against) # Display only the portion of the match that's being completed based on delimiter if matches: # Get the common beginning for the matches common_prefix = os.path.commonprefix(matches) prefix_tokens = common_prefix.split(delimiter) # Calculate what portion of the match we are completing display_token_index = 0 if prefix_tokens: display_token_index = len(prefix_tokens) - 1 # Get this portion for each match and store them in self.display_matches for cur_match in matches: match_tokens = cur_match.split(delimiter) display_token = match_tokens[display_token_index] if not display_token: display_token = delimiter self.display_matches.append(display_token) return matches def flag_based_complete(self, text, line, begidx, endidx, flag_dict, all_else=None): """ Tab completes based on a particular flag preceding the token being completed :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param flag_dict: dict - dictionary whose structure is the following: keys - flags (ex: -c, --create) that result in tab completion for the next argument in the command line values - there are two types of values 1. iterable list of strings to match against (dictionaries, lists, etc.) 2. function that performs tab completion (ex: path_complete) :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded by a flag in flag_dict :return: List[str] - a list of possible tab completions """ # Get all tokens through the one being completed tokens, _ = self.tokens_for_completion(line, begidx, endidx) if tokens is None: return [] completions_matches = [] match_against = all_else # Must have at least 2 args for a flag to precede the token being completed if len(tokens) > 1: flag = tokens[-2] if flag in flag_dict: match_against = flag_dict[flag] # Perform tab completion using a Collection if isinstance(match_against, Collection): completions_matches = self.basic_complete(text, line, begidx, endidx, match_against) # Perform tab completion using a function elif callable(match_against): completions_matches = match_against(text, line, begidx, endidx) return completions_matches def index_based_complete(self, text, line, begidx, endidx, index_dict, all_else=None): """ Tab completes based on a fixed position in the input string :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param index_dict: dict - dictionary whose structure is the following: keys - 0-based token indexes into command line that determine which tokens perform tab completion values - there are two types of values 1. iterable list of strings to match against (dictionaries, lists, etc.) 2. function that performs tab completion (ex: path_complete) :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an index in index_dict :return: List[str] - a list of possible tab completions """ # Get all tokens through the one being completed tokens, _ = self.tokens_for_completion(line, begidx, endidx) if tokens is None: return [] matches = [] # Get the index of the token being completed index = len(tokens) - 1 # Check if token is at an index in the dictionary if index in index_dict: match_against = index_dict[index] else: match_against = all_else # Perform tab completion using a Collection if isinstance(match_against, Collection): matches = self.basic_complete(text, line, begidx, endidx, match_against) # Perform tab completion using a function elif callable(match_against): matches = match_against(text, line, begidx, endidx) return matches # noinspection PyUnusedLocal def path_complete(self, text, line, begidx, endidx, dir_exe_only=False, dir_only=False): """Performs completion of local file system paths :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param dir_exe_only: bool - only return directories and executables, not non-executable files :param dir_only: bool - only return directories :return: List[str] - a list of possible tab completions """ # Used to complete ~ and ~user strings def complete_users(): # We are returning ~user strings that resolve to directories, # so don't append a space or quote in the case of a single result. self.allow_appended_space = False self.allow_closing_quote = False users = [] # Windows lacks the pwd module so we can't get a list of users. # Instead we will add a slash once the user enters text that # resolves to an existing home directory. if sys.platform.startswith('win'): expanded_path = os.path.expanduser(text) if os.path.isdir(expanded_path): users.append(text + os.path.sep) else: import pwd # Iterate through a list of users from the password database for cur_pw in pwd.getpwall(): # Check if the user has an existing home dir if os.path.isdir(cur_pw.pw_dir): # Add a ~ to the user to match against text cur_user = '~' + cur_pw.pw_name if cur_user.startswith(text): if add_trailing_sep_if_dir: cur_user += os.path.sep users.append(cur_user) return users # Determine if a trailing separator should be appended to directory completions add_trailing_sep_if_dir = False if endidx == len(line) or (endidx < len(line) and line[endidx] != os.path.sep): add_trailing_sep_if_dir = True # Used to replace cwd in the final results cwd = os.getcwd() cwd_added = False # Used to replace expanded user path in final result orig_tilde_path = '' expanded_tilde_path = '' # If the search text is blank, then search in the CWD for * if not text: search_str = os.path.join(os.getcwd(), '*') cwd_added = True else: # Purposely don't match any path containing wildcards - what we are doing is complicated enough! wildcards = ['*', '?'] for wildcard in wildcards: if wildcard in text: return [] # Start the search string search_str = text + '*' # Handle tilde expansion and completion if text.startswith('~'): sep_index = text.find(os.path.sep, 1) # If there is no slash, then the user is still completing the user after the tilde if sep_index == -1: return complete_users() # Otherwise expand the user dir else: search_str = os.path.expanduser(search_str) # Get what we need to restore the original tilde path later orig_tilde_path = text[:sep_index] expanded_tilde_path = os.path.expanduser(orig_tilde_path) # If the search text does not have a directory, then use the cwd elif not os.path.dirname(text): search_str = os.path.join(os.getcwd(), search_str) cwd_added = True # Find all matching path completions matches = glob.glob(search_str) # Filter based on type if dir_exe_only: matches = [c for c in matches if os.path.isdir(c) or os.access(c, os.X_OK)] elif dir_only: matches = [c for c in matches if os.path.isdir(c)] # Don't append a space or closing quote to directory if len(matches) == 1 and os.path.isdir(matches[0]): self.allow_appended_space = False self.allow_closing_quote = False # Build display_matches and add a slash to directories for index, cur_match in enumerate(matches): # Display only the basename of this path in the tab-completion suggestions self.display_matches.append(os.path.basename(cur_match)) # Add a separator after directories if the next character isn't already a separator if os.path.isdir(cur_match) and add_trailing_sep_if_dir: matches[index] += os.path.sep self.display_matches[index] += os.path.sep # Remove cwd if it was added to match the text readline expects if cwd_added: matches = [cur_path.replace(cwd + os.path.sep, '', 1) for cur_path in matches] # Restore the tilde string if we expanded one to match the text readline expects if expanded_tilde_path: matches = [cur_path.replace(expanded_tilde_path, orig_tilde_path, 1) for cur_path in matches] return matches @staticmethod def get_exes_in_path(starts_with): """ Returns names of executables in a user's path :param starts_with: str - what the exes should start with. leave blank for all exes in path. :return: List[str] - a list of matching exe names """ # Purposely don't match any executable containing wildcards wildcards = ['*', '?'] for wildcard in wildcards: if wildcard in starts_with: return [] # Get a list of every directory in the PATH environment variable and ignore symbolic links paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)] # Use a set to store exe names since there can be duplicates exes_set = set() # Find every executable file in the user's path that matches the pattern for path in paths: full_path = os.path.join(path, starts_with) matches = [f for f in glob.glob(full_path + '*') if os.path.isfile(f) and os.access(f, os.X_OK)] for match in matches: exes_set.add(os.path.basename(match)) return list(exes_set) def shell_cmd_complete(self, text, line, begidx, endidx, complete_blank=False): """Performs completion of executables either in a user's path or a given path :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param complete_blank: bool - If True, then a blank will complete all shell commands in a user's path If False, then no completion is performed Defaults to False to match Bash shell behavior :return: List[str] - a list of possible tab completions """ # Don't tab complete anything if no shell command has been started if not complete_blank and not text: return [] # If there are no path characters in the search text, then do shell command completion in the user's path if not text.startswith('~') and os.path.sep not in text: return self.get_exes_in_path(text) # Otherwise look for executables in the given path else: return self.path_complete(text, line, begidx, endidx, dir_exe_only=True) def _redirect_complete(self, text, line, begidx, endidx, compfunc): """ Called by complete() as the first tab completion function for all commands It determines if it should tab complete for redirection (|, <, >, >>) or use the completer function for the current command :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :param compfunc: Callable - the completer function for the current command this will be called if we aren't completing for redirection :return: List[str] - a list of possible tab completions """ if self.allow_redirection: # Get all tokens through the one being completed. We want the raw tokens # so we can tell if redirection strings are quoted and ignore them. _, raw_tokens = self.tokens_for_completion(line, begidx, endidx) if raw_tokens is None: return [] if len(raw_tokens) > 1: # Build a list of all redirection tokens all_redirects = REDIRECTION_CHARS + ['>>'] # Check if there are redirection strings prior to the token being completed seen_pipe = False has_redirection = False for cur_token in raw_tokens[:-1]: if cur_token in all_redirects: has_redirection = True if cur_token == '|': seen_pipe = True # Get token prior to the one being completed prior_token = raw_tokens[-2] # If a pipe is right before the token being completed, complete a shell command as the piped process if prior_token == '|': return self.shell_cmd_complete(text, line, begidx, endidx) # Otherwise do path completion either as files to redirectors or arguments to the piped process elif prior_token in all_redirects or seen_pipe: return self.path_complete(text, line, begidx, endidx) # If there were redirection strings anywhere on the command line, then we # are no longer tab completing for the current command elif has_redirection: return [] # Call the command's completer function return compfunc(text, line, begidx, endidx) @staticmethod def _pad_matches_to_display(matches_to_display): """ Adds padding to the matches being displayed as tab completion suggestions. The default padding of readline/pyreadine is small and not visually appealing especially if matches have spaces. It appears very squished together. :param matches_to_display: the matches being padded :return: the padded matches and length of padding that was added """ if rl_type == RlType.GNU: # Add 2 to the padding of 2 that readline uses for a total of 4. padding = 2 * ' ' elif rl_type == RlType.PYREADLINE: # Add 3 to the padding of 1 that pyreadline uses for a total of 4. padding = 3 * ' ' else: return matches_to_display, 0 return [cur_match + padding for cur_match in matches_to_display], len(padding) def _display_matches_gnu_readline(self, substitution, matches, longest_match_length): """ Prints a match list using GNU readline's rl_display_match_list() This exists to print self.display_matches if it has data. Otherwise matches prints. :param substitution: str - the substitution written to the command line :param matches: list[str] - the tab completion matches to display :param longest_match_length: int - longest printed length of the matches """ if rl_type == RlType.GNU: # Check if we should show display_matches if self.display_matches: matches_to_display = self.display_matches # Recalculate longest_match_length for display_matches longest_match_length = 0 for cur_match in matches_to_display: cur_length = wcswidth(cur_match) if cur_length > longest_match_length: longest_match_length = cur_length else: matches_to_display = matches # Add padding for visual appeal matches_to_display, padding_length = self._pad_matches_to_display(matches_to_display) longest_match_length += padding_length # We will use readline's display function (rl_display_match_list()), so we # need to encode our string as bytes to place in a C array. encoded_substitution = bytes(substitution, encoding='utf-8') encoded_matches = [bytes(cur_match, encoding='utf-8') for cur_match in matches_to_display] # rl_display_match_list() expects matches to be in argv format where # substitution is the first element, followed by the matches, and then a NULL. # noinspection PyCallingNonCallable,PyTypeChecker strings_array = (ctypes.c_char_p * (1 + len(encoded_matches) + 1))() # Copy in the encoded strings and add a NULL to the end strings_array[0] = encoded_substitution strings_array[1:-1] = encoded_matches strings_array[-1] = None # Call readline's display function # rl_display_match_list(strings_array, number of completion matches, longest match length) readline_lib.rl_display_match_list(strings_array, len(encoded_matches), longest_match_length) # Redraw prompt and input line rl_force_redisplay() def _display_matches_pyreadline(self, matches): """ Prints a match list using pyreadline's _display_completions() This exists to print self.display_matches if it has data. Otherwise matches prints. :param matches: list[str] - the tab completion matches to display """ if rl_type == RlType.PYREADLINE: # Check if we should show display_matches if self.display_matches: matches_to_display = self.display_matches else: matches_to_display = matches # Add padding for visual appeal matches_to_display, _ = self._pad_matches_to_display(matches_to_display) # Display matches using actual display function. This also redraws the prompt and line. orig_pyreadline_display(matches_to_display) # ----- Methods which override stuff in cmd ----- def complete(self, text, state): """Override of command method which returns the next possible completion for 'text'. If a command has not been entered, then complete against command list. Otherwise try to call complete_ to get list of completions. This method gets called directly by readline because it is set as the tab-completion function. This completer function is called as complete(text, state), for state in 0, 1, 2, …, until it returns a non-string value. It should return the next possible completion starting with text. :param text: str - the current word that user is typing :param state: int - non-negative integer """ if state == 0: unclosed_quote = '' self.set_completion_defaults() # lstrip the original line orig_line = readline.get_line_buffer() line = orig_line.lstrip() stripped = len(orig_line) - len(line) # Calculate new indexes for the stripped line. If the cursor is at a position before the end of a # line of spaces, then the following math could result in negative indexes. Enforce a max of 0. begidx = max(readline.get_begidx() - stripped, 0) endidx = max(readline.get_endidx() - stripped, 0) # Shortcuts are not word break characters when tab completing. Therefore shortcuts become part # of the text variable if there isn't a word break, like a space, after it. We need to remove it # from text and update the indexes. This only applies if we are at the the beginning of the line. shortcut_to_restore = '' if begidx == 0: for (shortcut, expansion) in self.shortcuts: if text.startswith(shortcut): # Save the shortcut to restore later shortcut_to_restore = shortcut # Adjust text and where it begins text = text[len(shortcut_to_restore):] begidx += len(shortcut_to_restore) break # If begidx is greater than 0, then we are no longer completing the command if begidx > 0: # Parse the command line command, args, expanded_line = self.parseline(line) # We overwrote line with a properly formatted but fully stripped version # Restore the end spaces since line is only supposed to be lstripped when # passed to completer functions according to Python docs rstripped_len = len(line) - len(line.rstrip()) expanded_line += ' ' * rstripped_len # Fix the index values if expanded_line has a different size than line if len(expanded_line) != len(line): diff = len(expanded_line) - len(line) begidx += diff endidx += diff # Overwrite line to pass into completers line = expanded_line # Get all tokens through the one being completed tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx) # Either had a parsing error or are trying to complete the command token # The latter can happen if default_to_shell is True and parseline() allowed # assumed something like " or ' was a command. if tokens is None or len(tokens) == 1: self.completion_matches = [] return None # Text we need to remove from completions later text_to_remove = '' # Get the token being completed with any opening quote preserved raw_completion_token = raw_tokens[-1] # Check if the token being completed has an opening quote if raw_completion_token and raw_completion_token[0] in QUOTES: # Since the token is still being completed, we know the opening quote is unclosed unclosed_quote = raw_completion_token[0] # readline still performs word breaks after a quote. Therefore something like quoted search # text with a space would have resulted in begidx pointing to the middle of the token we # we want to complete. Figure out where that token actually begins and save the beginning # portion of it that was not part of the text readline gave us. We will remove it from the # completions later since readline expects them to start with the original text. actual_begidx = line[:endidx].rfind(tokens[-1]) if actual_begidx != begidx: text_to_remove = line[actual_begidx:begidx] # Adjust text and where it begins so the completer routines # get unbroken search text to complete on. text = text_to_remove + text begidx = actual_begidx # Check if a valid command was entered if command in self.get_all_commands(): # Get the completer function for this command try: compfunc = getattr(self, 'complete_' + command) except AttributeError: compfunc = self.completedefault subcommands = self.get_subcommands(command) if subcommands is not None: # Since there are subcommands, then try completing those if the cursor is in # the token at index 1, otherwise default to using compfunc index_dict = {1: subcommands} compfunc = functools.partial(self.index_based_complete, index_dict=index_dict, all_else=compfunc) # A valid command was not entered else: # Check if this command should be run as a shell command if self.default_to_shell and command in self.get_exes_in_path(command): compfunc = self.path_complete else: compfunc = self.completedefault # Attempt tab completion for redirection first, and if that isn't occurring, # call the completer function for the current command self.completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc) if self.completion_matches: # Eliminate duplicates matches_set = set(self.completion_matches) self.completion_matches = list(matches_set) display_matches_set = set(self.display_matches) self.display_matches = list(display_matches_set) # Check if display_matches has been used. If so, then matches # on delimited strings like paths was done. if self.display_matches: matches_delimited = True else: matches_delimited = False # Since self.display_matches is empty, set it to self.completion_matches # before we alter them. That way the suggestions will reflect how we parsed # the token being completed and not how readline did. self.display_matches = copy.copy(self.completion_matches) # Check if we need to add an opening quote if not unclosed_quote: add_quote = False # This is the tab completion text that will appear on the command line. common_prefix = os.path.commonprefix(self.completion_matches) if matches_delimited: # Check if any portion of the display matches appears in the tab completion display_prefix = os.path.commonprefix(self.display_matches) # For delimited matches, we check what appears before the display # matches (common_prefix) as well as the display matches themselves. if (' ' in common_prefix) or (display_prefix and ' ' in ''.join(self.display_matches)): add_quote = True # If there is a tab completion and any match has a space, then add an opening quote elif common_prefix and ' ' in ''.join(self.completion_matches): add_quote = True if add_quote: # Figure out what kind of quote to add and save it as the unclosed_quote if '"' in ''.join(self.completion_matches): unclosed_quote = "'" else: unclosed_quote = '"' self.completion_matches = [unclosed_quote + match for match in self.completion_matches] # Check if we need to remove text from the beginning of tab completions elif text_to_remove: self.completion_matches = \ [m.replace(text_to_remove, '', 1) for m in self.completion_matches] # Check if we need to restore a shortcut in the tab completions # so it doesn't get erased from the command line if shortcut_to_restore: self.completion_matches = \ [shortcut_to_restore + match for match in self.completion_matches] else: # Complete token against aliases and command names alias_names = set(self.aliases.keys()) visible_commands = set(self.get_visible_commands()) strs_to_match = list(alias_names | visible_commands) self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) # Handle single result if len(self.completion_matches) == 1: str_to_append = '' # Add a closing quote if needed and allowed if self.allow_closing_quote and unclosed_quote: str_to_append += unclosed_quote # If we are at the end of the line, then add a space if allowed if self.allow_appended_space and endidx == len(line): str_to_append += ' ' self.completion_matches[0] += str_to_append # Otherwise sort matches elif self.completion_matches: self.completion_matches.sort() self.display_matches.sort() try: return self.completion_matches[state] except IndexError: return None def get_all_commands(self): """ Returns a list of all commands """ return [cur_name[3:] for cur_name in self.get_names() if cur_name.startswith('do_')] def get_visible_commands(self): """ Returns a list of commands that have not been hidden """ commands = self.get_all_commands() # Remove the hidden commands for name in self.hidden_commands: if name in commands: commands.remove(name) return commands def get_help_topics(self): """ Returns a list of help topics """ return [name[5:] for name in self.get_names() if name.startswith('help_')] def complete_help(self, text, line, begidx, endidx): """ Override of parent class method to handle tab completing subcommands and not showing hidden commands Returns a list of possible tab completions """ # The command is the token at index 1 in the command line cmd_index = 1 # The subcommand is the token at index 2 in the command line subcmd_index = 2 # Get all tokens through the one being completed tokens, _ = self.tokens_for_completion(line, begidx, endidx) if tokens is None: return [] matches = [] # Get the index of the token being completed index = len(tokens) - 1 # Check if we are completing a command or help topic if index == cmd_index: # Complete token against topics and visible commands topics = set(self.get_help_topics()) visible_commands = set(self.get_visible_commands()) strs_to_match = list(topics | visible_commands) matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) # Check if we are completing a subcommand elif index == subcmd_index: # Match subcommands if any exist command = tokens[cmd_index] matches = self.basic_complete(text, line, begidx, endidx, self.get_subcommands(command)) return matches # noinspection PyUnusedLocal def sigint_handler(self, signum, frame): """Signal handler for SIGINTs which typically come from Ctrl-C events. If you need custom SIGINT behavior, then override this function. :param signum: int - signal number :param frame """ # Save copy of pipe_proc since it could theoretically change while this is running pipe_proc = self.pipe_proc if pipe_proc is not None: pipe_proc.terminate() # Re-raise a KeyboardInterrupt so other parts of the code can catch it raise KeyboardInterrupt("Got a keyboard interrupt") def preloop(self): """"Hook method executed once when the cmdloop() method is called.""" # Register a default SIGINT signal handler for Ctrl+C signal.signal(signal.SIGINT, self.sigint_handler) def precmd(self, statement): """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance :return: ParsedString - a potentially modified version of the input ParsedString statement """ return statement # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd ----- # noinspection PyMethodMayBeStatic def preparse(self, raw): """Hook method executed just before the command line is interpreted, but after the input prompt is generated. :param raw: str - raw command line input :return: str - potentially modified raw command line input """ return raw # noinspection PyMethodMayBeStatic def postparse(self, parse_result): """Hook that runs immediately after parsing the command-line but before ``parsed()`` returns a ParsedString. :param parse_result: pyparsing.ParseResults - parsing results output by the pyparsing parser :return: pyparsing.ParseResults - potentially modified ParseResults object """ return parse_result # noinspection PyMethodMayBeStatic def postparsing_precmd(self, statement): """This runs after parsing the command-line, but before anything else; even before adding cmd to history. NOTE: This runs before precmd() and prior to any potential output redirection or piping. If you wish to fatally fail this command and exit the application entirely, set stop = True. If you wish to just fail this command you can do so by raising an exception: - raise EmptyStatement - will silently fail and do nothing - raise - will fail and print an error message :param statement: - the parsed command-line statement :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement """ stop = False return stop, statement # noinspection PyMethodMayBeStatic def postparsing_postcmd(self, stop): """This runs after everything else, including after postcmd(). It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due to notifications from a background thread, then this is the method you want to override to do it. :param stop: bool - True implies the entire application should exit. :return: bool - True implies the entire application should exit. """ if not sys.platform.startswith('win'): # Fix those annoying problems that occur with terminal programs like "less" when you pipe to them if self.stdin.isatty(): proc = subprocess.Popen(shlex.split('stty sane')) proc.communicate() return stop def parseline(self, line): """Parse the line into a command name and a string containing the arguments. NOTE: This is an override of a parent class method. It is only used by other parent class methods. But we do need to override it here so that the additional shortcuts present in cmd2 get properly expanded for purposes of tab completion. Used for command tab completion. Returns a tuple containing (command, args, line). 'command' and 'args' may be None if the line couldn't be parsed. :param line: str - line read by readline :return: (str, str, str) - tuple containing (command, args, line) """ line = line.strip() if not line: # Deal with empty line or all whitespace line return None, None, line # Make a copy of aliases so we can edit it tmp_aliases = list(self.aliases.keys()) keep_expanding = len(tmp_aliases) > 0 # Expand aliases while keep_expanding: for cur_alias in tmp_aliases: keep_expanding = False if line == cur_alias or line.startswith(cur_alias + ' '): line = line.replace(cur_alias, self.aliases[cur_alias], 1) # Do not expand the same alias more than once tmp_aliases.remove(cur_alias) keep_expanding = len(tmp_aliases) > 0 break # Expand command shortcut to its full command name for (shortcut, expansion) in self.shortcuts: if line.startswith(shortcut): # If the next character after the shortcut isn't a space, then insert one shortcut_len = len(shortcut) if len(line) == shortcut_len or line[shortcut_len] != ' ': expansion += ' ' # Expand the shortcut line = line.replace(shortcut, expansion, 1) break i, n = 0, len(line) # If we are allowing shell commands, then allow any character in the command if self.default_to_shell: while i < n and line[i] != ' ': i += 1 # Otherwise only allow those in identchars else: while i < n and line[i] in self.identchars: i += 1 command, arg = line[:i], line[i:].strip() return command, arg, line def onecmd_plus_hooks(self, line): """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks. :param line: str - line of text read from input :return: bool - True if cmdloop() should exit, False otherwise """ stop = 0 try: statement = self._complete_statement(line) (stop, statement) = self.postparsing_precmd(statement) if stop: return self.postparsing_postcmd(stop) try: if self.allow_redirection: self._redirect_output(statement) timestart = datetime.datetime.now() statement = self.precmd(statement) stop = self.onecmd(statement) stop = self.postcmd(stop, statement) if self.timing: self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) finally: if self.allow_redirection: self._restore_output(statement) except EmptyStatement: pass except ValueError as ex: # If shlex.split failed on syntax, let user know whats going on self.perror("Invalid syntax: {}".format(ex), traceback_war=False) except Exception as ex: self.perror(ex, type(ex).__name__) finally: return self.postparsing_postcmd(stop) def runcmds_plus_hooks(self, cmds): """Convenience method to run multiple commands by onecmd_plus_hooks. This method adds the given cmds to the command queue and processes the queue until completion or an error causes it to abort. Scripts that are loaded will have their commands added to the queue. Scripts may even load other scripts recursively. This means, however, that you should not use this method if there is a running cmdloop or some other event-loop. This method is only intended to be used in "one-off" scenarios. NOTE: You may need this method even if you only have one command. If that command is a load, then you will need this command to fully process all the subsequent commands that are loaded from the script file. This is an improvement over onecmd_plus_hooks, which expects to be used inside of a command loop which does the processing of loaded commands. Example: cmd_obj.runcmds_plus_hooks(['load myscript.txt']) :param cmds: list - Command strings suitable for onecmd_plus_hooks. :return: bool - True implies the entire application should exit. """ stop = False self.cmdqueue = list(cmds) + self.cmdqueue try: while self.cmdqueue and not stop: line = self.cmdqueue.pop(0) if self.echo and line != 'eos': self.poutput('{}{}'.format(self.prompt, line)) stop = self.onecmd_plus_hooks(line) finally: # Clear out the command queue and script directory stack, just in # case we hit an error and they were not completed. self.cmdqueue = [] self._script_dir = [] # NOTE: placing this return here inside the finally block will # swallow exceptions. This is consistent with what is done in # onecmd_plus_hooks and _cmdloop, although it may not be # necessary/desired here. return stop def _complete_statement(self, line): """Keep accepting lines of input until the command is complete.""" if not line or (not pyparsing.Or(self.commentGrammars).setParseAction(lambda x: '').transformString(line)): raise EmptyStatement() statement = self.parser_manager.parsed(line) while statement.parsed.multilineCommand and (statement.parsed.terminator == ''): statement = '%s\n%s' % (statement.parsed.raw, self.pseudo_raw_input(self.continuation_prompt)) statement = self.parser_manager.parsed(statement) if not statement.parsed.command: raise EmptyStatement() return statement def _redirect_output(self, statement): """Handles output redirection for >, >>, and |. :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance """ if statement.parsed.pipeTo: self.kept_state = Statekeeper(self, ('stdout',)) # Create a pipe with read and write sides read_fd, write_fd = os.pipe() # Open each side of the pipe and set stdout accordingly # noinspection PyTypeChecker self.stdout = io.open(write_fd, 'w') self.redirecting = True # noinspection PyTypeChecker subproc_stdin = io.open(read_fd, 'r') # We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True. try: self.pipe_proc = subprocess.Popen(shlex.split(statement.parsed.pipeTo), stdin=subproc_stdin) except Exception as ex: # Restore stdout to what it was and close the pipe self.stdout.close() subproc_stdin.close() self.pipe_proc = None self.kept_state.restore() self.kept_state = None self.redirecting = False # Re-raise the exception raise ex elif statement.parsed.output: if (not statement.parsed.outputTo) and (not can_clip): raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable') self.kept_state = Statekeeper(self, ('stdout',)) self.kept_sys = Statekeeper(sys, ('stdout',)) self.redirecting = True if statement.parsed.outputTo: mode = 'w' if statement.parsed.output == 2 * self.redirector: mode = 'a' sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode) else: sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+") if statement.parsed.output == '>>': self.poutput(get_paste_buffer()) def _restore_output(self, statement): """Handles restoring state after output redirection as well as the actual pipe operation if present. :param statement: ParsedString - subclass of str which also contains pyparsing ParseResults instance """ # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state if self.kept_state is not None: # If we redirected output to the clipboard if statement.parsed.output and not statement.parsed.outputTo: self.stdout.seek(0) write_to_paste_buffer(self.stdout.read()) try: # Close the file or pipe that stdout was redirected to self.stdout.close() except BrokenPipeError: pass finally: # Restore self.stdout self.kept_state.restore() self.kept_state = None # If we were piping output to a shell command, then close the subprocess the shell command was running in if self.pipe_proc is not None: self.pipe_proc.communicate() self.pipe_proc = None # Restore sys.stdout if need be if self.kept_sys is not None: self.kept_sys.restore() self.kept_sys = None self.redirecting = False def _func_named(self, arg): """Gets the method name associated with a given command. :param arg: str - command to look up method name which implements it :return: str - method name which implements the given command """ result = None target = 'do_' + arg if target in dir(self): result = target return result def onecmd(self, line): """ This executes the actual do_* method for a command. If the command provided doesn't exist, then it executes _default() instead. :param line: ParsedString - subclass of string including the pyparsing ParseResults :return: bool - a flag indicating whether the interpretation of commands should stop """ statement = self.parser_manager.parsed(line) funcname = self._func_named(statement.parsed.command) if not funcname: return self.default(statement) # Since we have a valid command store it in the history if statement.parsed.command not in self.exclude_from_history: self.history.append(statement.parsed.raw) try: func = getattr(self, funcname) except AttributeError: return self.default(statement) stop = func(statement) return stop def default(self, statement): """Executed when the command given isn't a recognized command implemented by a do_* method. :param statement: ParsedString - subclass of string including the pyparsing ParseResults :return: """ arg = statement.full_parsed_statement() if self.default_to_shell: result = os.system(arg) # If os.system() succeeded, then don't print warning about unknown command if not result: return # Print out a message stating this is an unknown command self.poutput('*** Unknown syntax: {}\n'.format(arg)) @staticmethod def _surround_ansi_escapes(prompt, start="\x01", end="\x02"): """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. :param prompt: str - original prompt :param start: str - start code to tell GNU Readline about beginning of invisible characters :param end: str - end code to tell GNU Readline about end of invisible characters :return: str - prompt safe to pass to GNU Readline """ # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline if sys.platform == "win32": return prompt escaped = False result = "" for c in prompt: if c == "\x1b" and not escaped: result += start + c escaped = True elif c.isalpha() and escaped: result += c + end escaped = False else: result += c return result def pseudo_raw_input(self, prompt): """ began life as a copy of cmd's cmdloop; like raw_input but - accounts for changed stdin, stdout - if input is a pipe (instead of a tty), look at self.echo to decide whether to print the prompt and the input """ # Deal with the vagaries of readline and ANSI escape codes safe_prompt = self._surround_ansi_escapes(prompt) if self.use_rawinput: try: if sys.stdin.isatty(): line = input(safe_prompt) else: line = input() if self.echo: sys.stdout.write('{}{}\n'.format(safe_prompt, line)) except EOFError: line = 'eof' else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line self.poutput(safe_prompt, end='') self.stdout.flush() line = self.stdin.readline() if len(line) == 0: line = 'eof' else: # we are reading from a pipe, read the line to see if there is # anything there, if so, then decide whether to print the # prompt or not line = self.stdin.readline() if len(line): # we read something, output the prompt and the something if self.echo: self.poutput('{}{}'.format(safe_prompt, line)) else: line = 'eof' return line.strip() def _cmdloop(self): """Repeatedly issue a prompt, accept input, parse an initial prefix off the received input, and dispatch to action methods, passing them the remainder of the line as argument. This serves the same role as cmd.cmdloop(). :return: bool - True implies the entire application should exit. """ # An almost perfect copy from Cmd; however, the pseudo_raw_input portion # has been split out so that it can be called separately if self.use_rawinput and self.completekey: # Set up readline for our tab completion needs if rl_type == RlType.GNU: readline.set_completion_display_matches_hook(self._display_matches_gnu_readline) # Set GNU readline's rl_basic_quote_characters to NULL so it won't automatically add a closing quote # We don't need to worry about setting rl_completion_suppress_quote since we never declared # rl_completer_quote_characters. rl_basic_quote_characters.value = None elif rl_type == RlType.PYREADLINE: readline.rl.mode._display_completions = self._display_matches_pyreadline try: self.old_completer = readline.get_completer() self.old_delims = readline.get_completer_delims() readline.set_completer(self.complete) # Break words on whitespace and quotes when tab completing completer_delims = " \t\n" + ''.join(QUOTES) if self.allow_redirection: # If redirection is allowed, then break words on those characters too completer_delims += ''.join(REDIRECTION_CHARS) readline.set_completer_delims(completer_delims) # Enable tab completion readline.parse_and_bind(self.completekey + ": complete") except NameError: pass stop = None try: while not stop: if self.cmdqueue: # Run command out of cmdqueue if nonempty (populated by load command or commands at invocation) line = self.cmdqueue.pop(0) if self.echo and line != 'eos': self.poutput('{}{}'.format(self.prompt, line)) else: # Otherwise, read a command from stdin if not self.quit_on_sigint: try: line = self.pseudo_raw_input(self.prompt) except KeyboardInterrupt: self.poutput('^C') line = '' else: line = self.pseudo_raw_input(self.prompt) # Run the command along with all associated pre and post hooks stop = self.onecmd_plus_hooks(line) finally: if self.use_rawinput and self.completekey: # Restore what we changed in readline try: readline.set_completer(self.old_completer) readline.set_completer_delims(self.old_delims) except NameError: pass if rl_type == RlType.GNU: readline.set_completion_display_matches_hook(None) rl_basic_quote_characters.value = orig_rl_basic_quote_characters_addr elif rl_type == RlType.PYREADLINE: readline.rl.mode._display_completions = orig_pyreadline_display self.cmdqueue.clear() self._script_dir.clear() return stop @with_argument_list def do_alias(self, arglist): """Define or display aliases Usage: Usage: alias [name] | [ ] Where: name - name of the alias being looked up, added, or replaced value - what the alias will be resolved to (if adding or replacing) this can contain spaces and does not need to be quoted Without arguments, 'alias' prints a list of all aliases in a reusable form which can be outputted to a startup_script to preserve aliases across sessions. With one argument, 'alias' shows the value of the specified alias. Example: alias ls (Prints the value of the alias called 'ls' if it exists) With two or more arguments, 'alias' creates or replaces an alias. Example: alias ls !ls -lF If you want to use redirection or pipes in the alias, then either quote the tokens with these characters or quote the entire alias value. Examples: alias save_results print_results ">" out.txt alias save_results print_results "> out.txt" alias save_results "print_results > out.txt" """ # If no args were given, then print a list of current aliases if not arglist: for cur_alias in self.aliases: self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias])) # The user is looking up an alias elif len(arglist) == 1: name = arglist[0] if name in self.aliases: self.poutput("alias {} {}".format(name, self.aliases[name])) else: self.perror("Alias {!r} not found".format(name), traceback_war=False) # The user is creating an alias else: name = arglist[0] value = ' '.join(arglist[1:]) # Check for a valid name for cur_char in name: if cur_char not in self.identchars: self.perror("Alias names can only contain the following characters: {}".format(self.identchars), traceback_war=False) return # Set the alias self.aliases[name] = value self.poutput("Alias {!r} created".format(name)) def complete_alias(self, text, line, begidx, endidx): """ Tab completion for alias """ index_dict = \ { 1: self.aliases, 2: self.get_visible_commands() } return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) @with_argument_list def do_unalias(self, arglist): """Unsets aliases Usage: Usage: unalias [-a] name [name ...] Where: name - name of the alias being unset Options: -a remove all alias definitions """ if not arglist: self.do_help('unalias') if '-a' in arglist: self.aliases.clear() self.poutput("All aliases cleared") else: # Get rid of duplicates arglist = list(set(arglist)) for cur_arg in arglist: if cur_arg in self.aliases: del self.aliases[cur_arg] self.poutput("Alias {!r} cleared".format(cur_arg)) else: self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False) def complete_unalias(self, text, line, begidx, endidx): """ Tab completion for unalias """ return self.basic_complete(text, line, begidx, endidx, self.aliases) @with_argument_list def do_help(self, arglist): """List available commands with "help" or detailed help with "help cmd".""" if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')): verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v') self._help_menu(verbose) else: # Getting help for a specific command funcname = self._func_named(arglist[0]) if funcname: # Check to see if this function was decorated with an argparse ArgumentParser func = getattr(self, funcname) if func.__dict__.get('has_parser', False): # Function has an argparser, so get help based on all the arguments in case there are sub-commands new_arglist = arglist[1:] new_arglist.append('-h') # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout with redirect_stdout(self.stdout): with redirect_stderr(self.stdout): func(new_arglist) else: # No special behavior needed, delegate to cmd base class do_help() cmd.Cmd.do_help(self, funcname[3:]) else: # This could be a help topic cmd.Cmd.do_help(self, arglist[0]) def _help_menu(self, verbose=False): """Show a list of commands which help can be displayed for. """ # Get a sorted list of help topics help_topics = self.get_help_topics() help_topics.sort() # Get a sorted list of visible command names visible_commands = self.get_visible_commands() visible_commands.sort() cmds_doc = [] cmds_undoc = [] cmds_cats = {} for command in visible_commands: if command in help_topics or getattr(self, self._func_named(command)).__doc__: if command in help_topics: help_topics.remove(command) if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY): category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY) cmds_cats.setdefault(category, []) cmds_cats[category].append(command) else: cmds_doc.append(command) else: cmds_undoc.append(command) if len(cmds_cats) == 0: # No categories found, fall back to standard behavior self.poutput("{}\n".format(str(self.doc_leader))) self._print_topics(self.doc_header, cmds_doc, verbose) else: # Categories found, Organize all commands by category self.poutput('{}\n'.format(str(self.doc_leader))) self.poutput('{}\n\n'.format(str(self.doc_header))) for category in sorted(cmds_cats.keys()): self._print_topics(category, cmds_cats[category], verbose) self._print_topics('Other', cmds_doc, verbose) self.print_topics(self.misc_header, help_topics, 15, 80) self.print_topics(self.undoc_header, cmds_undoc, 15, 80) def _print_topics(self, header, cmds, verbose): """Customized version of print_topics that can switch between verbose or traditional output""" if cmds: if not verbose: self.print_topics(header, cmds, 15, 80) else: self.stdout.write('{}\n'.format(str(header))) widest = 0 # measure the commands for command in cmds: width = len(command) if width > widest: widest = width # add a 4-space pad widest += 4 if widest < 20: widest = 20 if self.ruler: self.stdout.write('{:{ruler}<{width}}\n'.format('', ruler=self.ruler, width=80)) for command in cmds: # Try to get the documentation string try: # first see if there's a help function implemented func = getattr(self, 'help_' + command) except AttributeError: # Couldn't find a help function try: # Now see if help_summary has been set doc = getattr(self, self._func_named(command)).help_summary except AttributeError: # Last, try to directly access the function's doc-string doc = getattr(self, self._func_named(command)).__doc__ else: # we found the help function result = StringIO() # try to redirect system stdout with redirect_stdout(result): # save our internal stdout stdout_orig = self.stdout try: # redirect our internal stdout self.stdout = result func() finally: # restore internal stdout self.stdout = stdout_orig doc = result.getvalue() # Attempt to locate the first documentation block doc_block = [] found_first = False for doc_line in doc.splitlines(): str(doc_line).strip() if len(doc_line.strip()) > 0: doc_block.append(doc_line.strip()) found_first = True else: if found_first: break for doc_line in doc_block: self.stdout.write('{: <{col_width}}{doc}\n'.format(command, col_width=widest, doc=doc_line)) command = '' self.stdout.write("\n") def do_shortcuts(self, _): """Lists shortcuts (aliases) available.""" result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) self.poutput("Shortcuts for other commands:\n{}\n".format(result)) def do_eof(self, _): """Called when -D is pressed.""" # End of script should not exit app, but -D should. print('') # Required for clearing line when exiting submenu return self._STOP_AND_EXIT def do_quit(self, _): """Exits this application.""" self._should_quit = True return self._STOP_AND_EXIT def select(self, opts, prompt='Your choice? '): """Presents a numbered menu to the user. Modelled after the bash shell's SELECT. Returns the item chosen. Argument ``opts`` can be: | a single string -> will be split into one-word options | a list of strings -> will be offered as options | a list of tuples -> interpreted as (value, text), so that the return value can differ from the text advertised to the user """ local_opts = opts if isinstance(opts, str): local_opts = list(zip(opts.split(), opts.split())) fulloptions = [] for opt in local_opts: if isinstance(opt, str): fulloptions.append((opt, opt)) else: try: fulloptions.append((opt[0], opt[1])) except IndexError: fulloptions.append((opt[0], opt[0])) for (idx, (value, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: response = input(prompt) hlen = readline.get_current_history_length() if hlen >= 1 and response != '': readline.remove_history_item(hlen - 1) try: response = int(response) result = fulloptions[response - 1][0] break except (ValueError, IndexError): self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:\n".format(response, len(fulloptions))) return result def cmdenvironment(self): """Get a summary report of read-only settings which the user cannot modify at runtime. :return: str - summary report of read-only settings which the user cannot modify at runtime """ read_only_settings = """ Commands may be terminated with: {} Arguments at invocation allowed: {} Output redirection and pipes allowed: {} Parsing of command arguments: Shell lexer mode for command argument splitting: {} Strip Quotes after splitting arguments: {} """.format(str(self.terminators), self.allow_cli_args, self.allow_redirection, "POSIX" if POSIX_SHLEX else "non-POSIX", "True" if STRIP_QUOTES_FOR_NON_POSIX and not POSIX_SHLEX else "False") return read_only_settings def show(self, args, parameter): param = '' if parameter: param = parameter.strip().lower() result = {} maxlen = 0 for p in self.settable: if (not param) or p.startswith(param): result[p] = '%s: %s' % (p, str(getattr(self, p))) maxlen = max(maxlen, len(result[p])) if result: for p in sorted(result): if args.long: self.poutput('{} # {}'.format(result[p].ljust(maxlen), self.settable[p])) else: self.poutput(result[p]) # If user has requested to see all settings, also show read-only settings if args.all: self.poutput('\nRead only settings:{}'.format(self.cmdenvironment())) else: raise LookupError("Parameter '%s' not supported (type 'show' for list of parameters)." % param) set_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well') set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter') set_parser.add_argument('settable', nargs='*', help='[param_name] [value]') @with_argparser(set_parser) def do_set(self, args): """Sets a settable parameter or shows current settings of parameters. Accepts abbreviated parameter names so long as there is no ambiguity. Call without arguments for a list of settable parameters with their values. """ try: param_name, val = args.settable val = val.strip() param_name = param_name.strip().lower() if param_name not in self.settable: hits = [p for p in self.settable if p.startswith(param_name)] if len(hits) == 1: param_name = hits[0] else: return self.show(args, param_name) current_val = getattr(self, param_name) if (val[0] == val[-1]) and val[0] in ("'", '"'): val = val[1:-1] else: val = cast(current_val, val) setattr(self, param_name, val) self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val)) if current_val != val: try: onchange_hook = getattr(self, '_onchange_%s' % param_name) onchange_hook(old=current_val, new=val) except AttributeError: pass except (ValueError, AttributeError): param = '' if args.settable: param = args.settable[0] self.show(args, param) def do_shell(self, command): """Execute a command as if at the OS prompt. Usage: shell [arguments]""" try: # Use non-POSIX parsing to keep the quotes around the tokens tokens = shlex.split(command, posix=False) except ValueError as err: self.perror(err, traceback_war=False) return # Support expanding ~ in quoted paths for index, _ in enumerate(tokens): if tokens[index]: # Check if the token is quoted. Since shlex.split() passed, there isn't # an unclosed quote, so we only need to check the first character. first_char = tokens[index][0] if first_char in QUOTES: tokens[index] = strip_quotes(tokens[index]) tokens[index] = os.path.expanduser(tokens[index]) # Restore the quotes if first_char in QUOTES: tokens[index] = first_char + tokens[index] + first_char expanded_command = ' '.join(tokens) proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True) proc.communicate() def complete_shell(self, text, line, begidx, endidx): """Handles tab completion of executable commands and local file system paths for the shell command :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :return: List[str] - a list of possible tab completions """ index_dict = {1: self.shell_cmd_complete} return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) def cmd_with_subs_completer(self, text, line, begidx, endidx): """ This is a function provided for convenience to those who want an easy way to add tab completion to functions that implement subcommands. By setting this as the completer of the base command function, the correct completer for the chosen subcommand will be called. The use of this function requires assigning a completer function to the subcommand's parser Example: A command called print has a subcommands called 'names' that needs a tab completer When you create the parser for names, include the completer function in the parser's defaults. names_parser.set_defaults(func=print_names, completer=complete_print_names) To make sure the names completer gets called, set the completer for the print function in a similar fashion to what follows. complete_print = cmd2.Cmd.cmd_with_subs_completer When the subcommand's completer is called, this function will have stripped off all content from the beginning of the command line before the subcommand, meaning the line parameter always starts with the subcommand name and the index parameters reflect this change. For instance, the command "print names -d 2" becomes "names -d 2" begidx and endidx are incremented accordingly :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) :param line: str - the current input line with leading whitespace removed :param begidx: int - the beginning index of the prefix text :param endidx: int - the ending index of the prefix text :return: List[str] - a list of possible tab completions """ # The command is the token at index 0 in the command line cmd_index = 0 # The subcommand is the token at index 1 in the command line subcmd_index = 1 # Get all tokens through the one being completed tokens, _ = self.tokens_for_completion(line, begidx, endidx) if tokens is None: return [] matches = [] # Get the index of the token being completed index = len(tokens) - 1 # If the token being completed is past the subcommand name, then do subcommand specific tab-completion if index > subcmd_index: # Get the command name command = tokens[cmd_index] # Get the subcommand name subcommand = tokens[subcmd_index] # Find the offset into line where the subcommand name begins subcmd_start = 0 for cur_index in range(0, subcmd_index + 1): cur_token = tokens[cur_index] subcmd_start = line.find(cur_token, subcmd_start) if cur_index != subcmd_index: subcmd_start += len(cur_token) # Strip off everything before subcommand name orig_line = line line = line[subcmd_start:] # Update the indexes diff = len(orig_line) - len(line) begidx -= diff endidx -= diff # Call the subcommand specific completer if it exists compfunc = self.get_subcommand_completer(command, subcommand) if compfunc is not None: matches = compfunc(self, text, line, begidx, endidx) return matches # noinspection PyBroadException def do_py(self, arg): """ Invoke python command, shell, or script py : Executes a Python command. py: Enters interactive Python mode. End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. Non-python commands can be issued with ``cmd("your command")``. Run python code from external script files with ``run("script.py")`` """ if self._in_py: self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False) return self._in_py = True try: self.pystate['self'] = self arg = arg.strip() # Support the run command even if called prior to invoking an interactive interpreter def run(filename): """Run a Python script file in the interactive console. :param filename: str - filename of *.py script file to run """ try: with open(filename) as f: interp.runcode(f.read()) except IOError as e: self.perror(e) def onecmd_plus_hooks(cmd_plus_args): """Run a cmd2.Cmd command from a Python script or the interactive Python console. :param cmd_plus_args: str - command line including command and arguments to run :return: bool - True if cmdloop() should exit once leaving the interactive Python console """ return self.onecmd_plus_hooks(cmd_plus_args + '\n') self.pystate['run'] = run self.pystate['cmd'] = onecmd_plus_hooks localvars = (self.locals_in_py and self.pystate) or {} interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') if arg: interp.runcode(arg) else: # noinspection PyShadowingBuiltins def quit(): """Function callable from the interactive Python console to exit that environment""" raise EmbeddedConsoleExit self.pystate['quit'] = quit self.pystate['exit'] = quit keepstate = None try: cprt = 'Type "help", "copyright", "credits" or "license" for more information.' keepstate = Statekeeper(sys, ('stdin', 'stdout')) sys.stdout = self.stdout sys.stdin = self.stdin interp.interact(banner="Python %s on %s\n%s\n(%s)\n%s" % (sys.version, sys.platform, cprt, self.__class__.__name__, self.do_py.__doc__)) except EmbeddedConsoleExit: pass if keepstate is not None: keepstate.restore() except Exception: pass finally: self._in_py = False return self._should_quit @with_argument_list def do_pyscript(self, arglist): """\nRuns a python script file inside the console Usage: pyscript [script_arguments] Console commands can be executed inside this script with cmd("your command") However, you cannot run nested "py" or "pyscript" commands from within this script Paths or arguments that contain spaces must be enclosed in quotes """ if not arglist: self.perror("pyscript command requires at least 1 argument ...", traceback_war=False) self.do_help('pyscript') return # Get the absolute path of the script script_path = os.path.expanduser(arglist[0]) # Save current command line arguments orig_args = sys.argv # Overwrite sys.argv to allow the script to take command line arguments sys.argv = [script_path] sys.argv.extend(arglist[1:]) # Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows self.do_py("run({!r})".format(script_path)) # Restore command line arguments to original state sys.argv = orig_args # Enable tab-completion for pyscript command def complete_pyscript(self, text, line, begidx, endidx): index_dict = {1: self.path_complete} return self.index_based_complete(text, line, begidx, endidx, index_dict) # Only include the do_ipy() method if IPython is available on the system if ipython_available: # noinspection PyMethodMayBeStatic,PyUnusedLocal def do_ipy(self, arg): """Enters an interactive IPython shell. Run python code from external files with ``run filename.py`` End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. """ banner = 'Entering an embedded IPython shell type quit() or -d to exit ...' exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) embed(banner1=banner, exit_msg=exit_msg) history_parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) history_parser_group = history_parser.add_mutually_exclusive_group() history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items') history_parser_group.add_argument('-e', '--edit', action='store_true', help='edit and then run selected history items') history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines') history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file') history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file') _history_arg_help = """empty all history items a one history item by number a..b, a:b, a:, ..b items by indices (inclusive) [string] items containing string /regex/ items matching regular expression""" history_parser.add_argument('arg', nargs='?', help=_history_arg_help) @with_argparser(history_parser) def do_history(self, args): """View, run, edit, and save previously entered commands.""" # If an argument was supplied, then retrieve partial contents of the history cowardly_refuse_to_run = False if args.arg: # If a character indicating a slice is present, retrieve # a slice of the history arg = args.arg if '..' in arg or ':' in arg: try: # Get a slice of history history = self.history.span(arg) except IndexError: history = self.history.get(arg) else: # Get item(s) from history by index or string search history = self.history.get(arg) else: # If no arg given, then retrieve the entire history cowardly_refuse_to_run = True # Get a copy of the history so it doesn't get mutated while we are using it history = self.history[:] if args.run: if cowardly_refuse_to_run: self.perror("Cowardly refusing to run all previously entered commands.", traceback_war=False) self.perror("If this is what you want to do, specify '1:' as the range of history.", traceback_war=False) else: for runme in history: self.pfeedback(runme) if runme: self.onecmd_plus_hooks(runme) elif args.edit: fd, fname = tempfile.mkstemp(suffix='.txt', text=True) with os.fdopen(fd, 'w') as fobj: for command in history: fobj.write('{}\n'.format(command)) try: os.system('"{}" "{}"'.format(self.editor, fname)) self.do_load(fname) except Exception: raise finally: os.remove(fname) elif args.output_file: try: with open(os.path.expanduser(args.output_file), 'w') as fobj: for command in history: fobj.write('{}\n'.format(command)) plural = 's' if len(history) > 1 else '' self.pfeedback('{} command{} saved to {}'.format(len(history), plural, args.output_file)) except Exception as e: self.perror('Saving {!r} - {}'.format(args.output_file, e), traceback_war=False) elif args.transcript: # Make sure echo is on so commands print to standard out saved_echo = self.echo self.echo = True # Redirect stdout to the transcript file saved_self_stdout = self.stdout self.stdout = open(args.transcript, 'w') # Run all of the commands in the history with output redirected to transcript and echo on self.runcmds_plus_hooks(history) # Restore stdout to its original state self.stdout.close() self.stdout = saved_self_stdout # Set echo back to its original state self.echo = saved_echo # Post-process the file to escape un-escaped "/" regex escapes with open(args.transcript, 'r') as fin: data = fin.read() post_processed_data = data.replace('/', '\/') with open(args.transcript, 'w') as fout: fout.write(post_processed_data) plural = 's' if len(history) > 1 else '' self.pfeedback('{} command{} and outputs saved to transcript file {!r}'.format(len(history), plural, args.transcript)) else: # Display the history items retrieved for hi in history: if args.script: self.poutput(hi) else: self.poutput(hi.pr()) @with_argument_list def do_edit(self, arglist): """Edit a file in a text editor. Usage: edit [file_path] Where: * file_path - path to a file to open in editor The editor used is determined by the ``editor`` settable parameter. "set editor (program-name)" to change or set the EDITOR environment variable. """ if not self.editor: raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") filename = arglist[0] if arglist else '' if filename: os.system('"{}" "{}"'.format(self.editor, filename)) else: os.system('"{}"'.format(self.editor)) # Enable tab-completion for edit command def complete_edit(self, text, line, begidx, endidx): index_dict = {1: self.path_complete} return self.index_based_complete(text, line, begidx, endidx, index_dict) @property def _current_script_dir(self): """Accessor to get the current script directory from the _script_dir LIFO queue.""" if self._script_dir: return self._script_dir[-1] else: return None @with_argument_list def do__relative_load(self, arglist): """Runs commands in script file that is encoded as either ASCII or UTF-8 text. Usage: _relative_load optional argument: file_path a file path pointing to a script Script should contain one command per line, just like command would be typed in console. If this is called from within an already-running script, the filename will be interpreted relative to the already-running script's directory. NOTE: This command is intended to only be used within text file scripts. """ # If arg is None or arg is an empty string this is an error if not arglist: self.perror('_relative_load command requires a file path:', traceback_war=False) return file_path = arglist[0].strip() # NOTE: Relative path is an absolute path, it is just relative to the current script directory relative_path = os.path.join(self._current_script_dir or '', file_path) self.do_load(relative_path) def do_eos(self, _): """Handles cleanup when a script has finished executing.""" if self._script_dir: self._script_dir.pop() @with_argument_list def do_load(self, arglist): """Runs commands in script file that is encoded as either ASCII or UTF-8 text. Usage: load * file_path - a file path pointing to a script Script should contain one command per line, just like command would be typed in console. """ # If arg is None or arg is an empty string this is an error if not arglist: self.perror('load command requires a file path:', traceback_war=False) return file_path = arglist[0].strip() expanded_path = os.path.abspath(os.path.expanduser(file_path)) # Make sure expanded_path points to a file if not os.path.isfile(expanded_path): self.perror('{} does not exist or is not a file'.format(expanded_path), traceback_war=False) return # Make sure the file is not empty if os.path.getsize(expanded_path) == 0: self.perror('{} is empty'.format(expanded_path), traceback_war=False) return # Make sure the file is ASCII or UTF-8 encoded text if not self.is_text_file(expanded_path): self.perror('{} is not an ASCII or UTF-8 encoded text file'.format(expanded_path), traceback_war=False) return try: # Read all lines of the script and insert into the head of the # command queue. Add an "end of script (eos)" command to cleanup the # self._script_dir list when done. with open(expanded_path, encoding='utf-8') as target: self.cmdqueue = target.read().splitlines() + ['eos'] + self.cmdqueue except IOError as e: self.perror('Problem accessing script from {}:\n{}'.format(expanded_path, e)) return self._script_dir.append(os.path.dirname(expanded_path)) # Enable tab-completion for load command def complete_load(self, text, line, begidx, endidx): index_dict = {1: self.path_complete} return self.index_based_complete(text, line, begidx, endidx, index_dict) @staticmethod def is_text_file(file_path): """ Returns if a file contains only ASCII or UTF-8 encoded text :param file_path: path to the file being checked """ expanded_path = os.path.abspath(os.path.expanduser(file_path.strip())) valid_text_file = False # Check if the file is ASCII try: with codecs.open(expanded_path, encoding='ascii', errors='strict') as f: # Make sure the file has at least one line of text # noinspection PyUnusedLocal if sum(1 for line in f) > 0: valid_text_file = True except IOError: pass except UnicodeDecodeError: # The file is not ASCII. Check if it is UTF-8. try: with codecs.open(expanded_path, encoding='utf-8', errors='strict') as f: # Make sure the file has at least one line of text # noinspection PyUnusedLocal if sum(1 for line in f) > 0: valid_text_file = True except IOError: pass except UnicodeDecodeError: # Not UTF-8 pass return valid_text_file def run_transcript_tests(self, callargs): """Runs transcript tests for provided file(s). This is called when either -t is provided on the command line or the transcript_files argument is provided during construction of the cmd2.Cmd instance. :param callargs: List[str] - list of transcript test file names """ class TestMyAppCase(Cmd2TestCase): cmdapp = self self.__class__.testfiles = callargs sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() testcase = TestMyAppCase() runner = unittest.TextTestRunner() runner.run(testcase) def cmdloop(self, intro=None): """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with the following extra features provided by cmd2: - commands at invocation - transcript testing - intro banner :param intro: str - if provided this overrides self.intro and serves as the intro banner printed once at start """ if self.allow_cli_args: parser = argparse.ArgumentParser() parser.add_argument('-t', '--test', action="store_true", help='Test against transcript(s) in FILE (wildcards OK)') callopts, callargs = parser.parse_known_args() # If transcript testing was called for, use other arguments as transcript files if callopts.test: self._transcript_files = callargs # If commands were supplied at invocation, then add them to the command queue if callargs: self.cmdqueue.extend(callargs) # Always run the preloop first self.preloop() # If transcript-based regression testing was requested, then do that instead of the main loop if self._transcript_files is not None: self.run_transcript_tests(self._transcript_files) else: # If an intro was supplied in the method call, allow it to override the default if intro is not None: self.intro = intro # Print the intro, if there is one, right after the preloop if self.intro is not None: self.poutput(str(self.intro) + "\n") # And then call _cmdloop() to enter the main loop self._cmdloop() # Run the postloop() no matter what self.postloop() # noinspection PyPep8Naming class ParserManager: """ Class which encapsulates all of the pyparsing parser functionality for cmd2 in a single location. """ def __init__(self, redirector, terminators, multilineCommands, legalChars, commentGrammars, commentInProgress, blankLinesAllowed, prefixParser, preparse, postparse, aliases, shortcuts): """Creates and uses parsers for user input according to app's parameters.""" self.commentGrammars = commentGrammars self.preparse = preparse self.postparse = postparse self.aliases = aliases self.shortcuts = shortcuts self.main_parser = self._build_main_parser(redirector=redirector, terminators=terminators, multilineCommands=multilineCommands, legalChars=legalChars, commentInProgress=commentInProgress, blankLinesAllowed=blankLinesAllowed, prefixParser=prefixParser) self.input_source_parser = self._build_input_source_parser(legalChars=legalChars, commentInProgress=commentInProgress) def _build_main_parser(self, redirector, terminators, multilineCommands, legalChars, commentInProgress, blankLinesAllowed, prefixParser): """Builds a PyParsing parser for interpreting user commands.""" # Build several parsing components that are eventually compiled into overall parser output_destination_parser = (pyparsing.Literal(redirector * 2) | (pyparsing.WordStart() + redirector) | pyparsing.Regex('[^=]' + redirector))('output') terminator_parser = pyparsing.Or( [(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in terminators])('terminator') string_end = pyparsing.stringEnd ^ '\nEOF' multilineCommand = pyparsing.Or( [pyparsing.Keyword(c, caseless=False) for c in multilineCommands])('multilineCommand') oneline_command = (~multilineCommand + pyparsing.Word(legalChars))('command') pipe = pyparsing.Keyword('|', identChars='|') do_not_parse = self.commentGrammars | commentInProgress | pyparsing.quotedString after_elements = \ pyparsing.Optional(pipe + pyparsing.SkipTo(output_destination_parser ^ string_end, ignore=do_not_parse)('pipeTo')) + \ pyparsing.Optional(output_destination_parser + pyparsing.SkipTo(string_end, ignore=do_not_parse). setParseAction(lambda x: strip_quotes(x[0].strip()))('outputTo')) multilineCommand.setParseAction(lambda x: x[0]) oneline_command.setParseAction(lambda x: x[0]) if blankLinesAllowed: blankLineTerminationParser = pyparsing.NoMatch else: blankLineTerminator = (pyparsing.lineEnd + pyparsing.lineEnd)('terminator') blankLineTerminator.setResultsName('terminator') blankLineTerminationParser = ((multilineCommand ^ oneline_command) + pyparsing.SkipTo(blankLineTerminator, ignore=do_not_parse).setParseAction( lambda x: x[0].strip())('args') + blankLineTerminator)('statement') multilineParser = (((multilineCommand ^ oneline_command) + pyparsing.SkipTo(terminator_parser, ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('args') + terminator_parser)('statement') + pyparsing.SkipTo(output_destination_parser ^ pipe ^ string_end, ignore=do_not_parse).setParseAction(lambda x: x[0].strip())('suffix') + after_elements) multilineParser.ignore(commentInProgress) singleLineParser = ((oneline_command + pyparsing.SkipTo(terminator_parser ^ string_end ^ pipe ^ output_destination_parser, ignore=do_not_parse).setParseAction( lambda x: x[0].strip())('args'))('statement') + pyparsing.Optional(terminator_parser) + after_elements) blankLineTerminationParser = blankLineTerminationParser.setResultsName('statement') parser = prefixParser + ( string_end | multilineParser | singleLineParser | blankLineTerminationParser | multilineCommand + pyparsing.SkipTo(string_end, ignore=do_not_parse) ) parser.ignore(self.commentGrammars) return parser @staticmethod def _build_input_source_parser(legalChars, commentInProgress): """Builds a PyParsing parser for alternate user input sources (from file, pipe, etc.)""" input_mark = pyparsing.Literal('<') input_mark.setParseAction(lambda x: '') # Also allow spaces, slashes, and quotes file_name = pyparsing.Word(legalChars + ' /\\"\'') input_from = file_name('inputFrom') input_from.setParseAction(replace_with_file_contents) # a not-entirely-satisfactory way of distinguishing < as in "import from" from < # as in "lesser than" inputParser = input_mark + pyparsing.Optional(input_from) + pyparsing.Optional('>') + \ pyparsing.Optional(file_name) + (pyparsing.stringEnd | '|') inputParser.ignore(commentInProgress) return inputParser def parsed(self, raw): """ This function is where the actual parsing of each line occurs. :param raw: str - the line of text as it was entered :return: ParsedString - custom subclass of str with extra attributes """ if isinstance(raw, ParsedString): p = raw else: # preparse is an overridable hook; default makes no changes s = self.preparse(raw) s = self.input_source_parser.transformString(s.lstrip()) s = self.commentGrammars.transformString(s) # Make a copy of aliases so we can edit it tmp_aliases = list(self.aliases.keys()) keep_expanding = len(tmp_aliases) > 0 # Expand aliases while keep_expanding: for cur_alias in tmp_aliases: keep_expanding = False if s == cur_alias or s.startswith(cur_alias + ' '): s = s.replace(cur_alias, self.aliases[cur_alias], 1) # Do not expand the same alias more than once tmp_aliases.remove(cur_alias) keep_expanding = len(tmp_aliases) > 0 break # Expand command shortcut to its full command name for (shortcut, expansion) in self.shortcuts: if s.startswith(shortcut): # If the next character after the shortcut isn't a space, then insert one shortcut_len = len(shortcut) if len(s) == shortcut_len or s[shortcut_len] != ' ': expansion += ' ' # Expand the shortcut s = s.replace(shortcut, expansion, 1) break try: result = self.main_parser.parseString(s) except pyparsing.ParseException: # If we have a parsing failure, treat it is an empty command and move to next prompt result = self.main_parser.parseString('') result['raw'] = raw result['command'] = result.multilineCommand or result.command result = self.postparse(result) p = ParsedString(result.args) p.parsed = result p.parser = self.parsed return p class HistoryItem(str): """Class used to represent an item in the History list. Thin wrapper around str class which adds a custom format for printing. It also keeps track of its index in the list as well as a lowercase representation of itself for convenience/efficiency. """ listformat = '-------------------------[{}]\n{}\n' # noinspection PyUnusedLocal def __init__(self, instr): str.__init__(self) self.lowercase = self.lower() self.idx = None def pr(self): """Represent a HistoryItem in a pretty fashion suitable for printing. :return: str - pretty print string version of a HistoryItem """ return self.listformat.format(self.idx, str(self).rstrip()) class History(list): """ A list of HistoryItems that knows how to respond to user requests. """ # noinspection PyMethodMayBeStatic def _zero_based_index(self, onebased): result = onebased if result > 0: result -= 1 return result def _to_index(self, raw): if raw: result = self._zero_based_index(int(raw)) else: result = None return result spanpattern = re.compile(r'^\s*(?P-?\d+)?\s*(?P:|(\.{2,}))?\s*(?P-?\d+)?\s*$') def span(self, raw): """Parses the input string search for a span pattern and if if found, returns a slice from the History list. :param raw: str - string potentially containing a span of the forms a..b, a:b, a:, ..b :return: List[HistoryItem] - slice from the History list """ if raw.lower() in ('*', '-', 'all'): raw = ':' results = self.spanpattern.search(raw) if not results: raise IndexError if not results.group('separator'): return [self[self._to_index(results.group('start'))]] start = self._to_index(results.group('start')) or 0 # Ensure start is not None end = self._to_index(results.group('end')) reverse = False if end is not None: if end < start: (start, end) = (end, start) reverse = True end += 1 result = self[start:end] if reverse: result.reverse() return result rangePattern = re.compile(r'^\s*(?P[\d]+)?\s*-\s*(?P[\d]+)?\s*$') def append(self, new): """Append a HistoryItem to end of the History list :param new: str - command line to convert to HistoryItem and add to the end of the History list """ new = HistoryItem(new) list.append(self, new) new.idx = len(self) def get(self, getme=None): """Get an item or items from the History list using 1-based indexing. :param getme: int or str - item(s) to get - either an integer index or string to search for :return: List[str] - list of HistoryItems matching the retrieval criteria """ if not getme: return self try: getme = int(getme) if getme < 0: return self[:(-1 * getme)] else: return [self[getme - 1]] except IndexError: return [] except ValueError: range_result = self.rangePattern.search(getme) if range_result: start = range_result.group('start') or None end = range_result.group('start') or None if start: start = int(start) - 1 if end: end = int(end) return self[start:end] # noinspection PyUnresolvedReferences getme = getme.strip() if getme.startswith(r'/') and getme.endswith(r'/'): finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE) def isin(hi): """Listcomp filter function for doing a regular expression search of History. :param hi: HistoryItem :return: bool - True if search matches """ return finder.search(hi) else: def isin(hi): """Listcomp filter function for doing a case-insensitive string search of History. :param hi: HistoryItem :return: bool - True if search matches """ return getme.lower() in hi.lowercase return [itm for itm in self if isin(itm)] def cast(current, new): """Tries to force a new value into the same type as the current when trying to set the value for a parameter. :param current: current value for the parameter, type varies :param new: str - new value :return: new value with same type as current, or the current value if there was an error casting """ typ = type(current) if typ == bool: try: return bool(int(new)) except (ValueError, TypeError): pass try: new = new.lower() except AttributeError: pass if (new == 'on') or (new[0] in ('y', 't')): return True if (new == 'off') or (new[0] in ('n', 'f')): return False else: try: return typ(new) except (ValueError, TypeError): pass print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new)) return current class Statekeeper(object): """Class used to save and restore state during load and py commands as well as when redirecting output or pipes.""" def __init__(self, obj, attribs): """Use the instance attributes as a generic key-value store to copy instance attributes from outer object. :param obj: instance of cmd2.Cmd derived class (your application instance) :param attribs: Tuple[str] - tuple of strings listing attributes of obj to save a copy of """ self.obj = obj self.attribs = attribs if self.obj: self._save() def _save(self): """Create copies of attributes from self.obj inside this Statekeeper instance.""" for attrib in self.attribs: setattr(self, attrib, getattr(self.obj, attrib)) def restore(self): """Overwrite attributes in self.obj with the saved values stored in this Statekeeper instance.""" if self.obj: for attrib in self.attribs: setattr(self.obj, attrib, getattr(self, attrib)) class OutputTrap(object): """Instantiate an OutputTrap to divert/capture ALL stdout output. For use in transcript testing.""" def __init__(self): self.contents = '' def write(self, txt): """Add text to the internal contents. :param txt: str """ self.contents += txt def read(self): """Read from the internal contents and then clear them out. :return: str - text from the internal contents """ result = self.contents self.contents = '' return result class Cmd2TestCase(unittest.TestCase): """Subclass this, setting CmdApp, to make a unittest.TestCase class that will execute the commands in a transcript file and expect the results shown. See example.py""" cmdapp = None def fetchTranscripts(self): self.transcripts = {} for fileset in self.cmdapp.testfiles: for fname in glob.glob(fileset): tfile = open(fname) self.transcripts[fname] = iter(tfile.readlines()) tfile.close() if not len(self.transcripts): raise Exception("No test files found - nothing to test.") def setUp(self): if self.cmdapp: self.fetchTranscripts() # Trap stdout self._orig_stdout = self.cmdapp.stdout self.cmdapp.stdout = OutputTrap() def runTest(self): # was testall if self.cmdapp: its = sorted(self.transcripts.items()) for (fname, transcript) in its: self._test_transcript(fname, transcript) def _test_transcript(self, fname, transcript): line_num = 0 finished = False line = strip_ansi(next(transcript)) line_num += 1 while not finished: # Scroll forward to where actual commands begin while not line.startswith(self.cmdapp.visible_prompt): try: line = strip_ansi(next(transcript)) except StopIteration: finished = True break line_num += 1 command = [line[len(self.cmdapp.visible_prompt):]] line = next(transcript) # Read the entirety of a multi-line command while line.startswith(self.cmdapp.continuation_prompt): command.append(line[len(self.cmdapp.continuation_prompt):]) try: line = next(transcript) except StopIteration: raise (StopIteration, 'Transcript broke off while reading command beginning at line {} with\n{}'.format(line_num, command[0]) ) line_num += 1 command = ''.join(command) # Send the command into the application and capture the resulting output # TODO: Should we get the return value and act if stop == True? self.cmdapp.onecmd_plus_hooks(command) result = self.cmdapp.stdout.read() # Read the expected result from transcript if strip_ansi(line).startswith(self.cmdapp.visible_prompt): message = '\nFile {}, line {}\nCommand was:\n{}\nExpected: (nothing)\nGot:\n{}\n'.format( fname, line_num, command, result) self.assert_(not (result.strip()), message) continue expected = [] while not strip_ansi(line).startswith(self.cmdapp.visible_prompt): expected.append(line) try: line = next(transcript) except StopIteration: finished = True break line_num += 1 expected = ''.join(expected) # transform the expected text into a valid regular expression expected = self._transform_transcript_expected(expected) message = '\nFile {}, line {}\nCommand was:\n{}\nExpected:\n{}\nGot:\n{}\n'.format( fname, line_num, command, expected, result) self.assertTrue(re.match(expected, result, re.MULTILINE | re.DOTALL), message) def _transform_transcript_expected(self, s): """parse the string with slashed regexes into a valid regex Given a string like: Match a 10 digit phone number: /\d{3}-\d{3}-\d{4}/ Turn it into a valid regular expression which matches the literal text of the string and the regular expression. We have to remove the slashes because they differentiate between plain text and a regular expression. Unless the slashes are escaped, in which case they are interpreted as plain text, or there is only one slash, which is treated as plain text also. Check the tests in tests/test_transcript.py to see all the edge cases. """ regex = '' start = 0 while True: (regex, first_slash_pos, start) = self._escaped_find(regex, s, start, False) if first_slash_pos == -1: # no more slashes, add the rest of the string and bail regex += re.escape(s[start:]) break else: # there is a slash, add everything we have found so far # add stuff before the first slash as plain text regex += re.escape(s[start:first_slash_pos]) start = first_slash_pos+1 # and go find the next one (regex, second_slash_pos, start) = self._escaped_find(regex, s, start, True) if second_slash_pos > 0: # add everything between the slashes (but not the slashes) # as a regular expression regex += s[start:second_slash_pos] # and change where we start looking for slashed on the # turn through the loop start = second_slash_pos + 1 else: # No closing slash, we have to add the first slash, # and the rest of the text regex += re.escape(s[start-1:]) break return regex @staticmethod def _escaped_find(regex, s, start, in_regex): """ Find the next slash in {s} after {start} that is not preceded by a backslash. If we find an escaped slash, add everything up to and including it to regex, updating {start}. {start} therefore serves two purposes, tells us where to start looking for the next thing, and also tells us where in {s} we have already added things to {regex} {in_regex} specifies whether we are currently searching in a regex, we behave differently if we are or if we aren't. """ while True: pos = s.find('/', start) if pos == -1: # no match, return to caller break elif pos == 0: # slash at the beginning of the string, so it can't be # escaped. We found it. break else: # check if the slash is preceeded by a backslash if s[pos-1:pos] == '\\': # it is. if in_regex: # add everything up to the backslash as a # regular expression regex += s[start:pos-1] # skip the backslash, and add the slash regex += s[pos] else: # add everything up to the backslash as escaped # plain text regex += re.escape(s[start:pos-1]) # and then add the slash as escaped # plain text regex += re.escape(s[pos]) # update start to show we have handled everything # before it start = pos+1 # and continue to look else: # slash is not escaped, this is what we are looking for break return regex, pos, start def tearDown(self): if self.cmdapp: # Restore stdout self.cmdapp.stdout = self._orig_stdout def namedtuple_with_two_defaults(typename, field_names, default_values=('', '')): """Wrapper around namedtuple which lets you treat the last value as optional. :param typename: str - type name for the Named tuple :param field_names: List[str] or space-separated string of field names :param default_values: (optional) 2-element tuple containing the default values for last 2 parameters in named tuple Defaults to an empty string for both of them :return: namedtuple type """ T = collections.namedtuple(typename, field_names) # noinspection PyUnresolvedReferences T.__new__.__defaults__ = default_values return T class CmdResult(namedtuple_with_two_defaults('CmdResult', ['out', 'err', 'war'])): """Derive a class to store results from a named tuple so we can tweak dunder methods for convenience. This is provided as a convenience and an example for one possible way for end users to store results in the self._last_result attribute of cmd2.Cmd class instances. See the "python_scripting.py" example for how it can be used to enable conditional control flow. Named tuple attributes ---------------------- out - this is intended to store normal output data from the command and can be of any type that makes sense err: str - (optional) this is intended to store an error message and it being non-empty indicates there was an error Defaults to an empty string war: str - (optional) this is intended to store a warning message which isn't quite an error, but of note Defaults to an empty string. NOTE: Named tuples are immutable. So the contents are there for access, not for modification. """ def __bool__(self): """If err is an empty string, treat the result as a success; otherwise treat it as a failure.""" return not self.err