diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 1562 |
1 files changed, 971 insertions, 591 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index fb929078..dec0a04d 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -32,25 +32,26 @@ Git repository on GitHub at https://github.com/python-cmd2/cmd2 import argparse import cmd import collections +import colorama from colorama import Fore import glob import inspect import os -import platform import re import shlex import sys -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union +import threading +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union, IO from . import constants from . import utils from . import plugin -from .argparse_completer import AutoCompleter, ACArgumentParser +from .argparse_completer import AutoCompleter, ACArgumentParser, ACTION_ARG_CHOICES from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer -from .parsing import StatementParser, Statement +from .parsing import StatementParser, Statement, Macro, MacroArg # Set up readline -from .rl_utils import rl_type, RlType +from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ @@ -104,9 +105,9 @@ except ImportError: # Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout if sys.version_info < (3, 5): - from contextlib2 import redirect_stdout, redirect_stderr + from contextlib2 import redirect_stdout else: - from contextlib import redirect_stdout, redirect_stderr + from contextlib import redirect_stdout # Detect whether IPython is installed to determine if the built-in "ipy" command should be included ipython_available = True @@ -121,6 +122,13 @@ except ImportError: # pragma: no cover HELP_CATEGORY = 'help_category' HELP_SUMMARY = 'help_summary' +INTERNAL_COMMAND_EPILOG = ("Notes:\n" + " This command is for internal use and is not intended to be called from the\n" + " command line.") + +# All command functions start with this +COMMAND_PREFIX = 'do_' + def categorize(func: Union[Callable, Iterable], category: str) -> None: """Categorize a function. @@ -137,19 +145,21 @@ def categorize(func: Union[Callable, Iterable], category: str) -> None: setattr(func, HELP_CATEGORY, category) -def parse_quoted_string(cmdline: str) -> List[str]: - """Parse a quoted string into a list of arguments.""" - if isinstance(cmdline, list): +def parse_quoted_string(string: str, preserve_quotes: bool) -> List[str]: + """ + Parse a quoted string into a list of arguments + :param string: the string being parsed + :param preserve_quotes: if True, then quotes will not be stripped + """ + if isinstance(string, list): # arguments are already a list, return the list we were passed - lexed_arglist = cmdline + lexed_arglist = string else: # Use shlex to split the command line into a list of arguments based on shell rules - lexed_arglist = shlex.split(cmdline, posix=False) - # strip off outer quotes for convenience - temp_arglist = [] - for arg in lexed_arglist: - temp_arglist.append(utils.strip_quotes(arg)) - lexed_arglist = temp_arglist + lexed_arglist = shlex.split(string, posix=False) + + if not preserve_quotes: + lexed_arglist = [utils.strip_quotes(arg) for arg in lexed_arglist] return lexed_arglist @@ -161,7 +171,7 @@ def with_category(category: str) -> Callable: return cat_decorator -def with_argument_list(func: Callable) -> Callable: +def with_argument_list(func: Callable, preserve_quotes: bool=False) -> Callable: """A decorator to alter the arguments passed to a do_* cmd2 method. Default passes a string of whatever the user typed. With this decorator, the decorated method will receive a list @@ -170,18 +180,19 @@ def with_argument_list(func: Callable) -> Callable: @functools.wraps(func) def cmd_wrapper(self, cmdline): - lexed_arglist = parse_quoted_string(cmdline) + lexed_arglist = parse_quoted_string(cmdline, preserve_quotes) return func(self, lexed_arglist) cmd_wrapper.__doc__ = func.__doc__ return cmd_wrapper -def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Callable: +def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser, preserve_quotes: bool=False) -> Callable: """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given instance of argparse.ArgumentParser, but also returning unknown args as a list. - :param argparser: argparse.ArgumentParser - given instance of ArgumentParser + :param argparser: given instance of ArgumentParser + :param preserve_quotes: if True, then the arguments passed to arparse be maintain their quotes :return: function that gets passed parsed args and a list of unknown args """ import functools @@ -190,7 +201,7 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(instance, cmdline): - lexed_arglist = parse_quoted_string(cmdline) + lexed_arglist = parse_quoted_string(cmdline, preserve_quotes) try: args, unknown = argparser.parse_known_args(lexed_arglist) except SystemExit: @@ -219,11 +230,12 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla return arg_decorator -def with_argparser(argparser: argparse.ArgumentParser) -> Callable: +def with_argparser(argparser: argparse.ArgumentParser, preserve_quotes: bool=False) -> Callable: """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given instance of argparse.ArgumentParser. - :param argparser: argparse.ArgumentParser - given instance of ArgumentParser + :param argparser: given instance of ArgumentParser + :param preserve_quotes: if True, then the arguments passed to arparse be maintain their quotes :return: function that gets passed parsed args """ import functools @@ -232,7 +244,7 @@ def with_argparser(argparser: argparse.ArgumentParser) -> Callable: def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(instance, cmdline): - lexed_arglist = parse_quoted_string(cmdline) + lexed_arglist = parse_quoted_string(cmdline, preserve_quotes) try: args = argparser.parse_args(lexed_arglist) except SystemExit: @@ -306,7 +318,6 @@ class Cmd(cmd.Cmd): # Attributes used to configure the StatementParser, best not to change these at runtime multiline_commands = [] shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} - aliases = dict() terminators = [';'] # Attributes which are NOT dynamically settable at runtime @@ -317,7 +328,7 @@ class Cmd(cmd.Cmd): reserved_words = [] # Attributes which ARE dynamically settable at runtime - colors = (platform.system() != 'Windows') + colors = constants.COLORS_TERMINAL continuation_prompt = '> ' debug = False echo = False @@ -337,7 +348,7 @@ class Cmd(cmd.Cmd): # To make an attribute settable with the "do_set" command, add it to this ... # This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key - settable = {'colors': 'Colorized output (*nix only)', + settable = {'colors': 'Allow colorized output (valid values: Terminal, Always, Never)', 'continuation_prompt': 'On 2nd+ line of input', 'debug': 'Show full error stack on error', 'echo': 'Echo command issued into output', @@ -369,6 +380,9 @@ class Cmd(cmd.Cmd): except AttributeError: pass + # Override whether ansi codes should be stripped from the output since cmd2 has its own logic for doing this + colorama.init(strip=False) + # initialize plugin system # needs to be done before we call __init__(0) self._initialize_plugin_system() @@ -376,12 +390,20 @@ class Cmd(cmd.Cmd): # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) + # Get rid of cmd's complete_help() functions so AutoCompleter will complete our help command + if getattr(cmd.Cmd, 'complete_help', None) is not None: + delattr(cmd.Cmd, 'complete_help') + # Commands to exclude from the help menu and tab completion self.hidden_commands = ['eof', 'eos', '_relative_load'] # Commands to exclude from the history command self.exclude_from_history = '''history edit eof eos'''.split() + # Command aliases and macros + self.aliases = dict() + self.macros = dict() + self._finalize_app_parameters() self.initial_stdout = sys.stdout @@ -389,7 +411,7 @@ class Cmd(cmd.Cmd): self.pystate = {} self.py_history = [] self.pyscript_name = 'app' - self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')] + self.keywords = self.reserved_words + self.get_all_commands() self.statement_parser = StatementParser( allow_redirection=self.allow_redirection, terminators=self.terminators, @@ -417,13 +439,13 @@ class Cmd(cmd.Cmd): self._STOP_AND_EXIT = True # cmd convention self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'}, - 'cyan': {True: '\x1b[36m', False: '\x1b[39m'}, - 'blue': {True: '\x1b[34m', False: '\x1b[39m'}, - 'red': {True: '\x1b[31m', False: '\x1b[39m'}, - 'magenta': {True: '\x1b[35m', False: '\x1b[39m'}, - 'green': {True: '\x1b[32m', False: '\x1b[39m'}, - 'underline': {True: '\x1b[4m', False: '\x1b[24m'}, - 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}} + 'cyan': {True: Fore.CYAN, False: Fore.RESET}, + 'blue': {True: Fore.BLUE, False: Fore.RESET}, + 'red': {True: Fore.RED, False: Fore.RESET}, + 'magenta': {True: Fore.MAGENTA, False: Fore.RESET}, + 'green': {True: Fore.GREEN, False: Fore.RESET}, + 'underline': {True: '\x1b[4m', False: Fore.RESET}, + 'yellow': {True: Fore.YELLOW, False: Fore.RESET}} # Used load command to store the current script dir as a LIFO queue to support _relative_load command self._script_dir = [] @@ -441,6 +463,7 @@ class Cmd(cmd.Cmd): self.broken_pipe_warning = '' # Check if history should persist + self.persistent_history_file = '' if persistent_history_file and rl_type != RlType.NONE: persistent_history_file = os.path.expanduser(persistent_history_file) read_err = False @@ -526,6 +549,11 @@ class Cmd(cmd.Cmd): # This determines if a non-zero exit code should be used when exiting the application self.exit_code = None + # This lock should be acquired before doing any asynchronous changes to the terminal to + # ensure the updates to the terminal don't interfere with the input being typed. It can be + # acquired any time there is a readline prompt on screen. + self.terminal_lock = threading.RLock() + # ----- Methods related to presenting output to the user ----- @property @@ -547,34 +575,53 @@ class Cmd(cmd.Cmd): # Make sure settable parameters are sorted alphabetically by key self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0])) - def poutput(self, msg: str, end: str='\n') -> None: - """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present. + def decolorized_write(self, fileobj: IO, msg: str) -> None: + """Write a string to a fileobject, stripping ANSI escape sequences if necessary - Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and - that process terminates before the cmd2 command is finished executing. + Honor the current colors setting, which requires us to check whether the + fileobject is a tty. + """ + if self.colors.lower() == constants.COLORS_NEVER.lower() or \ + (self.colors.lower() == constants.COLORS_TERMINAL.lower() and not fileobj.isatty()): + msg = utils.strip_ansi(msg) + fileobj.write(msg) - :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK - :param end: string appended after the end of the message if not already present, default a newline + def poutput(self, msg: Any, end: str='\n', color: str='') -> None: + """Smarter self.stdout.write(); color aware and adds newline of not present. + + Also handles BrokenPipeError exceptions for when a commands's output has + been piped to another process and that process terminates before the + cmd2 command is finished executing. + + :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK) + :param end: (optional) string appended after the end of the message if not already present, default a newline + :param color: (optional) color escape to output this message with """ if msg is not None and msg != '': try: msg_str = '{}'.format(msg) - self.stdout.write(msg_str) if not msg_str.endswith(end): - self.stdout.write(end) + msg_str += end + if color: + msg_str = color + msg_str + Fore.RESET + self.decolorized_write(self.stdout, msg_str) except BrokenPipeError: - # This occurs if a command's output is being piped to another process and that process closes before the - # command is finished. If you would like your application to print a warning message, then set the - # broken_pipe_warning attribute to the message you want printed. + # This occurs if a command's output is being piped to another + # process and that process closes before the command is + # finished. If you would like your application to print a + # warning message, then set the broken_pipe_warning attribute + # to the message you want printed. if self.broken_pipe_warning: sys.stderr.write(self.broken_pipe_warning) - def perror(self, err: Union[str, Exception], traceback_war: bool=True) -> None: + def perror(self, err: Union[str, Exception], traceback_war: bool=True, err_color: str=Fore.LIGHTRED_EX, + war_color: str=Fore.LIGHTYELLOW_EX) -> None: """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists. :param err: an Exception or error message to print out :param traceback_war: (optional) if True, print a message to let user know they can enable debug - :return: + :param err_color: (optional) color escape to output error with + :param war_color: (optional) color escape to output warning with """ if self.debug: import traceback @@ -582,14 +629,15 @@ class Cmd(cmd.Cmd): if isinstance(err, Exception): err_msg = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(type(err).__name__, err) - sys.stderr.write(self.colorize(err_msg, 'red')) else: - err_msg = self.colorize("ERROR: {}\n".format(err), 'red') - sys.stderr.write(err_msg) + err_msg = "ERROR: {}\n".format(err) + err_msg = err_color + err_msg + Fore.RESET + self.decolorized_write(sys.stderr, err_msg) if traceback_war: war = "To enable full traceback, run the following command: 'set debug true'\n" - sys.stderr.write(self.colorize(war, 'yellow')) + war = war_color + war + Fore.RESET + self.decolorized_write(sys.stderr, war) def pfeedback(self, msg: str) -> None: """For printing nonessential feedback. Can be silenced with `quiet`. @@ -598,7 +646,7 @@ class Cmd(cmd.Cmd): if self.feedback_to_output: self.poutput(msg) else: - sys.stderr.write("{}\n".format(msg)) + self.decolorized_write(sys.stderr, "{}\n".format(msg)) def ppaged(self, msg: str, end: str='\n', chop: bool=False) -> None: """Print output using a pager if it would go off screen and stdout isn't currently being redirected. @@ -606,7 +654,7 @@ class Cmd(cmd.Cmd): Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when stdout or stdin are not a fully functional terminal. - :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK + :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK) :param end: string appended after the end of the message if not already present, default a newline :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped - truncated text is still accessible by scrolling with the right & left arrow keys @@ -634,6 +682,9 @@ class Cmd(cmd.Cmd): # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) # Also only attempt to use a pager if actually running in a real fully functional terminal if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir: + if self.colors.lower() == constants.COLORS_NEVER.lower(): + msg_str = utils.strip_ansi(msg_str) + pager = self.pager if chop: pager = self.pager_chop @@ -658,7 +709,7 @@ class Cmd(cmd.Cmd): except BrokenPipeError: # This occurs if a command's output is being piped to another process and that process closes before the # command is finished. If you would like your application to print a warning message, then set the - # broken_pipe_warning attribute to the message you want printed. + # broken_pipe_warning attribute to the message you want printed.` if self.broken_pipe_warning: sys.stderr.write(self.broken_pipe_warning) @@ -669,7 +720,7 @@ class Cmd(cmd.Cmd): is running on Windows, will return ``val`` unchanged. ``color`` should be one of the supported strings (or styles): red/blue/green/cyan/magenta, bold, underline""" - if self.colors and (self.stdout == self.initial_stdout): + if self.colors.lower() != constants.COLORS_NEVER.lower() and (self.stdout == self.initial_stdout): return self._colorcodes[color][True] + val + self._colorcodes[color][False] return val @@ -897,14 +948,13 @@ class Cmd(cmd.Cmd): :param line: the current input line with leading whitespace removed :param begidx: the beginning index of the prefix text :param endidx: the ending index of the prefix text - :param flag_dict: dict - dictionary whose structure is the following: - keys - flags (ex: -c, --create) that result in tab completion for the next - argument in the command line - values - there are two types of values - 1. iterable list of strings to match against (dictionaries, lists, etc.) - 2. function that performs tab completion (ex: path_complete) - :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded - by a flag in flag_dict + :param flag_dict: dictionary whose structure is the following: + keys - flags (ex: -c, --create) that result in tab completion for the next + argument in the command line + values - there are two types of values + 1. iterable list of strings to match against (dictionaries, lists, etc.) + 2. function that performs tab completion (ex: path_complete) + :param all_else: an optional parameter for tab completing any token that isn't preceded by a flag in flag_dict :return: a list of possible tab completions """ # Get all tokens through the one being completed @@ -940,14 +990,13 @@ class Cmd(cmd.Cmd): :param line: the current input line with leading whitespace removed :param begidx: the beginning index of the prefix text :param endidx: the ending index of the prefix text - :param index_dict: dict - dictionary whose structure is the following: - keys - 0-based token indexes into command line that determine which tokens - perform tab completion - values - there are two types of values - 1. iterable list of strings to match against (dictionaries, lists, etc.) - 2. function that performs tab completion (ex: path_complete) - :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an - index in index_dict + :param index_dict: dictionary whose structure is the following: + keys - 0-based token indexes into command line that determine which tokens + perform tab completion + values - there are two types of values + 1. iterable list of strings to match against (dictionaries, lists, etc.) + 2. function that performs tab completion (ex: path_complete) + :param all_else: an optional parameter for tab completing any token that isn't at an index in index_dict :return: a list of possible tab completions """ # Get all tokens through the one being completed @@ -1093,8 +1142,8 @@ class Cmd(cmd.Cmd): self.allow_appended_space = False self.allow_closing_quote = False - # Sort the matches before any trailing slashes are added - matches = utils.alphabetical_sort(matches) + # Sort the matches alphabetically before any trailing slashes are added + matches.sort(key=utils.norm_fold) self.matches_sorted = True # Build display_matches and add a slash to directories @@ -1433,18 +1482,21 @@ class Cmd(cmd.Cmd): # Check if a valid command was entered if command in self.get_all_commands(): # Get the completer function for this command - try: - compfunc = getattr(self, 'complete_' + command) - except AttributeError: + compfunc = getattr(self, 'complete_' + command, None) + + if compfunc is None: # There's no completer function, next see if the command uses argparser - try: - cmd_func = getattr(self, 'do_' + command) - argparser = getattr(cmd_func, 'argparser') - # Command uses argparser, switch to the default argparse completer - compfunc = functools.partial(self._autocomplete_default, argparser=argparser) - except AttributeError: + func = self.cmd_func(command) + if func and hasattr(func, 'argparser'): + compfunc = functools.partial(self._autocomplete_default, + argparser=getattr(func, 'argparser')) + else: compfunc = self.completedefault + # Check if a macro was entered + elif command in self.macros: + compfunc = self.path_complete + # A valid command was not entered else: # Check if this command should be run as a shell command @@ -1512,11 +1564,9 @@ class Cmd(cmd.Cmd): [shortcut_to_restore + match for match in self.completion_matches] else: - # Complete token against aliases and command names - alias_names = set(self.aliases.keys()) - visible_commands = set(self.get_visible_commands()) - strs_to_match = list(alias_names | visible_commands) - self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) + # Complete token against anything a user can run + self.completion_matches = self.basic_complete(text, line, begidx, endidx, + self.get_commands_aliases_and_macros_for_completion()) # Handle single result if len(self.completion_matches) == 1: @@ -1534,8 +1584,8 @@ class Cmd(cmd.Cmd): # Sort matches alphabetically if they haven't already been sorted if not self.matches_sorted: - self.completion_matches = utils.alphabetical_sort(self.completion_matches) - self.display_matches = utils.alphabetical_sort(self.display_matches) + self.completion_matches.sort(key=utils.norm_fold) + self.display_matches.sort(key=utils.norm_fold) self.matches_sorted = True try: @@ -1556,8 +1606,8 @@ class Cmd(cmd.Cmd): def get_all_commands(self) -> List[str]: """Returns a list of all commands.""" - return [name[3:] for name in self.get_names() - if name.startswith('do_') and isinstance(getattr(self, name), Callable)] + return [name[len(COMMAND_PREFIX):] for name in self.get_names() + if name.startswith(COMMAND_PREFIX) and callable(getattr(self, name))] def get_visible_commands(self) -> List[str]: """Returns a list of commands that have not been hidden.""" @@ -1570,53 +1620,25 @@ class Cmd(cmd.Cmd): return commands - def get_help_topics(self) -> List[str]: - """ Returns a list of help topics """ - return [name[5:] for name in self.get_names() - if name.startswith('help_') and isinstance(getattr(self, name), Callable)] - - def complete_help(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """ - Override of parent class method to handle tab completing subcommands and not showing hidden commands - Returns a list of possible tab completions - """ - - # The command is the token at index 1 in the command line - cmd_index = 1 - - # The subcommand is the token at index 2 in the command line - subcmd_index = 2 - - # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) - if not tokens: - return [] - - matches = [] + def get_alias_names(self) -> List[str]: + """Return a list of alias names.""" + return list(self.aliases) - # Get the index of the token being completed - index = len(tokens) - 1 - - # Check if we are completing a command or help topic - if index == cmd_index: + def get_macro_names(self) -> List[str]: + """Return a list of macro names.""" + return list(self.macros) - # Complete token against topics and visible commands - topics = set(self.get_help_topics()) - visible_commands = set(self.get_visible_commands()) - strs_to_match = list(topics | visible_commands) - matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) - - # check if the command uses argparser - elif index >= subcmd_index: - try: - cmd_func = getattr(self, 'do_' + tokens[cmd_index]) - parser = getattr(cmd_func, 'argparser') - completer = AutoCompleter(parser) - matches = completer.complete_command_help(tokens[1:], text, line, begidx, endidx) - except AttributeError: - pass + def get_commands_aliases_and_macros_for_completion(self) -> List[str]: + """Return a list of visible commands, aliases, and macros for tab completion""" + visible_commands = set(self.get_visible_commands()) + alias_names = set(self.get_alias_names()) + macro_names = set(self.get_macro_names()) + return list(visible_commands | alias_names | macro_names) - return matches + def get_help_topics(self) -> List[str]: + """ Returns a list of help topics """ + return [name[5:] for name in self.get_names() + if name.startswith('help_') and callable(getattr(self, name))] # noinspection PyUnusedLocal def sigint_handler(self, signum: int, frame) -> None: @@ -1637,12 +1659,6 @@ class Cmd(cmd.Cmd): # Re-raise a KeyboardInterrupt so other parts of the code can catch it raise KeyboardInterrupt("Got a keyboard interrupt") - def preloop(self) -> None: - """Hook method executed once when the cmdloop() method is called.""" - import signal - # Register a default SIGINT signal handler for Ctrl+C - signal.signal(signal.SIGINT, self.sigint_handler) - def precmd(self, statement: Statement) -> Statement: """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. @@ -1651,58 +1667,6 @@ class Cmd(cmd.Cmd): """ return statement - # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd ----- - - # noinspection PyMethodMayBeStatic - def preparse(self, raw: str) -> str: - """Hook method executed before user input is parsed. - - WARNING: If it's a multiline command, `preparse()` may not get all the - user input. _complete_statement() really does two things: a) parse the - user input, and b) accept more input in case it's a multiline command - the passed string doesn't have a terminator. `preparse()` is currently - called before we know whether it's a multiline command, and before we - know whether the user input includes a termination character. - - If you want a reliable pre parsing hook method, register a postparsing - hook, modify the user input, and then reparse it. - - :param raw: raw command line input :return: potentially modified raw command line input - :return: a potentially modified version of the raw input string - """ - return raw - - # noinspection PyMethodMayBeStatic - def postparsing_precmd(self, statement: Statement) -> Tuple[bool, Statement]: - """This runs after parsing the command-line, but before anything else; even before adding cmd to history. - - NOTE: This runs before precmd() and prior to any potential output redirection or piping. - - If you wish to fatally fail this command and exit the application entirely, set stop = True. - - If you wish to just fail this command you can do so by raising an exception: - - - raise EmptyStatement - will silently fail and do nothing - - raise <AnyOtherException> - will fail and print an error message - - :param statement: - the parsed command-line statement as a Statement object - :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement object - """ - stop = False - return stop, statement - - # noinspection PyMethodMayBeStatic - def postparsing_postcmd(self, stop: bool) -> bool: - """This runs after everything else, including after postcmd(). - - It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due - to notifications from a background thread, then this is the method you want to override to do it. - - :param stop: bool - True implies the entire application should exit. - :return: bool - True implies the entire application should exit. - """ - return stop - def parseline(self, line: str) -> Tuple[str, str, str]: """Parse the line into a command name and a string containing the arguments. @@ -1742,9 +1706,6 @@ class Cmd(cmd.Cmd): data = func(data) if data.stop: break - # postparsing_precmd is deprecated - if not data.stop: - (data.stop, data.statement) = self.postparsing_precmd(data.statement) # unpack the data object statement = data.statement stop = data.stop @@ -1809,9 +1770,7 @@ class Cmd(cmd.Cmd): data = func(data) # retrieve the final value of stop, ignoring any # modifications to the statement - stop = data.stop - # postparsing_postcmd is deprecated - return self.postparsing_postcmd(stop) + return data.stop except Exception as ex: self.perror(ex) @@ -1865,9 +1824,6 @@ class Cmd(cmd.Cmd): pipe runs out. We can't refactor it because we need to retain backwards compatibility with the standard library version of cmd. """ - # preparse() is deprecated, use self.register_postparsing_hook() instead - line = self.preparse(line) - while True: try: statement = self.statement_parser.parse(line) @@ -2014,49 +1970,91 @@ class Cmd(cmd.Cmd): self.redirecting = False - def _func_named(self, arg: str) -> str: - """Gets the method name associated with a given command. + def cmd_func(self, command: str) -> Optional[Callable]: + """ + Get the function for a command + :param command: the name of the command + """ + func_name = self.cmd_func_name(command) + if func_name: + return getattr(self, func_name) + + def cmd_func_name(self, command: str) -> str: + """Get the method name associated with a given command. - :param arg: command to look up method name which implements it + :param command: command to look up method name which implements it :return: method name which implements the given command """ - result = None - target = 'do_' + arg - if target in dir(self): - result = target - return result + target = COMMAND_PREFIX + command + return target if callable(getattr(self, target, None)) else '' - def onecmd(self, statement: Union[Statement, str]) -> Optional[bool]: + def onecmd(self, statement: Union[Statement, str]) -> bool: """ This executes the actual do_* method for a command. If the command provided doesn't exist, then it executes _default() instead. - :param statement: Command - intended to be a Statement instance parsed command from the input stream, - alternative acceptance of a str is present only for backward compatibility with cmd + :param statement: intended to be a Statement instance parsed command from the input stream, alternative + acceptance of a str is present only for backward compatibility with cmd :return: a flag indicating whether the interpretation of commands should stop """ # For backwards compatibility with cmd, allow a str to be passed in if not isinstance(statement, Statement): statement = self._complete_statement(statement) - funcname = self._func_named(statement.command) - if not funcname: - self.default(statement) - return + # Check if this is a macro + if statement.command in self.macros: + stop = self._run_macro(statement) + else: + func = self.cmd_func(statement.command) + if func: + stop = func(statement) - # Since we have a valid command store it in the history - if statement.command not in self.exclude_from_history: - self.history.append(statement.raw) + # Since we have a valid command store it in the history + if statement.command not in self.exclude_from_history: + self.history.append(statement.raw) - try: - func = getattr(self, funcname) - except AttributeError: - self.default(statement) - return + else: + self.default(statement) + stop = False - stop = func(statement) return stop + def _run_macro(self, statement: Statement) -> bool: + """ + Resolve a macro and run the resulting string + + :param statement: the parsed statement from the command line + :return: a flag indicating whether the interpretation of commands should stop + """ + if statement.command not in self.macros.keys(): + raise KeyError('{} is not a macro'.format(statement.command)) + + macro = self.macros[statement.command] + + # For macros, every argument must be provided and there can be no extra arguments. + if len(statement.arg_list) != macro.required_arg_count: + self.perror("The macro '{}' expects {} argument(s)".format(statement.command, macro.required_arg_count), + traceback_war=False) + return False + + # Resolve the arguments in reverse + resolved = macro.value + reverse_arg_list = sorted(macro.arg_list, key=lambda ma: ma.start_index, reverse=True) + + for arg in reverse_arg_list: + if arg.is_escaped: + to_replace = '{{' + arg.number_str + '}}' + replacement = '{' + arg.number_str + '}' + else: + to_replace = '{' + arg.number_str + '}' + replacement = statement.argv[int(arg.number_str)] + + parts = resolved.rsplit(to_replace, maxsplit=1) + resolved = parts[0] + replacement + parts[1] + + # Run the resolved command + return self.onecmd_plus_hooks(resolved) + def default(self, statement: Statement) -> None: """Executed when the command given isn't a recognized command implemented by a do_* method. @@ -2072,34 +2070,6 @@ class Cmd(cmd.Cmd): # Print out a message stating this is an unknown command self.poutput('*** Unknown syntax: {}\n'.format(arg)) - @staticmethod - def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str: - """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. - - :param prompt: original prompt - :param start: start code to tell GNU Readline about beginning of invisible characters - :param end: end code to tell GNU Readline about end of invisible characters - :return: prompt safe to pass to GNU Readline - """ - # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline - if sys.platform == "win32": - return prompt - - escaped = False - result = "" - - for c in prompt: - if c == "\x1b" and not escaped: - result += start + c - escaped = True - elif c.isalpha() and escaped: - result += c + end - escaped = False - else: - result += c - - return result - def pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2108,23 +2078,37 @@ class Cmd(cmd.Cmd): to decide whether to print the prompt and the input """ - # Deal with the vagaries of readline and ANSI escape codes - safe_prompt = self._surround_ansi_escapes(prompt) + # Temporarily save over self.prompt to reflect what will be on screen + orig_prompt = self.prompt + self.prompt = prompt if self.use_rawinput: try: if sys.stdin.isatty(): + # Wrap in try since terminal_lock may not be locked when this function is called from unit tests + try: + # A prompt is about to be drawn. Allow asynchronous changes to the terminal. + self.terminal_lock.release() + except RuntimeError: + pass + + # Deal with the vagaries of readline and ANSI escape codes + safe_prompt = rl_make_safe_prompt(prompt) line = input(safe_prompt) else: line = input() if self.echo: - sys.stdout.write('{}{}\n'.format(safe_prompt, line)) + sys.stdout.write('{}{}\n'.format(self.prompt, line)) except EOFError: line = 'eof' + finally: + if sys.stdin.isatty(): + # The prompt is gone. Do not allow asynchronous changes to the terminal. + self.terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line - self.poutput(safe_prompt, end='') + self.poutput(self.prompt, end='') self.stdout.flush() line = self.stdin.readline() if len(line) == 0: @@ -2137,9 +2121,13 @@ class Cmd(cmd.Cmd): if len(line): # we read something, output the prompt and the something if self.echo: - self.poutput('{}{}'.format(safe_prompt, line)) + self.poutput('{}{}'.format(self.prompt, line)) else: line = 'eof' + + # Restore prompt + self.prompt = orig_prompt + return line.strip() def _cmdloop(self) -> bool: @@ -2219,147 +2207,423 @@ class Cmd(cmd.Cmd): return stop - def do_alias(self, statement: Statement) -> None: - """Define or display aliases + # ----- Alias subcommand functions ----- -Usage: Usage: alias [name] | [<name> <value>] - Where: - name - name of the alias being looked up, added, or replaced - value - what the alias will be resolved to (if adding or replacing) - this can contain spaces and does not need to be quoted + def alias_create(self, args: argparse.Namespace): + """ Creates or overwrites an alias """ - Without arguments, 'alias' prints a list of all aliases in a reusable form which - can be outputted to a startup_script to preserve aliases across sessions. + # Validate the alias name + args.name = utils.strip_quotes(args.name) + valid, errmsg = self.statement_parser.is_valid_command(args.name) + if not valid: + errmsg = "Invalid alias name: {}".format(errmsg) + self.perror(errmsg, traceback_war=False) + return - With one argument, 'alias' shows the value of the specified alias. - Example: alias ls (Prints the value of the alias called 'ls' if it exists) + if args.name in self.macros: + errmsg = "Alias cannot have the same name as a macro" + self.perror(errmsg, traceback_war=False) + return - With two or more arguments, 'alias' creates or replaces an alias. + utils.unquote_redirection_tokens(args.command_args) - Example: alias ls !ls -lF + # Build the alias value string + value = args.command + if args.command_args: + value += ' ' + ' '.join(args.command_args) - If you want to use redirection or pipes in the alias, then quote them to prevent - the alias command itself from being redirected + # Set the alias + result = "overwritten" if args.name in self.aliases else "created" + self.aliases[args.name] = value + self.poutput("Alias '{}' {}".format(args.name, result)) - Examples: - alias save_results print_results ">" out.txt - alias save_results print_results '>' out.txt -""" - # Get alias arguments as a list with quotes preserved - alias_arg_list = statement.arg_list + def alias_delete(self, args: argparse.Namespace): + """ Deletes aliases """ + if args.all: + self.aliases.clear() + self.poutput("All aliases deleted") + elif not args.name: + self.do_help('alias delete') + else: + # Get rid of duplicates and strip quotes since the argparse decorator for do_alias() preserves them + aliases_to_delete = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] + + for cur_name in aliases_to_delete: + if cur_name in self.aliases: + del self.aliases[cur_name] + self.poutput("Alias '{}' deleted".format(cur_name)) + else: + self.perror("Alias '{}' does not exist".format(cur_name), traceback_war=False) + + def alias_list(self, args: argparse.Namespace): + """ Lists some or all aliases """ + if args.name: + # Get rid of duplicates and strip quotes since the argparse decorator for do_alias() preserves them + names_to_view = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] + + for cur_name in names_to_view: + if cur_name in self.aliases: + self.poutput("alias create {} {}".format(cur_name, self.aliases[cur_name])) + else: + self.perror("Alias '{}' not found".format(cur_name), traceback_war=False) + else: + sorted_aliases = utils.alphabetical_sort(self.aliases) + for cur_alias in sorted_aliases: + self.poutput("alias create {} {}".format(cur_alias, self.aliases[cur_alias])) + + # Top-level parser for alias + alias_description = ("Manage aliases\n" + "\n" + "An alias is a command that enables replacement of a word by another string.") + alias_epilog = ("See also:\n" + " macro") + alias_parser = ACArgumentParser(description=alias_description, epilog=alias_epilog, prog='alias') + + # Add subcommands to alias + alias_subparsers = alias_parser.add_subparsers() + + # alias -> create + alias_create_help = "create or overwrite an alias" + alias_create_description = "Create or overwrite an alias" + + alias_create_epilog = ("Notes:\n" + " If you want to use redirection or pipes in the alias, then quote them to\n" + " prevent the 'alias create' command from being redirected.\n" + "\n" + " Since aliases are resolved during parsing, tab completion will function as it\n" + " would for the actual command the alias resolves to.\n" + "\n" + "Examples:\n" + " alias ls !ls -lF\n" + " alias create show_log !cat \"log file.txt\"\n" + " alias create save_results print_results \">\" out.txt\n") + + alias_create_parser = alias_subparsers.add_parser('create', help=alias_create_help, + description=alias_create_description, + epilog=alias_create_epilog) + alias_create_parser.add_argument('name', help='name of this alias') + setattr(alias_create_parser.add_argument('command', help='what the alias resolves to'), + ACTION_ARG_CHOICES, get_commands_aliases_and_macros_for_completion) + setattr(alias_create_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='arguments to pass to command'), + ACTION_ARG_CHOICES, ('path_complete',)) + alias_create_parser.set_defaults(func=alias_create) + + # alias -> delete + alias_delete_help = "delete aliases" + alias_delete_description = "Delete specified aliases or all aliases if --all is used" + alias_delete_parser = alias_subparsers.add_parser('delete', help=alias_delete_help, + description=alias_delete_description) + setattr(alias_delete_parser.add_argument('name', nargs='*', help='alias to delete'), + ACTION_ARG_CHOICES, get_alias_names) + alias_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all aliases") + alias_delete_parser.set_defaults(func=alias_delete) + + # alias -> list + alias_list_help = "list aliases" + alias_list_description = ("List specified aliases in a reusable form that can be saved to a startup script\n" + "to preserve aliases across sessions\n" + "\n" + "Without arguments, all aliases will be listed.") + + alias_list_parser = alias_subparsers.add_parser('list', help=alias_list_help, + description=alias_list_description) + setattr(alias_list_parser.add_argument('name', nargs="*", help='alias to list'), + ACTION_ARG_CHOICES, get_alias_names) + alias_list_parser.set_defaults(func=alias_list) + + # Preserve quotes since we are passing strings to other commands + @with_argparser(alias_parser, preserve_quotes=True) + def do_alias(self, args: argparse.Namespace): + """Manage aliases""" + func = getattr(args, 'func', None) + if func is not None: + # Call whatever subcommand function was selected + func(self, args) + else: + # No subcommand was provided, so call help + self.do_help('alias') + + # ----- Macro subcommand functions ----- + + def macro_create(self, args: argparse.Namespace): + """ Creates or overwrites a macro """ - # If no args were given, then print a list of current aliases - if not alias_arg_list: - for cur_alias in self.aliases: - self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias])) + # Validate the macro name + args.name = utils.strip_quotes(args.name) + valid, errmsg = self.statement_parser.is_valid_command(args.name) + if not valid: + errmsg = "Invalid macro name: {}".format(errmsg) + self.perror(errmsg, traceback_war=False) return - # Get the alias name - name = alias_arg_list[0] + if args.name in self.get_all_commands(): + errmsg = "Macro cannot have the same name as a command" + self.perror(errmsg, traceback_war=False) + return - # The user is looking up an alias - if len(alias_arg_list) == 1: - if name in self.aliases: - self.poutput("alias {} {}".format(name, self.aliases[name])) - else: - self.perror("Alias {!r} not found".format(name), traceback_war=False) + if args.name in self.aliases: + errmsg = "Macro cannot have the same name as an alias" + self.perror(errmsg, traceback_war=False) + return - # The user is creating an alias - else: - # Unquote redirection and pipes - index = 1 - while index < len(alias_arg_list): - unquoted_arg = utils.strip_quotes(alias_arg_list[index]) - if unquoted_arg in constants.REDIRECTION_TOKENS: - alias_arg_list[index] = unquoted_arg - index += 1 - - # Build the alias value string - value = ' '.join(alias_arg_list[1:]) - - # Validate the alias to ensure it doesn't include weird characters - # like terminators, output redirection, or whitespace - valid, invalidchars = self.statement_parser.is_valid_command(name) - if valid: - # Set the alias - self.aliases[name] = value - self.poutput("Alias {!r} created".format(name)) - else: - errmsg = "Aliases can not contain: {}".format(invalidchars) - self.perror(errmsg, traceback_war=False) + utils.unquote_redirection_tokens(args.command_args) - def complete_alias(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """ Tab completion for alias """ - alias_names = set(self.aliases.keys()) - visible_commands = set(self.get_visible_commands()) + # Build the macro value string + value = args.command + if args.command_args: + value += ' ' + ' '.join(args.command_args) - index_dict = \ - { - 1: alias_names, - 2: list(alias_names | visible_commands) - } - return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) + # Find all normal arguments + arg_list = [] + normal_matches = re.finditer(MacroArg.macro_normal_arg_pattern, value) + max_arg_num = 0 + arg_nums = set() - @with_argument_list - def do_unalias(self, arglist: List[str]) -> None: - """Unsets aliases + while True: + try: + cur_match = normal_matches.__next__() -Usage: Usage: unalias [-a] name [name ...] - Where: - name - name of the alias being unset + # Get the number string between the braces + cur_num_str = (re.findall(MacroArg.digit_pattern, cur_match.group())[0]) + cur_num = int(cur_num_str) + if cur_num < 1: + self.perror("Argument numbers must be greater than 0", traceback_war=False) + return - Options: - -a remove all alias definitions -""" - if not arglist: - self.do_help(['unalias']) + arg_nums.add(cur_num) + if cur_num > max_arg_num: + max_arg_num = cur_num - if '-a' in arglist: - self.aliases.clear() - self.poutput("All aliases cleared") + arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=False)) + + except StopIteration: + break + + # Make sure the argument numbers are continuous + if len(arg_nums) != max_arg_num: + self.perror("Not all numbers between 1 and {} are present " + "in the argument placeholders".format(max_arg_num), traceback_war=False) + return + + # Find all escaped arguments + escaped_matches = re.finditer(MacroArg.macro_escaped_arg_pattern, value) + + while True: + try: + cur_match = escaped_matches.__next__() + # Get the number string between the braces + cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0] + + arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=True)) + except StopIteration: + break + + # Set the macro + result = "overwritten" if args.name in self.macros else "created" + self.macros[args.name] = Macro(name=args.name, value=value, required_arg_count=max_arg_num, arg_list=arg_list) + self.poutput("Macro '{}' {}".format(args.name, result)) + + def macro_delete(self, args: argparse.Namespace): + """ Deletes macros """ + if args.all: + self.macros.clear() + self.poutput("All macros deleted") + elif not args.name: + self.do_help('macro delete') else: - # Get rid of duplicates - arglist = utils.remove_duplicates(arglist) + # Get rid of duplicates and strip quotes since the argparse decorator for do_macro() preserves them + macros_to_delete = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] + + for cur_name in macros_to_delete: + if cur_name in self.macros: + del self.macros[cur_name] + self.poutput("Macro '{}' deleted".format(cur_name)) + else: + self.perror("Macro '{}' does not exist".format(cur_name), traceback_war=False) + + def macro_list(self, args: argparse.Namespace): + """ Lists some or all macros """ + if args.name: + # Get rid of duplicates and strip quotes since the argparse decorator for do_macro() preserves them + names_to_view = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] - for cur_arg in arglist: - if cur_arg in self.aliases: - del self.aliases[cur_arg] - self.poutput("Alias {!r} cleared".format(cur_arg)) + for cur_name in names_to_view: + if cur_name in self.macros: + self.poutput("macro create {} {}".format(cur_name, self.macros[cur_name].value)) else: - self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False) - - def complete_unalias(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """ Tab completion for unalias """ - return self.basic_complete(text, line, begidx, endidx, self.aliases) - - @with_argument_list - def do_help(self, arglist: List[str]) -> None: - """ List available commands with "help" or detailed help with "help cmd" """ - if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')): - verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v') - self._help_menu(verbose) + self.perror("Macro '{}' not found".format(cur_name), traceback_war=False) + else: + sorted_macros = utils.alphabetical_sort(self.macros) + for cur_macro in sorted_macros: + self.poutput("macro create {} {}".format(cur_macro, self.macros[cur_macro].value)) + + # Top-level parser for macro + macro_description = ("Manage macros\n" + "\n" + "A macro is similar to an alias, but it can take arguments when called.") + macro_epilog = ("See also:\n" + " alias") + macro_parser = ACArgumentParser(description=macro_description, epilog=macro_epilog, prog='macro') + + # Add subcommands to macro + macro_subparsers = macro_parser.add_subparsers() + + # macro -> create + macro_create_help = "create or overwrite a macro" + macro_create_description = "Create or overwrite a macro" + + macro_create_epilog = ("A macro is similar to an alias, but it can take arguments when called.\n" + "Arguments are expressed when creating a macro using {#} notation where {1}\n" + "means the first argument.\n" + "\n" + "The following creates a macro called my_macro that expects two arguments:\n" + "\n" + " macro create my_macro make_dinner -meat {1} -veggie {2}\n" + "\n" + "When the macro is called, the provided arguments are resolved and the assembled\n" + "command is run. For example:\n" + "\n" + " my_macro beef broccoli ---> make_dinner -meat beef -veggie broccoli\n" + "\n" + "Notes:\n" + " To use the literal string {1} in your command, escape it this way: {{1}}.\n" + "\n" + " An argument number can be repeated in a macro. In the following example the\n" + " first argument will populate both {1} instances.\n" + "\n" + " macro create ft file_taxes -p {1} -q {2} -r {1}\n" + "\n" + " To quote an argument in the resolved command, quote it during creation.\n" + "\n" + " macro create backup !cp \"{1}\" \"{1}.orig\"\n" + "\n" + " Be careful! Since macros can resolve into commands, aliases, and macros,\n" + " it is possible to create a macro that results in infinite recursion.\n" + "\n" + " If you want to use redirection or pipes in the macro, then quote them as in\n" + " this example to prevent the 'macro create' command from being redirected.\n" + "\n" + " macro create show_results print_results -type {1} \"|\" less\n" + "\n" + " Because macros do not resolve until after parsing (hitting Enter), tab\n" + " completion will only complete paths.") + + macro_create_parser = macro_subparsers.add_parser('create', help=macro_create_help, + description=macro_create_description, + epilog=macro_create_epilog) + macro_create_parser.add_argument('name', help='name of this macro') + setattr(macro_create_parser.add_argument('command', help='what the macro resolves to'), + ACTION_ARG_CHOICES, get_commands_aliases_and_macros_for_completion) + setattr(macro_create_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='arguments to pass to command'), + ACTION_ARG_CHOICES, ('path_complete',)) + macro_create_parser.set_defaults(func=macro_create) + + # macro -> delete + macro_delete_help = "delete macros" + macro_delete_description = "Delete specified macros or all macros if --all is used" + macro_delete_parser = macro_subparsers.add_parser('delete', help=macro_delete_help, + description=macro_delete_description) + setattr(macro_delete_parser.add_argument('name', nargs='*', help='macro to delete'), + ACTION_ARG_CHOICES, get_macro_names) + macro_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all macros") + macro_delete_parser.set_defaults(func=macro_delete) + + # macro -> list + macro_list_help = "list macros" + macro_list_description = ("List specified macros in a reusable form that can be saved to a startup script\n" + "to preserve macros across sessions\n" + "\n" + "Without arguments, all macros will be listed.") + + macro_list_parser = macro_subparsers.add_parser('list', help=macro_list_help, description=macro_list_description) + setattr(macro_list_parser.add_argument('name', nargs="*", help='macro to list'), + ACTION_ARG_CHOICES, get_macro_names) + macro_list_parser.set_defaults(func=macro_list) + + # Preserve quotes since we are passing strings to other commands + @with_argparser(macro_parser, preserve_quotes=True) + def do_macro(self, args: argparse.Namespace): + """Manage macros""" + func = getattr(args, 'func', None) + if func is not None: + # Call whatever subcommand function was selected + func(self, args) + else: + # No subcommand was provided, so call help + self.do_help('macro') + + def complete_help_command(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: + """Completes the command argument of help""" + + # Complete token against topics and visible commands + topics = set(self.get_help_topics()) + visible_commands = set(self.get_visible_commands()) + strs_to_match = list(topics | visible_commands) + return self.basic_complete(text, line, begidx, endidx, strs_to_match) + + def complete_help_subcommand(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: + """Completes the subcommand argument of help""" + + # Get all tokens through the one being completed + tokens, _ = self.tokens_for_completion(line, begidx, endidx) + + if not tokens: + return [] + + # Must have at least 3 args for 'help command subcommand' + if len(tokens) < 3: + return [] + + # Find where the command is by skipping past any flags + cmd_index = 1 + for cur_token in tokens[cmd_index:]: + if not cur_token.startswith('-'): + break + cmd_index += 1 + + if cmd_index >= len(tokens): + return [] + + command = tokens[cmd_index] + matches = [] + + # Check if this is a command with an argparse function + func = self.cmd_func(command) + if func and hasattr(func, 'argparser'): + completer = AutoCompleter(getattr(func, 'argparser'), cmd2_app=self) + matches = completer.complete_command_help(tokens[cmd_index:], text, line, begidx, endidx) + + return matches + + help_parser = ACArgumentParser() + + setattr(help_parser.add_argument('command', help="command to retrieve help for", nargs="?"), + ACTION_ARG_CHOICES, ('complete_help_command',)) + setattr(help_parser.add_argument('subcommand', help="subcommand to retrieve help for", + nargs=argparse.REMAINDER), + ACTION_ARG_CHOICES, ('complete_help_subcommand',)) + help_parser.add_argument('-v', '--verbose', action='store_true', + help="print a list of all commands with descriptions of each") + + @with_argparser(help_parser) + def do_help(self, args: argparse.Namespace) -> None: + """List available commands or provide detailed help for a specific command""" + if not args.command or args.verbose: + self._help_menu(args.verbose) + else: # Getting help for a specific command - funcname = self._func_named(arglist[0]) - if funcname: - # Check to see if this function was decorated with an argparse ArgumentParser - func = getattr(self, funcname) - if hasattr(func, 'argparser'): - # Function has an argparser, so get help based on all the arguments in case there are sub-commands - new_arglist = arglist[1:] - new_arglist.append('-h') - - # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout - with redirect_stdout(self.stdout): - with redirect_stderr(self.stdout): - func(new_arglist) - else: - # No special behavior needed, delegate to cmd base class do_help() - cmd.Cmd.do_help(self, funcname[3:]) + func = self.cmd_func(args.command) + if func and hasattr(func, 'argparser'): + completer = AutoCompleter(getattr(func, 'argparser'), cmd2_app=self) + tokens = [args.command] + args.subcommand + self.poutput(completer.format_help(tokens)) else: - # This could be a help topic - cmd.Cmd.do_help(self, arglist[0]) + # No special behavior needed, delegate to cmd base class do_help() + super().do_help(args.command) def _help_menu(self, verbose: bool=False) -> None: """Show a list of commands which help can be displayed for. @@ -2375,11 +2639,12 @@ Usage: Usage: unalias [-a] name [name ...] cmds_cats = {} for command in visible_commands: - if command in help_topics or getattr(self, self._func_named(command)).__doc__: + func = self.cmd_func(command) + if command in help_topics or func.__doc__: if command in help_topics: help_topics.remove(command) - if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY): - category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY) + if hasattr(func, HELP_CATEGORY): + category = getattr(func, HELP_CATEGORY) cmds_cats.setdefault(category, []) cmds_cats[category].append(command) else: @@ -2432,12 +2697,13 @@ Usage: Usage: unalias [-a] name [name ...] func = getattr(self, 'help_' + command) except AttributeError: # Couldn't find a help function + func = self.cmd_func(command) try: # Now see if help_summary has been set - doc = getattr(self, self._func_named(command)).help_summary + doc = func.help_summary except AttributeError: # Last, try to directly access the function's doc-string - doc = getattr(self, self._func_named(command)).__doc__ + doc = func.__doc__ else: # we found the help function result = io.StringIO() @@ -2458,13 +2724,17 @@ Usage: Usage: unalias [-a] name [name ...] doc_block = [] found_first = False for doc_line in doc.splitlines(): - str(doc_line).strip() - if len(doc_line.strip()) > 0: - doc_block.append(doc_line.strip()) - found_first = True - else: + stripped_line = doc_line.strip() + + # Don't include :param type lines + if stripped_line.startswith(':'): if found_first: break + elif stripped_line: + doc_block.append(stripped_line) + found_first = True + elif found_first: + break for doc_line in doc_block: self.stdout.write('{: <{col_width}}{doc}\n'.format(command, @@ -2473,18 +2743,21 @@ Usage: Usage: unalias [-a] name [name ...] command = '' self.stdout.write("\n") - def do_shortcuts(self, _: str) -> None: - """Lists shortcuts available""" + @with_argparser(ACArgumentParser()) + def do_shortcuts(self, _: argparse.Namespace) -> None: + """List available shortcuts""" result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) self.poutput("Shortcuts for other commands:\n{}\n".format(result)) - def do_eof(self, _: str) -> bool: + @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) + def do_eof(self, _: argparse.Namespace) -> bool: """Called when <Ctrl>-D is pressed""" # End of script should not exit app, but <Ctrl>-D should. return self._STOP_AND_EXIT - def do_quit(self, _: str) -> bool: - """Exits this application""" + @with_argparser(ACArgumentParser()) + def do_quit(self, _: argparse.Namespace) -> bool: + """Exit this application""" self._should_quit = True return self._STOP_AND_EXIT @@ -2514,7 +2787,8 @@ Usage: Usage: unalias [-a] name [name ...] for (idx, (_, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: - response = input(prompt) + safe_prompt = rl_make_safe_prompt(prompt) + response = input(safe_prompt) if rl_type != RlType.NONE: hlen = readline.get_current_history_length() @@ -2541,22 +2815,21 @@ Usage: Usage: unalias [-a] name [name ...] Output redirection and pipes allowed: {}""" return read_only_settings.format(str(self.terminators), self.allow_cli_args, self.allow_redirection) - def show(self, args: argparse.Namespace, parameter: str) -> None: + def show(self, args: argparse.Namespace, parameter: str='') -> None: """Shows current settings of parameters. :param args: argparse parsed arguments from the set command - :param parameter: - :return: + :param parameter: optional search parameter """ - param = '' - if parameter: - param = parameter.strip().lower() + param = parameter.strip().lower() result = {} maxlen = 0 + for p in self.settable: if (not param) or p.startswith(param): - result[p] = '%s: %s' % (p, str(getattr(self, p))) + result[p] = '{}: {}'.format(p, str(getattr(self, p))) maxlen = max(maxlen, len(result[p])) + if result: for p in sorted(result): if args.long: @@ -2568,58 +2841,69 @@ Usage: Usage: unalias [-a] name [name ...] if args.all: self.poutput('\nRead only settings:{}'.format(self.cmdenvironment())) else: - raise LookupError("Parameter '%s' not supported (type 'set' for list of parameters)." % param) + raise LookupError("Parameter '{}' not supported (type 'set' for list of parameters).".format(param)) - set_description = "Sets a settable parameter or shows current settings of parameters.\n" - set_description += "\n" - set_description += "Accepts abbreviated parameter names so long as there is no ambiguity.\n" - set_description += "Call without arguments for a list of settable parameters with their values." + set_description = ("Set a settable parameter or show current settings of parameters\n" + "\n" + "Accepts abbreviated parameter names so long as there is no ambiguity.\n" + "Call without arguments for a list of settable parameters with their values.") set_parser = ACArgumentParser(description=set_description) set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well') set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter') - set_parser.add_argument('settable', nargs=(0, 2), help='[param_name] [value]') + setattr(set_parser.add_argument('param', nargs='?', help='parameter to set or view'), + ACTION_ARG_CHOICES, settable) + set_parser.add_argument('value', nargs='?', help='the new value for settable') @with_argparser(set_parser) def do_set(self, args: argparse.Namespace) -> None: - """Sets a settable parameter or shows current settings of parameters""" - try: - param_name, val = args.settable - val = val.strip() - param_name = param_name.strip().lower() - if param_name not in self.settable: - hits = [p for p in self.settable if p.startswith(param_name)] - if len(hits) == 1: - param_name = hits[0] - else: - return self.show(args, param_name) - current_val = getattr(self, param_name) - if (val[0] == val[-1]) and val[0] in ("'", '"'): - val = val[1:-1] + """Set a settable parameter or show current settings of parameters""" + + # Check if param was passed in + if not args.param: + return self.show(args) + param = args.param.strip().lower() + + # Check if value was passed in + if not args.value: + return self.show(args, param) + value = args.value + + # Check if param points to just one settable + if param not in self.settable: + hits = [p for p in self.settable if p.startswith(param)] + if len(hits) == 1: + param = hits[0] else: - val = utils.cast(current_val, val) - setattr(self, param_name, val) - self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val)) - if current_val != val: - try: - onchange_hook = getattr(self, '_onchange_%s' % param_name) - onchange_hook(old=current_val, new=val) - except AttributeError: - pass - except (ValueError, AttributeError): - param = '' - if args.settable: - param = args.settable[0] - self.show(args, param) - - def do_shell(self, statement: Statement) -> None: - """Execute a command as if at the OS prompt - - Usage: shell <command> [arguments]""" + return self.show(args, param) + + # Update the settable's value + current_value = getattr(self, param) + value = utils.cast(current_value, value) + setattr(self, param, value) + + self.poutput('{} - was: {}\nnow: {}\n'.format(param, current_value, value)) + + # See if we need to call a change hook for this settable + if current_value != value: + onchange_hook = getattr(self, '_onchange_{}'.format(param), None) + if onchange_hook is not None: + onchange_hook(old=current_value, new=value) + + shell_parser = ACArgumentParser() + setattr(shell_parser.add_argument('command', help='the command to run'), + ACTION_ARG_CHOICES, ('shell_cmd_complete',)) + setattr(shell_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='arguments to pass to command'), + ACTION_ARG_CHOICES, ('path_complete',)) + + @with_argparser(shell_parser, preserve_quotes=True) + def do_shell(self, args: argparse.Namespace) -> None: + """Execute a command as if at the OS prompt""" import subprocess - # Get list of arguments to shell with quotes preserved - tokens = statement.arg_list + # Create a list of arguments to shell + tokens = [args.command] + args.command_args # Support expanding ~ in quoted paths for index, _ in enumerate(tokens): @@ -2640,18 +2924,6 @@ Usage: Usage: unalias [-a] name [name ...] proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True) proc.communicate() - def complete_shell(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Handles tab completion of executable commands and local file system paths for the shell command - - :param text: the string prefix we are attempting to match (all returned matches must begin with it) - :param line: the current input line with leading whitespace removed - :param begidx: the beginning index of the prefix text - :param endidx: the ending index of the prefix text - :return: a list of possible tab completions - """ - index_dict = {1: self.shell_cmd_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) - @staticmethod def _reset_py_display() -> None: """ @@ -2676,37 +2948,35 @@ Usage: Usage: unalias [-a] name [name ...] sys.displayhook = sys.__displayhook__ sys.excepthook = sys.__excepthook__ - def do_py(self, arg: str) -> bool: - """ - Invoke python command, shell, or script + py_parser = ACArgumentParser() + py_parser.add_argument('command', help="command to run", nargs='?') + py_parser.add_argument('remainder', help="remainder of command", nargs=argparse.REMAINDER) - py <command>: Executes a Python command. - py: Enters interactive Python mode. - End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - Non-python commands can be issued with ``pyscript_name("your command")``. - Run python code from external script files with ``run("script.py")`` - """ - from .pyscript_bridge import PyscriptBridge + @with_argparser(py_parser) + def do_py(self, args: argparse.Namespace) -> bool: + """Invoke Python command or shell""" + from .pyscript_bridge import PyscriptBridge, CommandResult if self._in_py: - self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False) + err = "Recursively entering interactive Python consoles is not allowed." + self.perror(err, traceback_war=False) + self._last_result = CommandResult('', err) return False self._in_py = True # noinspection PyBroadException try: - arg = arg.strip() - # Support the run command even if called prior to invoking an interactive interpreter - def run(filename): + def run(filename: str): """Run a Python script file in the interactive console. - :param filename: str - filename of *.py script file to run + :param filename: filename of *.py script file to run """ + expanded_filename = os.path.expanduser(filename) try: - with open(filename) as f: + with open(expanded_filename) as f: interp.runcode(f.read()) except OSError as ex: - error_msg = "Error opening script file '{}': {}".format(filename, ex) + error_msg = "Error opening script file '{}': {}".format(expanded_filename, ex) self.perror(error_msg, traceback_war=False) bridge = PyscriptBridge(self) @@ -2721,8 +2991,12 @@ Usage: Usage: unalias [-a] name [name ...] interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') - if arg: - interp.runcode(arg) + if args.command: + full_command = utils.quote_string_if_needed(args.command) + for cur_token in args.remainder: + full_command += ' ' + utils.quote_string_if_needed(cur_token) + + interp.runcode(full_command) # If there are no args, then we will open an interactive Python console else: @@ -2787,11 +3061,14 @@ Usage: Usage: unalias [-a] name [name ...] sys.stdin = self.stdin cprt = 'Type "help", "copyright", "credits" or "license" for more information.' - docstr = self.do_py.__doc__.replace('pyscript_name', self.pyscript_name) + instructions = ('End with `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()`.\n' + 'Non-Python commands can be issued with: {}("your command")\n' + 'Run Python code from external script files with: run("script.py")' + .format(self.pyscript_name)) try: - interp.interact(banner="Python {} on {}\n{}\n({})\n{}". - format(sys.version, sys.platform, cprt, self.__class__.__name__, docstr)) + interp.interact(banner="Python {} on {}\n{}\n\n{}\n". + format(sys.version, sys.platform, cprt, instructions)) except EmbeddedConsoleExit: pass @@ -2832,30 +3109,22 @@ Usage: Usage: unalias [-a] name [name ...] self._in_py = False return self._should_quit - @with_argument_list - def do_pyscript(self, arglist: List[str]) -> None: - """\nRuns a python script file inside the console - - Usage: pyscript <script_path> [script_arguments] + pyscript_parser = ACArgumentParser() + setattr(pyscript_parser.add_argument('script_path', help='path to the script file'), + ACTION_ARG_CHOICES, ('path_complete',)) + pyscript_parser.add_argument('script_arguments', nargs=argparse.REMAINDER, + help='arguments to pass to script') -Console commands can be executed inside this script with cmd("your command") -However, you cannot run nested "py" or "pyscript" commands from within this script -Paths or arguments that contain spaces must be enclosed in quotes -""" - if not arglist: - self.perror("pyscript command requires at least 1 argument ...", traceback_war=False) - self.do_help(['pyscript']) - return - - # Get the absolute path of the script - script_path = os.path.expanduser(arglist[0]) + @with_argparser(pyscript_parser) + def do_pyscript(self, args: argparse.Namespace) -> None: + """Run a Python script file inside the console""" + script_path = os.path.expanduser(args.script_path) # Save current command line arguments orig_args = sys.argv # Overwrite sys.argv to allow the script to take command line arguments - sys.argv = [script_path] - sys.argv.extend(arglist[1:]) + sys.argv = [script_path] + args.script_arguments # Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows self.do_py("run({!r})".format(script_path)) @@ -2863,33 +3132,24 @@ Paths or arguments that contain spaces must be enclosed in quotes # Restore command line arguments to original state sys.argv = orig_args - def complete_pyscript(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Enable tab-completion for pyscript command.""" - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) - # Only include the do_ipy() method if IPython is available on the system - if ipython_available: - # noinspection PyMethodMayBeStatic,PyUnusedLocal - def do_ipy(self, arg: str) -> None: - """Enters an interactive IPython shell. - - Run python code from external files with ``run filename.py`` - End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - """ + if ipython_available: # pragma: no cover + @with_argparser(ACArgumentParser()) + def do_ipy(self, _: argparse.Namespace) -> None: + """Enter an interactive IPython shell""" from .pyscript_bridge import PyscriptBridge bridge = PyscriptBridge(self) + banner = ('Entering an embedded IPython shell. Type quit or <Ctrl>-d to exit.\n' + 'Run Python code from external files with: run filename.py\n') + exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) + if self.locals_in_py: def load_ipy(self, app): - banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...' - exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) embed(banner1=banner, exit_msg=exit_msg) load_ipy(self, bridge) else: def load_ipy(app): - banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...' - exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) embed(banner1=banner, exit_msg=exit_msg) load_ipy(bridge) @@ -2898,10 +3158,10 @@ Paths or arguments that contain spaces must be enclosed in quotes history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items') history_parser_group.add_argument('-e', '--edit', action='store_true', help='edit and then run selected history items') - history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines') + history_parser_group.add_argument('-s', '--script', action='store_true', help='output commands in script format') history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file') history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file') - history_parser_group.add_argument('-c', '--clear', action="store_true", help='clears all history') + history_parser_group.add_argument('-c', '--clear', action="store_true", help='clear all history') _history_arg_help = """empty all history items a one history item by number a..b, a:b, a:, ..b items by indices (inclusive) @@ -3031,7 +3291,7 @@ a..b, a:b, a:, ..b items by indices (inclusive) # get the output out of the buffer output = membuf.read() # and add the regex-escaped output to the transcript - transcript += output.replace('/', '\/') + transcript += output.replace('/', r'\/') # Restore stdout to its original state self.stdout = saved_self_stdout @@ -3053,29 +3313,28 @@ a..b, a:b, a:, ..b items by indices (inclusive) msg = '{} {} saved to transcript file {!r}' self.pfeedback(msg.format(len(history), plural, transcript_file)) - @with_argument_list - def do_edit(self, arglist: List[str]) -> None: - """Edit a file in a text editor + edit_description = ("Edit a file in a text editor\n" + "\n" + "The editor used is determined by a settable parameter. To set it:\n" + "\n" + " set editor (program-name)") -Usage: edit [file_path] - Where: - * file_path - path to a file to open in editor + edit_parser = ACArgumentParser(description=edit_description) + setattr(edit_parser.add_argument('file_path', help="path to a file to open in editor", nargs="?"), + ACTION_ARG_CHOICES, ('path_complete',)) -The editor used is determined by the ``editor`` settable parameter. -"set editor (program-name)" to change or set the EDITOR environment variable. -""" + @with_argparser(edit_parser) + def do_edit(self, args: argparse.Namespace) -> None: + """Edit a file in a text editor""" if not self.editor: raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") - filename = arglist[0] if arglist else '' - if filename: - os.system('"{}" "{}"'.format(self.editor, filename)) - else: - os.system('"{}"'.format(self.editor)) - def complete_edit(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Enable tab-completion for edit command.""" - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) + editor = utils.quote_string_if_needed(self.editor) + if args.file_path: + expanded_path = utils.quote_string_if_needed(os.path.expanduser(args.file_path)) + os.system('{} {}'.format(editor, expanded_path)) + else: + os.system('{}'.format(editor)) @property def _current_script_dir(self) -> Optional[str]: @@ -3085,54 +3344,25 @@ The editor used is determined by the ``editor`` settable parameter. else: return None - @with_argument_list - def do__relative_load(self, arglist: List[str]) -> None: - """Runs commands in script file that is encoded as either ASCII or UTF-8 text - - Usage: _relative_load <file_path> - - optional argument: - file_path a file path pointing to a script - -Script should contain one command per line, just like command would be typed in console. - -If this is called from within an already-running script, the filename will be interpreted -relative to the already-running script's directory. - -NOTE: This command is intended to only be used within text file scripts. - """ - # If arg is None or arg is an empty string this is an error - if not arglist: - self.perror('_relative_load command requires a file path:', traceback_war=False) - return - - file_path = arglist[0].strip() - # NOTE: Relative path is an absolute path, it is just relative to the current script directory - relative_path = os.path.join(self._current_script_dir or '', file_path) - self.do_load([relative_path]) - - def do_eos(self, _: str) -> None: - """Handles cleanup when a script has finished executing""" + @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) + def do_eos(self, _: argparse.Namespace) -> None: + """Handle cleanup when a script has finished executing""" if self._script_dir: self._script_dir.pop() - @with_argument_list - def do_load(self, arglist: List[str]) -> None: - """Runs commands in script file that is encoded as either ASCII or UTF-8 text + load_description = ("Run commands in script file that is encoded as either ASCII or UTF-8 text\n" + "\n" + "Script should contain one command per line, just like the command would be\n" + "typed in the console.") - Usage: load <file_path> + load_parser = ACArgumentParser(description=load_description) + setattr(load_parser.add_argument('script_path', help="path to the script file"), + ACTION_ARG_CHOICES, ('path_complete',)) - * file_path - a file path pointing to a script - -Script should contain one command per line, just like command would be typed in console. - """ - # If arg is None or arg is an empty string this is an error - if not arglist: - self.perror('load command requires a file path', traceback_war=False) - return - - file_path = arglist[0].strip() - expanded_path = os.path.abspath(os.path.expanduser(file_path)) + @with_argparser(load_parser) + def do_load(self, args: argparse.Namespace) -> None: + """Run commands in script file that is encoded as either ASCII or UTF-8 text""" + expanded_path = os.path.abspath(os.path.expanduser(args.script_path)) # Make sure the path exists and we can access it if not os.path.exists(expanded_path): @@ -3166,10 +3396,24 @@ Script should contain one command per line, just like command would be typed in self._script_dir.append(os.path.dirname(expanded_path)) - def complete_load(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Enable tab-completion for load command.""" - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) + relative_load_description = load_description + relative_load_description += ("\n\n" + "If this is called from within an already-running script, the filename will be\n" + "interpreted relative to the already-running script's directory.") + + relative_load_epilog = ("Notes:\n" + " This command is intended to only be used within text file scripts.") + + relative_load_parser = ACArgumentParser(description=relative_load_description, epilog=relative_load_epilog) + relative_load_parser.add_argument('file_path', help='a file path pointing to a script') + + @with_argparser(relative_load_parser) + def do__relative_load(self, args: argparse.Namespace) -> None: + """""" + file_path = args.file_path + # NOTE: Relative path is an absolute path, it is just relative to the current script directory + relative_path = os.path.join(self._current_script_dir or '', file_path) + self.do_load(relative_path) def run_transcript_tests(self, callargs: List[str]) -> None: """Runs transcript tests for provided file(s). @@ -3191,6 +3435,125 @@ Script should contain one command per line, just like command would be typed in runner = unittest.TextTestRunner() runner.run(testcase) + def _clear_input_lines_str(self) -> str: # pragma: no cover + """ + Returns a string that if printed will clear the prompt and input lines in the terminal, + leaving the cursor at the beginning of the first input line + :return: the string to print + """ + if not (vt100_support and self.use_rawinput): + return '' + + import shutil + import colorama.ansi as ansi + from colorama import Cursor + + visible_prompt = self.visible_prompt + + # Get the size of the terminal + terminal_size = shutil.get_terminal_size() + + # Figure out how many lines the prompt and user input take up + total_str_size = len(visible_prompt) + len(readline.get_line_buffer()) + num_input_lines = int(total_str_size / terminal_size.columns) + 1 + + # Get the cursor's offset from the beginning of the first input line + cursor_input_offset = len(visible_prompt) + rl_get_point() + + # Calculate what input line the cursor is on + cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1 + + # Create a string that will clear all input lines and print the alert + terminal_str = '' + + # Move the cursor down to the last input line + if cursor_input_line != num_input_lines: + terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line) + + # Clear each input line from the bottom up so that the cursor ends up on the original first input line + terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1) + terminal_str += ansi.clear_line() + + # Move the cursor to the beginning of the first input line + terminal_str += '\r' + + return terminal_str + + def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover + """ + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param alert_msg: the message to display to the user + :param new_prompt: if you also want to change the prompt that is displayed, then include it here + see async_update_prompt() docstring for guidance on updating a prompt + :raises RuntimeError if called while another thread holds terminal_lock + """ + if not (vt100_support and self.use_rawinput): + return + + # Sanity check that can't fail if self.terminal_lock was acquired before calling this function + if self.terminal_lock.acquire(blocking=False): + + # Generate a string to clear the prompt and input lines and replace with the alert + terminal_str = self._clear_input_lines_str() + if alert_msg: + terminal_str += alert_msg + '\n' + + # Set the new prompt now that _clear_input_lines_str is done using the old prompt + if new_prompt is not None: + self.prompt = new_prompt + rl_set_prompt(self.prompt) + + # Print terminal_str to erase the lines + if rl_type == RlType.GNU: + sys.stderr.write(terminal_str) + elif rl_type == RlType.PYREADLINE: + readline.rl.mode.console.write(terminal_str) + + # Redraw the prompt and input lines + rl_force_redisplay() + + self.terminal_lock.release() + + else: + raise RuntimeError("another thread holds terminal_lock") + + def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover + """ + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. If you do alter the actual text of the prompt, + it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will + be shifted and the update will not be seamless. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param new_prompt: what to change the prompt to + """ + self.async_alert('', new_prompt) + + @staticmethod + def set_window_title(title: str) -> None: # pragma: no cover + """ + Sets the terminal window title + :param title: the new window title + """ + if not vt100_support: + return + + import colorama.ansi as ansi + try: + sys.stderr.write(ansi.set_title(title)) + except AttributeError: + # Debugging in Pycharm has issues with setting terminal title + pass + def cmdloop(self, intro: Optional[str]=None) -> None: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -3216,6 +3579,14 @@ Script should contain one command per line, just like command would be typed in if callargs: self.cmdqueue.extend(callargs) + # Register a SIGINT signal handler for Ctrl+C + import signal + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self.sigint_handler) + + # Grab terminal lock before the prompt has been drawn by readline + self.terminal_lock.acquire() + # Always run the preloop first for func in self._preloop_hooks: func() @@ -3241,6 +3612,13 @@ Script should contain one command per line, just like command would be typed in func() self.postloop() + # Release terminal lock now that postloop code should have stopped any terminal updater threads + # This will also zero the lock count in case cmdloop() is called again + self.terminal_lock.release() + + # Restore the original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) + if self.exit_code is not None: sys.exit(self.exit_code) @@ -3249,7 +3627,7 @@ Script should contain one command per line, just like command would be typed in # plugin related functions # ### - def _initialize_plugin_system(self): + def _initialize_plugin_system(self) -> None: """Initialize the plugin system""" self._preloop_hooks = [] self._postloop_hooks = [] @@ -3259,7 +3637,7 @@ Script should contain one command per line, just like command would be typed in self._cmdfinalization_hooks = [] @classmethod - def _validate_callable_param_count(cls, func: Callable, count: int): + def _validate_callable_param_count(cls, func: Callable, count: int) -> None: """Ensure a function has the given number of parameters.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3272,7 +3650,7 @@ Script should contain one command per line, just like command would be typed in )) @classmethod - def _validate_prepostloop_callable(cls, func: Callable): + def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: """Check parameter and return types for preloop and postloop hooks.""" cls._validate_callable_param_count(func, 0) # make sure there is no return notation @@ -3282,18 +3660,18 @@ Script should contain one command per line, just like command would be typed in func.__name__, )) - def register_preloop_hook(self, func: Callable): + def register_preloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the beginning of the command loop.""" self._validate_prepostloop_callable(func) self._preloop_hooks.append(func) - def register_postloop_hook(self, func: Callable): + def register_postloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the end of the command loop.""" self._validate_prepostloop_callable(func) self._postloop_hooks.append(func) @classmethod - def _validate_postparsing_callable(cls, func: Callable): + def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Check parameter and return types for postparsing hooks""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3307,13 +3685,13 @@ Script should contain one command per line, just like command would be typed in func.__name__ )) - def register_postparsing_hook(self, func: Callable): + def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Register a function to be called after parsing user input but before running the command""" self._validate_postparsing_callable(func) self._postparsing_hooks.append(func) @classmethod - def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None: """Check parameter and return types for pre and post command hooks.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3340,18 +3718,19 @@ Script should contain one command per line, just like command would be typed in data_type, )) - def register_precmd_hook(self, func: Callable): + def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: """Register a hook to be called before the command function.""" self._validate_prepostcmd_hook(func, plugin.PrecommandData) self._precmd_hooks.append(func) - def register_postcmd_hook(self, func: Callable): + def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None: """Register a hook to be called after the command function.""" self._validate_prepostcmd_hook(func, plugin.PostcommandData) self._postcmd_hooks.append(func) @classmethod - def _validate_cmdfinalization_callable(cls, func: Callable): + def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Check parameter and return types for command finalization hooks.""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3363,7 +3742,8 @@ Script should contain one command per line, just like command would be typed in raise TypeError("{} must declare return a return type of " "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) - def register_cmdfinalization_hook(self, func: Callable): + def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Register a hook to be called after a command is completed, whether it completes successfully or not.""" self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) @@ -3429,7 +3809,7 @@ class History(list): def get(self, getme: Optional[Union[int, str]]=None) -> List[HistoryItem]: """Get an item or items from the History list using 1-based indexing. - :param getme: item(s) to get - either an integer index or string to search for + :param getme: optional item(s) to get (either an integer index or string to search for) :return: list of HistoryItems matching the retrieval criteria """ if not getme: |