diff options
Diffstat (limited to 'cmd2')
-rw-r--r-- | cmd2/argcomplete_bridge.py | 10 | ||||
-rwxr-xr-x | cmd2/argparse_completer.py | 64 | ||||
-rw-r--r-- | cmd2/cmd2.py | 1562 | ||||
-rw-r--r-- | cmd2/constants.py | 5 | ||||
-rw-r--r-- | cmd2/parsing.py | 95 | ||||
-rw-r--r-- | cmd2/pyscript_bridge.py | 71 | ||||
-rw-r--r-- | cmd2/rl_utils.py | 116 | ||||
-rw-r--r-- | cmd2/transcript.py | 25 | ||||
-rw-r--r-- | cmd2/utils.py | 158 |
9 files changed, 1389 insertions, 717 deletions
diff --git a/cmd2/argcomplete_bridge.py b/cmd2/argcomplete_bridge.py index 7bdb816f..51e856ef 100644 --- a/cmd2/argcomplete_bridge.py +++ b/cmd2/argcomplete_bridge.py @@ -23,16 +23,18 @@ else: import os import shlex import sys + from typing import List, Tuple, Union from . import constants from . import utils - def tokens_for_completion(line, endidx): + def tokens_for_completion(line: str, endidx: int) -> Union[Tuple[List[str], List[str], int, int], + Tuple[None, None, None, None]]: """ Used by tab completion functions to get all tokens through the one being completed - :param line: str - the current input line with leading whitespace removed - :param endidx: int - the ending index of the prefix text + :param line: the current input line with leading whitespace removed + :param endidx: the ending index of the prefix text :return: A 4 item tuple where the items are On Success tokens: list of unquoted tokens @@ -46,7 +48,7 @@ else: The last item in both lists is the token being tab completed On Failure - Both items are None + All 4 items are None """ unclosed_quote = '' quotes_to_try = copy.copy(constants.QUOTES) diff --git a/cmd2/argparse_completer.py b/cmd2/argparse_completer.py index 0e241cd9..ad2c520b 100755 --- a/cmd2/argparse_completer.py +++ b/cmd2/argparse_completer.py @@ -510,6 +510,18 @@ class AutoCompleter(object): return self.basic_complete(text, line, begidx, endidx, completers.keys()) return [] + def format_help(self, tokens: List[str]) -> str: + """Supports the completion of sub-commands for commands through the cmd2 help command.""" + for idx, token in enumerate(tokens): + if idx >= self._token_start_index: + if self._positional_completers: + # For now argparse only allows 1 sub-command group per level + # so this will only loop once. + for completers in self._positional_completers.values(): + if token in completers: + return completers[token].format_help(tokens) + return self._parser.format_help() + @staticmethod def _process_action_nargs(action: argparse.Action, arg_state: _ArgumentState) -> None: if isinstance(action, _RangeAction): @@ -654,12 +666,19 @@ class AutoCompleter(object): else: prefix = '' + if action.help is None: + help_text = '' + else: + help_text = action.help + + # is there anything to print for this parameter? + if not prefix and not help_text: + return + prefix = ' {0: <{width}} '.format(prefix, width=20) pref_len = len(prefix) - if action.help is not None: - help_lines = action.help.splitlines() - else: - help_lines = [''] + help_lines = help_text.splitlines() + if len(help_lines) == 1: print('\nHint:\n{}{}\n'.format(prefix, help_lines[0])) else: @@ -676,12 +695,12 @@ class AutoCompleter(object): """ Performs tab completion against a list - :param text: str - the string prefix we are attempting to match (all returned matches must begin with it) - :param line: str - the current input line with leading whitespace removed - :param begidx: int - the beginning index of the prefix text - :param endidx: int - the ending index of the prefix text - :param match_against: Collection - the list being matched against - :return: List[str] - a list of possible tab completions + :param text: the string prefix we are attempting to match (all returned matches must begin with it) + :param line: the current input line with leading whitespace removed + :param begidx: the beginning index of the prefix text + :param endidx: the ending index of the prefix text + :param match_against: the list being matched against + :return: a list of possible tab completions """ return [cur_match for cur_match in match_against if cur_match.startswith(text)] @@ -731,7 +750,7 @@ class ACHelpFormatter(argparse.RawTextHelpFormatter): # build full usage string format = self._format_actions_usage - action_usage = format(positionals + required_options + optionals, groups) + action_usage = format(required_options + optionals + positionals, groups) usage = ' '.join([s for s in [prog, action_usage] if s]) # wrap the usage parts if it's too long @@ -742,15 +761,15 @@ class ACHelpFormatter(argparse.RawTextHelpFormatter): # break usage into wrappable parts part_regexp = r'\(.*?\)+|\[.*?\]+|\S+' + req_usage = format(required_options, groups) opt_usage = format(optionals, groups) pos_usage = format(positionals, groups) - req_usage = format(required_options, groups) + req_parts = _re.findall(part_regexp, req_usage) opt_parts = _re.findall(part_regexp, opt_usage) pos_parts = _re.findall(part_regexp, pos_usage) - req_parts = _re.findall(part_regexp, req_usage) + assert ' '.join(req_parts) == req_usage assert ' '.join(opt_parts) == opt_usage assert ' '.join(pos_parts) == pos_usage - assert ' '.join(req_parts) == req_usage # End cmd2 customization @@ -780,13 +799,15 @@ class ACHelpFormatter(argparse.RawTextHelpFormatter): if len(prefix) + len(prog) <= 0.75 * text_width: indent = ' ' * (len(prefix) + len(prog) + 1) # Begin cmd2 customization - if opt_parts: - lines = get_lines([prog] + pos_parts, indent, prefix) - lines.extend(get_lines(req_parts, indent)) + if req_parts: + lines = get_lines([prog] + req_parts, indent, prefix) lines.extend(get_lines(opt_parts, indent)) + lines.extend(get_lines(pos_parts, indent)) + elif opt_parts: + lines = get_lines([prog] + opt_parts, indent, prefix) + lines.extend(get_lines(pos_parts, indent)) elif pos_parts: lines = get_lines([prog] + pos_parts, indent, prefix) - lines.extend(get_lines(req_parts, indent)) else: lines = [prog] # End cmd2 customization @@ -795,13 +816,13 @@ class ACHelpFormatter(argparse.RawTextHelpFormatter): else: indent = ' ' * len(prefix) # Begin cmd2 customization - parts = pos_parts + req_parts + opt_parts + parts = req_parts + opt_parts + pos_parts lines = get_lines(parts, indent) if len(lines) > 1: lines = [] - lines.extend(get_lines(pos_parts, indent)) lines.extend(get_lines(req_parts, indent)) lines.extend(get_lines(opt_parts, indent)) + lines.extend(get_lines(pos_parts, indent)) # End cmd2 customization lines = [prog] + lines @@ -870,6 +891,9 @@ class ACHelpFormatter(argparse.RawTextHelpFormatter): result = super()._format_args(action, default_metavar) return result + def format_help(self): + return super().format_help() + '\n' + # noinspection PyCompatibility class ACArgumentParser(argparse.ArgumentParser): diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index fb929078..dec0a04d 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -32,25 +32,26 @@ Git repository on GitHub at https://github.com/python-cmd2/cmd2 import argparse import cmd import collections +import colorama from colorama import Fore import glob import inspect import os -import platform import re import shlex import sys -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union +import threading +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union, IO from . import constants from . import utils from . import plugin -from .argparse_completer import AutoCompleter, ACArgumentParser +from .argparse_completer import AutoCompleter, ACArgumentParser, ACTION_ARG_CHOICES from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer -from .parsing import StatementParser, Statement +from .parsing import StatementParser, Statement, Macro, MacroArg # Set up readline -from .rl_utils import rl_type, RlType +from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ @@ -104,9 +105,9 @@ except ImportError: # Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout if sys.version_info < (3, 5): - from contextlib2 import redirect_stdout, redirect_stderr + from contextlib2 import redirect_stdout else: - from contextlib import redirect_stdout, redirect_stderr + from contextlib import redirect_stdout # Detect whether IPython is installed to determine if the built-in "ipy" command should be included ipython_available = True @@ -121,6 +122,13 @@ except ImportError: # pragma: no cover HELP_CATEGORY = 'help_category' HELP_SUMMARY = 'help_summary' +INTERNAL_COMMAND_EPILOG = ("Notes:\n" + " This command is for internal use and is not intended to be called from the\n" + " command line.") + +# All command functions start with this +COMMAND_PREFIX = 'do_' + def categorize(func: Union[Callable, Iterable], category: str) -> None: """Categorize a function. @@ -137,19 +145,21 @@ def categorize(func: Union[Callable, Iterable], category: str) -> None: setattr(func, HELP_CATEGORY, category) -def parse_quoted_string(cmdline: str) -> List[str]: - """Parse a quoted string into a list of arguments.""" - if isinstance(cmdline, list): +def parse_quoted_string(string: str, preserve_quotes: bool) -> List[str]: + """ + Parse a quoted string into a list of arguments + :param string: the string being parsed + :param preserve_quotes: if True, then quotes will not be stripped + """ + if isinstance(string, list): # arguments are already a list, return the list we were passed - lexed_arglist = cmdline + lexed_arglist = string else: # Use shlex to split the command line into a list of arguments based on shell rules - lexed_arglist = shlex.split(cmdline, posix=False) - # strip off outer quotes for convenience - temp_arglist = [] - for arg in lexed_arglist: - temp_arglist.append(utils.strip_quotes(arg)) - lexed_arglist = temp_arglist + lexed_arglist = shlex.split(string, posix=False) + + if not preserve_quotes: + lexed_arglist = [utils.strip_quotes(arg) for arg in lexed_arglist] return lexed_arglist @@ -161,7 +171,7 @@ def with_category(category: str) -> Callable: return cat_decorator -def with_argument_list(func: Callable) -> Callable: +def with_argument_list(func: Callable, preserve_quotes: bool=False) -> Callable: """A decorator to alter the arguments passed to a do_* cmd2 method. Default passes a string of whatever the user typed. With this decorator, the decorated method will receive a list @@ -170,18 +180,19 @@ def with_argument_list(func: Callable) -> Callable: @functools.wraps(func) def cmd_wrapper(self, cmdline): - lexed_arglist = parse_quoted_string(cmdline) + lexed_arglist = parse_quoted_string(cmdline, preserve_quotes) return func(self, lexed_arglist) cmd_wrapper.__doc__ = func.__doc__ return cmd_wrapper -def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Callable: +def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser, preserve_quotes: bool=False) -> Callable: """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given instance of argparse.ArgumentParser, but also returning unknown args as a list. - :param argparser: argparse.ArgumentParser - given instance of ArgumentParser + :param argparser: given instance of ArgumentParser + :param preserve_quotes: if True, then the arguments passed to arparse be maintain their quotes :return: function that gets passed parsed args and a list of unknown args """ import functools @@ -190,7 +201,7 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(instance, cmdline): - lexed_arglist = parse_quoted_string(cmdline) + lexed_arglist = parse_quoted_string(cmdline, preserve_quotes) try: args, unknown = argparser.parse_known_args(lexed_arglist) except SystemExit: @@ -219,11 +230,12 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla return arg_decorator -def with_argparser(argparser: argparse.ArgumentParser) -> Callable: +def with_argparser(argparser: argparse.ArgumentParser, preserve_quotes: bool=False) -> Callable: """A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given instance of argparse.ArgumentParser. - :param argparser: argparse.ArgumentParser - given instance of ArgumentParser + :param argparser: given instance of ArgumentParser + :param preserve_quotes: if True, then the arguments passed to arparse be maintain their quotes :return: function that gets passed parsed args """ import functools @@ -232,7 +244,7 @@ def with_argparser(argparser: argparse.ArgumentParser) -> Callable: def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(instance, cmdline): - lexed_arglist = parse_quoted_string(cmdline) + lexed_arglist = parse_quoted_string(cmdline, preserve_quotes) try: args = argparser.parse_args(lexed_arglist) except SystemExit: @@ -306,7 +318,6 @@ class Cmd(cmd.Cmd): # Attributes used to configure the StatementParser, best not to change these at runtime multiline_commands = [] shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} - aliases = dict() terminators = [';'] # Attributes which are NOT dynamically settable at runtime @@ -317,7 +328,7 @@ class Cmd(cmd.Cmd): reserved_words = [] # Attributes which ARE dynamically settable at runtime - colors = (platform.system() != 'Windows') + colors = constants.COLORS_TERMINAL continuation_prompt = '> ' debug = False echo = False @@ -337,7 +348,7 @@ class Cmd(cmd.Cmd): # To make an attribute settable with the "do_set" command, add it to this ... # This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key - settable = {'colors': 'Colorized output (*nix only)', + settable = {'colors': 'Allow colorized output (valid values: Terminal, Always, Never)', 'continuation_prompt': 'On 2nd+ line of input', 'debug': 'Show full error stack on error', 'echo': 'Echo command issued into output', @@ -369,6 +380,9 @@ class Cmd(cmd.Cmd): except AttributeError: pass + # Override whether ansi codes should be stripped from the output since cmd2 has its own logic for doing this + colorama.init(strip=False) + # initialize plugin system # needs to be done before we call __init__(0) self._initialize_plugin_system() @@ -376,12 +390,20 @@ class Cmd(cmd.Cmd): # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) + # Get rid of cmd's complete_help() functions so AutoCompleter will complete our help command + if getattr(cmd.Cmd, 'complete_help', None) is not None: + delattr(cmd.Cmd, 'complete_help') + # Commands to exclude from the help menu and tab completion self.hidden_commands = ['eof', 'eos', '_relative_load'] # Commands to exclude from the history command self.exclude_from_history = '''history edit eof eos'''.split() + # Command aliases and macros + self.aliases = dict() + self.macros = dict() + self._finalize_app_parameters() self.initial_stdout = sys.stdout @@ -389,7 +411,7 @@ class Cmd(cmd.Cmd): self.pystate = {} self.py_history = [] self.pyscript_name = 'app' - self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')] + self.keywords = self.reserved_words + self.get_all_commands() self.statement_parser = StatementParser( allow_redirection=self.allow_redirection, terminators=self.terminators, @@ -417,13 +439,13 @@ class Cmd(cmd.Cmd): self._STOP_AND_EXIT = True # cmd convention self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'}, - 'cyan': {True: '\x1b[36m', False: '\x1b[39m'}, - 'blue': {True: '\x1b[34m', False: '\x1b[39m'}, - 'red': {True: '\x1b[31m', False: '\x1b[39m'}, - 'magenta': {True: '\x1b[35m', False: '\x1b[39m'}, - 'green': {True: '\x1b[32m', False: '\x1b[39m'}, - 'underline': {True: '\x1b[4m', False: '\x1b[24m'}, - 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}} + 'cyan': {True: Fore.CYAN, False: Fore.RESET}, + 'blue': {True: Fore.BLUE, False: Fore.RESET}, + 'red': {True: Fore.RED, False: Fore.RESET}, + 'magenta': {True: Fore.MAGENTA, False: Fore.RESET}, + 'green': {True: Fore.GREEN, False: Fore.RESET}, + 'underline': {True: '\x1b[4m', False: Fore.RESET}, + 'yellow': {True: Fore.YELLOW, False: Fore.RESET}} # Used load command to store the current script dir as a LIFO queue to support _relative_load command self._script_dir = [] @@ -441,6 +463,7 @@ class Cmd(cmd.Cmd): self.broken_pipe_warning = '' # Check if history should persist + self.persistent_history_file = '' if persistent_history_file and rl_type != RlType.NONE: persistent_history_file = os.path.expanduser(persistent_history_file) read_err = False @@ -526,6 +549,11 @@ class Cmd(cmd.Cmd): # This determines if a non-zero exit code should be used when exiting the application self.exit_code = None + # This lock should be acquired before doing any asynchronous changes to the terminal to + # ensure the updates to the terminal don't interfere with the input being typed. It can be + # acquired any time there is a readline prompt on screen. + self.terminal_lock = threading.RLock() + # ----- Methods related to presenting output to the user ----- @property @@ -547,34 +575,53 @@ class Cmd(cmd.Cmd): # Make sure settable parameters are sorted alphabetically by key self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0])) - def poutput(self, msg: str, end: str='\n') -> None: - """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present. + def decolorized_write(self, fileobj: IO, msg: str) -> None: + """Write a string to a fileobject, stripping ANSI escape sequences if necessary - Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and - that process terminates before the cmd2 command is finished executing. + Honor the current colors setting, which requires us to check whether the + fileobject is a tty. + """ + if self.colors.lower() == constants.COLORS_NEVER.lower() or \ + (self.colors.lower() == constants.COLORS_TERMINAL.lower() and not fileobj.isatty()): + msg = utils.strip_ansi(msg) + fileobj.write(msg) - :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK - :param end: string appended after the end of the message if not already present, default a newline + def poutput(self, msg: Any, end: str='\n', color: str='') -> None: + """Smarter self.stdout.write(); color aware and adds newline of not present. + + Also handles BrokenPipeError exceptions for when a commands's output has + been piped to another process and that process terminates before the + cmd2 command is finished executing. + + :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK) + :param end: (optional) string appended after the end of the message if not already present, default a newline + :param color: (optional) color escape to output this message with """ if msg is not None and msg != '': try: msg_str = '{}'.format(msg) - self.stdout.write(msg_str) if not msg_str.endswith(end): - self.stdout.write(end) + msg_str += end + if color: + msg_str = color + msg_str + Fore.RESET + self.decolorized_write(self.stdout, msg_str) except BrokenPipeError: - # This occurs if a command's output is being piped to another process and that process closes before the - # command is finished. If you would like your application to print a warning message, then set the - # broken_pipe_warning attribute to the message you want printed. + # This occurs if a command's output is being piped to another + # process and that process closes before the command is + # finished. If you would like your application to print a + # warning message, then set the broken_pipe_warning attribute + # to the message you want printed. if self.broken_pipe_warning: sys.stderr.write(self.broken_pipe_warning) - def perror(self, err: Union[str, Exception], traceback_war: bool=True) -> None: + def perror(self, err: Union[str, Exception], traceback_war: bool=True, err_color: str=Fore.LIGHTRED_EX, + war_color: str=Fore.LIGHTYELLOW_EX) -> None: """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists. :param err: an Exception or error message to print out :param traceback_war: (optional) if True, print a message to let user know they can enable debug - :return: + :param err_color: (optional) color escape to output error with + :param war_color: (optional) color escape to output warning with """ if self.debug: import traceback @@ -582,14 +629,15 @@ class Cmd(cmd.Cmd): if isinstance(err, Exception): err_msg = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(type(err).__name__, err) - sys.stderr.write(self.colorize(err_msg, 'red')) else: - err_msg = self.colorize("ERROR: {}\n".format(err), 'red') - sys.stderr.write(err_msg) + err_msg = "ERROR: {}\n".format(err) + err_msg = err_color + err_msg + Fore.RESET + self.decolorized_write(sys.stderr, err_msg) if traceback_war: war = "To enable full traceback, run the following command: 'set debug true'\n" - sys.stderr.write(self.colorize(war, 'yellow')) + war = war_color + war + Fore.RESET + self.decolorized_write(sys.stderr, war) def pfeedback(self, msg: str) -> None: """For printing nonessential feedback. Can be silenced with `quiet`. @@ -598,7 +646,7 @@ class Cmd(cmd.Cmd): if self.feedback_to_output: self.poutput(msg) else: - sys.stderr.write("{}\n".format(msg)) + self.decolorized_write(sys.stderr, "{}\n".format(msg)) def ppaged(self, msg: str, end: str='\n', chop: bool=False) -> None: """Print output using a pager if it would go off screen and stdout isn't currently being redirected. @@ -606,7 +654,7 @@ class Cmd(cmd.Cmd): Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when stdout or stdin are not a fully functional terminal. - :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK + :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK) :param end: string appended after the end of the message if not already present, default a newline :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped - truncated text is still accessible by scrolling with the right & left arrow keys @@ -634,6 +682,9 @@ class Cmd(cmd.Cmd): # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) # Also only attempt to use a pager if actually running in a real fully functional terminal if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir: + if self.colors.lower() == constants.COLORS_NEVER.lower(): + msg_str = utils.strip_ansi(msg_str) + pager = self.pager if chop: pager = self.pager_chop @@ -658,7 +709,7 @@ class Cmd(cmd.Cmd): except BrokenPipeError: # This occurs if a command's output is being piped to another process and that process closes before the # command is finished. If you would like your application to print a warning message, then set the - # broken_pipe_warning attribute to the message you want printed. + # broken_pipe_warning attribute to the message you want printed.` if self.broken_pipe_warning: sys.stderr.write(self.broken_pipe_warning) @@ -669,7 +720,7 @@ class Cmd(cmd.Cmd): is running on Windows, will return ``val`` unchanged. ``color`` should be one of the supported strings (or styles): red/blue/green/cyan/magenta, bold, underline""" - if self.colors and (self.stdout == self.initial_stdout): + if self.colors.lower() != constants.COLORS_NEVER.lower() and (self.stdout == self.initial_stdout): return self._colorcodes[color][True] + val + self._colorcodes[color][False] return val @@ -897,14 +948,13 @@ class Cmd(cmd.Cmd): :param line: the current input line with leading whitespace removed :param begidx: the beginning index of the prefix text :param endidx: the ending index of the prefix text - :param flag_dict: dict - dictionary whose structure is the following: - keys - flags (ex: -c, --create) that result in tab completion for the next - argument in the command line - values - there are two types of values - 1. iterable list of strings to match against (dictionaries, lists, etc.) - 2. function that performs tab completion (ex: path_complete) - :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded - by a flag in flag_dict + :param flag_dict: dictionary whose structure is the following: + keys - flags (ex: -c, --create) that result in tab completion for the next + argument in the command line + values - there are two types of values + 1. iterable list of strings to match against (dictionaries, lists, etc.) + 2. function that performs tab completion (ex: path_complete) + :param all_else: an optional parameter for tab completing any token that isn't preceded by a flag in flag_dict :return: a list of possible tab completions """ # Get all tokens through the one being completed @@ -940,14 +990,13 @@ class Cmd(cmd.Cmd): :param line: the current input line with leading whitespace removed :param begidx: the beginning index of the prefix text :param endidx: the ending index of the prefix text - :param index_dict: dict - dictionary whose structure is the following: - keys - 0-based token indexes into command line that determine which tokens - perform tab completion - values - there are two types of values - 1. iterable list of strings to match against (dictionaries, lists, etc.) - 2. function that performs tab completion (ex: path_complete) - :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an - index in index_dict + :param index_dict: dictionary whose structure is the following: + keys - 0-based token indexes into command line that determine which tokens + perform tab completion + values - there are two types of values + 1. iterable list of strings to match against (dictionaries, lists, etc.) + 2. function that performs tab completion (ex: path_complete) + :param all_else: an optional parameter for tab completing any token that isn't at an index in index_dict :return: a list of possible tab completions """ # Get all tokens through the one being completed @@ -1093,8 +1142,8 @@ class Cmd(cmd.Cmd): self.allow_appended_space = False self.allow_closing_quote = False - # Sort the matches before any trailing slashes are added - matches = utils.alphabetical_sort(matches) + # Sort the matches alphabetically before any trailing slashes are added + matches.sort(key=utils.norm_fold) self.matches_sorted = True # Build display_matches and add a slash to directories @@ -1433,18 +1482,21 @@ class Cmd(cmd.Cmd): # Check if a valid command was entered if command in self.get_all_commands(): # Get the completer function for this command - try: - compfunc = getattr(self, 'complete_' + command) - except AttributeError: + compfunc = getattr(self, 'complete_' + command, None) + + if compfunc is None: # There's no completer function, next see if the command uses argparser - try: - cmd_func = getattr(self, 'do_' + command) - argparser = getattr(cmd_func, 'argparser') - # Command uses argparser, switch to the default argparse completer - compfunc = functools.partial(self._autocomplete_default, argparser=argparser) - except AttributeError: + func = self.cmd_func(command) + if func and hasattr(func, 'argparser'): + compfunc = functools.partial(self._autocomplete_default, + argparser=getattr(func, 'argparser')) + else: compfunc = self.completedefault + # Check if a macro was entered + elif command in self.macros: + compfunc = self.path_complete + # A valid command was not entered else: # Check if this command should be run as a shell command @@ -1512,11 +1564,9 @@ class Cmd(cmd.Cmd): [shortcut_to_restore + match for match in self.completion_matches] else: - # Complete token against aliases and command names - alias_names = set(self.aliases.keys()) - visible_commands = set(self.get_visible_commands()) - strs_to_match = list(alias_names | visible_commands) - self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) + # Complete token against anything a user can run + self.completion_matches = self.basic_complete(text, line, begidx, endidx, + self.get_commands_aliases_and_macros_for_completion()) # Handle single result if len(self.completion_matches) == 1: @@ -1534,8 +1584,8 @@ class Cmd(cmd.Cmd): # Sort matches alphabetically if they haven't already been sorted if not self.matches_sorted: - self.completion_matches = utils.alphabetical_sort(self.completion_matches) - self.display_matches = utils.alphabetical_sort(self.display_matches) + self.completion_matches.sort(key=utils.norm_fold) + self.display_matches.sort(key=utils.norm_fold) self.matches_sorted = True try: @@ -1556,8 +1606,8 @@ class Cmd(cmd.Cmd): def get_all_commands(self) -> List[str]: """Returns a list of all commands.""" - return [name[3:] for name in self.get_names() - if name.startswith('do_') and isinstance(getattr(self, name), Callable)] + return [name[len(COMMAND_PREFIX):] for name in self.get_names() + if name.startswith(COMMAND_PREFIX) and callable(getattr(self, name))] def get_visible_commands(self) -> List[str]: """Returns a list of commands that have not been hidden.""" @@ -1570,53 +1620,25 @@ class Cmd(cmd.Cmd): return commands - def get_help_topics(self) -> List[str]: - """ Returns a list of help topics """ - return [name[5:] for name in self.get_names() - if name.startswith('help_') and isinstance(getattr(self, name), Callable)] - - def complete_help(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """ - Override of parent class method to handle tab completing subcommands and not showing hidden commands - Returns a list of possible tab completions - """ - - # The command is the token at index 1 in the command line - cmd_index = 1 - - # The subcommand is the token at index 2 in the command line - subcmd_index = 2 - - # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) - if not tokens: - return [] - - matches = [] + def get_alias_names(self) -> List[str]: + """Return a list of alias names.""" + return list(self.aliases) - # Get the index of the token being completed - index = len(tokens) - 1 - - # Check if we are completing a command or help topic - if index == cmd_index: + def get_macro_names(self) -> List[str]: + """Return a list of macro names.""" + return list(self.macros) - # Complete token against topics and visible commands - topics = set(self.get_help_topics()) - visible_commands = set(self.get_visible_commands()) - strs_to_match = list(topics | visible_commands) - matches = self.basic_complete(text, line, begidx, endidx, strs_to_match) - - # check if the command uses argparser - elif index >= subcmd_index: - try: - cmd_func = getattr(self, 'do_' + tokens[cmd_index]) - parser = getattr(cmd_func, 'argparser') - completer = AutoCompleter(parser) - matches = completer.complete_command_help(tokens[1:], text, line, begidx, endidx) - except AttributeError: - pass + def get_commands_aliases_and_macros_for_completion(self) -> List[str]: + """Return a list of visible commands, aliases, and macros for tab completion""" + visible_commands = set(self.get_visible_commands()) + alias_names = set(self.get_alias_names()) + macro_names = set(self.get_macro_names()) + return list(visible_commands | alias_names | macro_names) - return matches + def get_help_topics(self) -> List[str]: + """ Returns a list of help topics """ + return [name[5:] for name in self.get_names() + if name.startswith('help_') and callable(getattr(self, name))] # noinspection PyUnusedLocal def sigint_handler(self, signum: int, frame) -> None: @@ -1637,12 +1659,6 @@ class Cmd(cmd.Cmd): # Re-raise a KeyboardInterrupt so other parts of the code can catch it raise KeyboardInterrupt("Got a keyboard interrupt") - def preloop(self) -> None: - """Hook method executed once when the cmdloop() method is called.""" - import signal - # Register a default SIGINT signal handler for Ctrl+C - signal.signal(signal.SIGINT, self.sigint_handler) - def precmd(self, statement: Statement) -> Statement: """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. @@ -1651,58 +1667,6 @@ class Cmd(cmd.Cmd): """ return statement - # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd ----- - - # noinspection PyMethodMayBeStatic - def preparse(self, raw: str) -> str: - """Hook method executed before user input is parsed. - - WARNING: If it's a multiline command, `preparse()` may not get all the - user input. _complete_statement() really does two things: a) parse the - user input, and b) accept more input in case it's a multiline command - the passed string doesn't have a terminator. `preparse()` is currently - called before we know whether it's a multiline command, and before we - know whether the user input includes a termination character. - - If you want a reliable pre parsing hook method, register a postparsing - hook, modify the user input, and then reparse it. - - :param raw: raw command line input :return: potentially modified raw command line input - :return: a potentially modified version of the raw input string - """ - return raw - - # noinspection PyMethodMayBeStatic - def postparsing_precmd(self, statement: Statement) -> Tuple[bool, Statement]: - """This runs after parsing the command-line, but before anything else; even before adding cmd to history. - - NOTE: This runs before precmd() and prior to any potential output redirection or piping. - - If you wish to fatally fail this command and exit the application entirely, set stop = True. - - If you wish to just fail this command you can do so by raising an exception: - - - raise EmptyStatement - will silently fail and do nothing - - raise <AnyOtherException> - will fail and print an error message - - :param statement: - the parsed command-line statement as a Statement object - :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement object - """ - stop = False - return stop, statement - - # noinspection PyMethodMayBeStatic - def postparsing_postcmd(self, stop: bool) -> bool: - """This runs after everything else, including after postcmd(). - - It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due - to notifications from a background thread, then this is the method you want to override to do it. - - :param stop: bool - True implies the entire application should exit. - :return: bool - True implies the entire application should exit. - """ - return stop - def parseline(self, line: str) -> Tuple[str, str, str]: """Parse the line into a command name and a string containing the arguments. @@ -1742,9 +1706,6 @@ class Cmd(cmd.Cmd): data = func(data) if data.stop: break - # postparsing_precmd is deprecated - if not data.stop: - (data.stop, data.statement) = self.postparsing_precmd(data.statement) # unpack the data object statement = data.statement stop = data.stop @@ -1809,9 +1770,7 @@ class Cmd(cmd.Cmd): data = func(data) # retrieve the final value of stop, ignoring any # modifications to the statement - stop = data.stop - # postparsing_postcmd is deprecated - return self.postparsing_postcmd(stop) + return data.stop except Exception as ex: self.perror(ex) @@ -1865,9 +1824,6 @@ class Cmd(cmd.Cmd): pipe runs out. We can't refactor it because we need to retain backwards compatibility with the standard library version of cmd. """ - # preparse() is deprecated, use self.register_postparsing_hook() instead - line = self.preparse(line) - while True: try: statement = self.statement_parser.parse(line) @@ -2014,49 +1970,91 @@ class Cmd(cmd.Cmd): self.redirecting = False - def _func_named(self, arg: str) -> str: - """Gets the method name associated with a given command. + def cmd_func(self, command: str) -> Optional[Callable]: + """ + Get the function for a command + :param command: the name of the command + """ + func_name = self.cmd_func_name(command) + if func_name: + return getattr(self, func_name) + + def cmd_func_name(self, command: str) -> str: + """Get the method name associated with a given command. - :param arg: command to look up method name which implements it + :param command: command to look up method name which implements it :return: method name which implements the given command """ - result = None - target = 'do_' + arg - if target in dir(self): - result = target - return result + target = COMMAND_PREFIX + command + return target if callable(getattr(self, target, None)) else '' - def onecmd(self, statement: Union[Statement, str]) -> Optional[bool]: + def onecmd(self, statement: Union[Statement, str]) -> bool: """ This executes the actual do_* method for a command. If the command provided doesn't exist, then it executes _default() instead. - :param statement: Command - intended to be a Statement instance parsed command from the input stream, - alternative acceptance of a str is present only for backward compatibility with cmd + :param statement: intended to be a Statement instance parsed command from the input stream, alternative + acceptance of a str is present only for backward compatibility with cmd :return: a flag indicating whether the interpretation of commands should stop """ # For backwards compatibility with cmd, allow a str to be passed in if not isinstance(statement, Statement): statement = self._complete_statement(statement) - funcname = self._func_named(statement.command) - if not funcname: - self.default(statement) - return + # Check if this is a macro + if statement.command in self.macros: + stop = self._run_macro(statement) + else: + func = self.cmd_func(statement.command) + if func: + stop = func(statement) - # Since we have a valid command store it in the history - if statement.command not in self.exclude_from_history: - self.history.append(statement.raw) + # Since we have a valid command store it in the history + if statement.command not in self.exclude_from_history: + self.history.append(statement.raw) - try: - func = getattr(self, funcname) - except AttributeError: - self.default(statement) - return + else: + self.default(statement) + stop = False - stop = func(statement) return stop + def _run_macro(self, statement: Statement) -> bool: + """ + Resolve a macro and run the resulting string + + :param statement: the parsed statement from the command line + :return: a flag indicating whether the interpretation of commands should stop + """ + if statement.command not in self.macros.keys(): + raise KeyError('{} is not a macro'.format(statement.command)) + + macro = self.macros[statement.command] + + # For macros, every argument must be provided and there can be no extra arguments. + if len(statement.arg_list) != macro.required_arg_count: + self.perror("The macro '{}' expects {} argument(s)".format(statement.command, macro.required_arg_count), + traceback_war=False) + return False + + # Resolve the arguments in reverse + resolved = macro.value + reverse_arg_list = sorted(macro.arg_list, key=lambda ma: ma.start_index, reverse=True) + + for arg in reverse_arg_list: + if arg.is_escaped: + to_replace = '{{' + arg.number_str + '}}' + replacement = '{' + arg.number_str + '}' + else: + to_replace = '{' + arg.number_str + '}' + replacement = statement.argv[int(arg.number_str)] + + parts = resolved.rsplit(to_replace, maxsplit=1) + resolved = parts[0] + replacement + parts[1] + + # Run the resolved command + return self.onecmd_plus_hooks(resolved) + def default(self, statement: Statement) -> None: """Executed when the command given isn't a recognized command implemented by a do_* method. @@ -2072,34 +2070,6 @@ class Cmd(cmd.Cmd): # Print out a message stating this is an unknown command self.poutput('*** Unknown syntax: {}\n'.format(arg)) - @staticmethod - def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str: - """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. - - :param prompt: original prompt - :param start: start code to tell GNU Readline about beginning of invisible characters - :param end: end code to tell GNU Readline about end of invisible characters - :return: prompt safe to pass to GNU Readline - """ - # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline - if sys.platform == "win32": - return prompt - - escaped = False - result = "" - - for c in prompt: - if c == "\x1b" and not escaped: - result += start + c - escaped = True - elif c.isalpha() and escaped: - result += c + end - escaped = False - else: - result += c - - return result - def pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2108,23 +2078,37 @@ class Cmd(cmd.Cmd): to decide whether to print the prompt and the input """ - # Deal with the vagaries of readline and ANSI escape codes - safe_prompt = self._surround_ansi_escapes(prompt) + # Temporarily save over self.prompt to reflect what will be on screen + orig_prompt = self.prompt + self.prompt = prompt if self.use_rawinput: try: if sys.stdin.isatty(): + # Wrap in try since terminal_lock may not be locked when this function is called from unit tests + try: + # A prompt is about to be drawn. Allow asynchronous changes to the terminal. + self.terminal_lock.release() + except RuntimeError: + pass + + # Deal with the vagaries of readline and ANSI escape codes + safe_prompt = rl_make_safe_prompt(prompt) line = input(safe_prompt) else: line = input() if self.echo: - sys.stdout.write('{}{}\n'.format(safe_prompt, line)) + sys.stdout.write('{}{}\n'.format(self.prompt, line)) except EOFError: line = 'eof' + finally: + if sys.stdin.isatty(): + # The prompt is gone. Do not allow asynchronous changes to the terminal. + self.terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line - self.poutput(safe_prompt, end='') + self.poutput(self.prompt, end='') self.stdout.flush() line = self.stdin.readline() if len(line) == 0: @@ -2137,9 +2121,13 @@ class Cmd(cmd.Cmd): if len(line): # we read something, output the prompt and the something if self.echo: - self.poutput('{}{}'.format(safe_prompt, line)) + self.poutput('{}{}'.format(self.prompt, line)) else: line = 'eof' + + # Restore prompt + self.prompt = orig_prompt + return line.strip() def _cmdloop(self) -> bool: @@ -2219,147 +2207,423 @@ class Cmd(cmd.Cmd): return stop - def do_alias(self, statement: Statement) -> None: - """Define or display aliases + # ----- Alias subcommand functions ----- -Usage: Usage: alias [name] | [<name> <value>] - Where: - name - name of the alias being looked up, added, or replaced - value - what the alias will be resolved to (if adding or replacing) - this can contain spaces and does not need to be quoted + def alias_create(self, args: argparse.Namespace): + """ Creates or overwrites an alias """ - Without arguments, 'alias' prints a list of all aliases in a reusable form which - can be outputted to a startup_script to preserve aliases across sessions. + # Validate the alias name + args.name = utils.strip_quotes(args.name) + valid, errmsg = self.statement_parser.is_valid_command(args.name) + if not valid: + errmsg = "Invalid alias name: {}".format(errmsg) + self.perror(errmsg, traceback_war=False) + return - With one argument, 'alias' shows the value of the specified alias. - Example: alias ls (Prints the value of the alias called 'ls' if it exists) + if args.name in self.macros: + errmsg = "Alias cannot have the same name as a macro" + self.perror(errmsg, traceback_war=False) + return - With two or more arguments, 'alias' creates or replaces an alias. + utils.unquote_redirection_tokens(args.command_args) - Example: alias ls !ls -lF + # Build the alias value string + value = args.command + if args.command_args: + value += ' ' + ' '.join(args.command_args) - If you want to use redirection or pipes in the alias, then quote them to prevent - the alias command itself from being redirected + # Set the alias + result = "overwritten" if args.name in self.aliases else "created" + self.aliases[args.name] = value + self.poutput("Alias '{}' {}".format(args.name, result)) - Examples: - alias save_results print_results ">" out.txt - alias save_results print_results '>' out.txt -""" - # Get alias arguments as a list with quotes preserved - alias_arg_list = statement.arg_list + def alias_delete(self, args: argparse.Namespace): + """ Deletes aliases """ + if args.all: + self.aliases.clear() + self.poutput("All aliases deleted") + elif not args.name: + self.do_help('alias delete') + else: + # Get rid of duplicates and strip quotes since the argparse decorator for do_alias() preserves them + aliases_to_delete = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] + + for cur_name in aliases_to_delete: + if cur_name in self.aliases: + del self.aliases[cur_name] + self.poutput("Alias '{}' deleted".format(cur_name)) + else: + self.perror("Alias '{}' does not exist".format(cur_name), traceback_war=False) + + def alias_list(self, args: argparse.Namespace): + """ Lists some or all aliases """ + if args.name: + # Get rid of duplicates and strip quotes since the argparse decorator for do_alias() preserves them + names_to_view = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] + + for cur_name in names_to_view: + if cur_name in self.aliases: + self.poutput("alias create {} {}".format(cur_name, self.aliases[cur_name])) + else: + self.perror("Alias '{}' not found".format(cur_name), traceback_war=False) + else: + sorted_aliases = utils.alphabetical_sort(self.aliases) + for cur_alias in sorted_aliases: + self.poutput("alias create {} {}".format(cur_alias, self.aliases[cur_alias])) + + # Top-level parser for alias + alias_description = ("Manage aliases\n" + "\n" + "An alias is a command that enables replacement of a word by another string.") + alias_epilog = ("See also:\n" + " macro") + alias_parser = ACArgumentParser(description=alias_description, epilog=alias_epilog, prog='alias') + + # Add subcommands to alias + alias_subparsers = alias_parser.add_subparsers() + + # alias -> create + alias_create_help = "create or overwrite an alias" + alias_create_description = "Create or overwrite an alias" + + alias_create_epilog = ("Notes:\n" + " If you want to use redirection or pipes in the alias, then quote them to\n" + " prevent the 'alias create' command from being redirected.\n" + "\n" + " Since aliases are resolved during parsing, tab completion will function as it\n" + " would for the actual command the alias resolves to.\n" + "\n" + "Examples:\n" + " alias ls !ls -lF\n" + " alias create show_log !cat \"log file.txt\"\n" + " alias create save_results print_results \">\" out.txt\n") + + alias_create_parser = alias_subparsers.add_parser('create', help=alias_create_help, + description=alias_create_description, + epilog=alias_create_epilog) + alias_create_parser.add_argument('name', help='name of this alias') + setattr(alias_create_parser.add_argument('command', help='what the alias resolves to'), + ACTION_ARG_CHOICES, get_commands_aliases_and_macros_for_completion) + setattr(alias_create_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='arguments to pass to command'), + ACTION_ARG_CHOICES, ('path_complete',)) + alias_create_parser.set_defaults(func=alias_create) + + # alias -> delete + alias_delete_help = "delete aliases" + alias_delete_description = "Delete specified aliases or all aliases if --all is used" + alias_delete_parser = alias_subparsers.add_parser('delete', help=alias_delete_help, + description=alias_delete_description) + setattr(alias_delete_parser.add_argument('name', nargs='*', help='alias to delete'), + ACTION_ARG_CHOICES, get_alias_names) + alias_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all aliases") + alias_delete_parser.set_defaults(func=alias_delete) + + # alias -> list + alias_list_help = "list aliases" + alias_list_description = ("List specified aliases in a reusable form that can be saved to a startup script\n" + "to preserve aliases across sessions\n" + "\n" + "Without arguments, all aliases will be listed.") + + alias_list_parser = alias_subparsers.add_parser('list', help=alias_list_help, + description=alias_list_description) + setattr(alias_list_parser.add_argument('name', nargs="*", help='alias to list'), + ACTION_ARG_CHOICES, get_alias_names) + alias_list_parser.set_defaults(func=alias_list) + + # Preserve quotes since we are passing strings to other commands + @with_argparser(alias_parser, preserve_quotes=True) + def do_alias(self, args: argparse.Namespace): + """Manage aliases""" + func = getattr(args, 'func', None) + if func is not None: + # Call whatever subcommand function was selected + func(self, args) + else: + # No subcommand was provided, so call help + self.do_help('alias') + + # ----- Macro subcommand functions ----- + + def macro_create(self, args: argparse.Namespace): + """ Creates or overwrites a macro """ - # If no args were given, then print a list of current aliases - if not alias_arg_list: - for cur_alias in self.aliases: - self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias])) + # Validate the macro name + args.name = utils.strip_quotes(args.name) + valid, errmsg = self.statement_parser.is_valid_command(args.name) + if not valid: + errmsg = "Invalid macro name: {}".format(errmsg) + self.perror(errmsg, traceback_war=False) return - # Get the alias name - name = alias_arg_list[0] + if args.name in self.get_all_commands(): + errmsg = "Macro cannot have the same name as a command" + self.perror(errmsg, traceback_war=False) + return - # The user is looking up an alias - if len(alias_arg_list) == 1: - if name in self.aliases: - self.poutput("alias {} {}".format(name, self.aliases[name])) - else: - self.perror("Alias {!r} not found".format(name), traceback_war=False) + if args.name in self.aliases: + errmsg = "Macro cannot have the same name as an alias" + self.perror(errmsg, traceback_war=False) + return - # The user is creating an alias - else: - # Unquote redirection and pipes - index = 1 - while index < len(alias_arg_list): - unquoted_arg = utils.strip_quotes(alias_arg_list[index]) - if unquoted_arg in constants.REDIRECTION_TOKENS: - alias_arg_list[index] = unquoted_arg - index += 1 - - # Build the alias value string - value = ' '.join(alias_arg_list[1:]) - - # Validate the alias to ensure it doesn't include weird characters - # like terminators, output redirection, or whitespace - valid, invalidchars = self.statement_parser.is_valid_command(name) - if valid: - # Set the alias - self.aliases[name] = value - self.poutput("Alias {!r} created".format(name)) - else: - errmsg = "Aliases can not contain: {}".format(invalidchars) - self.perror(errmsg, traceback_war=False) + utils.unquote_redirection_tokens(args.command_args) - def complete_alias(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """ Tab completion for alias """ - alias_names = set(self.aliases.keys()) - visible_commands = set(self.get_visible_commands()) + # Build the macro value string + value = args.command + if args.command_args: + value += ' ' + ' '.join(args.command_args) - index_dict = \ - { - 1: alias_names, - 2: list(alias_names | visible_commands) - } - return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) + # Find all normal arguments + arg_list = [] + normal_matches = re.finditer(MacroArg.macro_normal_arg_pattern, value) + max_arg_num = 0 + arg_nums = set() - @with_argument_list - def do_unalias(self, arglist: List[str]) -> None: - """Unsets aliases + while True: + try: + cur_match = normal_matches.__next__() -Usage: Usage: unalias [-a] name [name ...] - Where: - name - name of the alias being unset + # Get the number string between the braces + cur_num_str = (re.findall(MacroArg.digit_pattern, cur_match.group())[0]) + cur_num = int(cur_num_str) + if cur_num < 1: + self.perror("Argument numbers must be greater than 0", traceback_war=False) + return - Options: - -a remove all alias definitions -""" - if not arglist: - self.do_help(['unalias']) + arg_nums.add(cur_num) + if cur_num > max_arg_num: + max_arg_num = cur_num - if '-a' in arglist: - self.aliases.clear() - self.poutput("All aliases cleared") + arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=False)) + + except StopIteration: + break + + # Make sure the argument numbers are continuous + if len(arg_nums) != max_arg_num: + self.perror("Not all numbers between 1 and {} are present " + "in the argument placeholders".format(max_arg_num), traceback_war=False) + return + + # Find all escaped arguments + escaped_matches = re.finditer(MacroArg.macro_escaped_arg_pattern, value) + + while True: + try: + cur_match = escaped_matches.__next__() + # Get the number string between the braces + cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0] + + arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=True)) + except StopIteration: + break + + # Set the macro + result = "overwritten" if args.name in self.macros else "created" + self.macros[args.name] = Macro(name=args.name, value=value, required_arg_count=max_arg_num, arg_list=arg_list) + self.poutput("Macro '{}' {}".format(args.name, result)) + + def macro_delete(self, args: argparse.Namespace): + """ Deletes macros """ + if args.all: + self.macros.clear() + self.poutput("All macros deleted") + elif not args.name: + self.do_help('macro delete') else: - # Get rid of duplicates - arglist = utils.remove_duplicates(arglist) + # Get rid of duplicates and strip quotes since the argparse decorator for do_macro() preserves them + macros_to_delete = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] + + for cur_name in macros_to_delete: + if cur_name in self.macros: + del self.macros[cur_name] + self.poutput("Macro '{}' deleted".format(cur_name)) + else: + self.perror("Macro '{}' does not exist".format(cur_name), traceback_war=False) + + def macro_list(self, args: argparse.Namespace): + """ Lists some or all macros """ + if args.name: + # Get rid of duplicates and strip quotes since the argparse decorator for do_macro() preserves them + names_to_view = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)] - for cur_arg in arglist: - if cur_arg in self.aliases: - del self.aliases[cur_arg] - self.poutput("Alias {!r} cleared".format(cur_arg)) + for cur_name in names_to_view: + if cur_name in self.macros: + self.poutput("macro create {} {}".format(cur_name, self.macros[cur_name].value)) else: - self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False) - - def complete_unalias(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """ Tab completion for unalias """ - return self.basic_complete(text, line, begidx, endidx, self.aliases) - - @with_argument_list - def do_help(self, arglist: List[str]) -> None: - """ List available commands with "help" or detailed help with "help cmd" """ - if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')): - verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v') - self._help_menu(verbose) + self.perror("Macro '{}' not found".format(cur_name), traceback_war=False) + else: + sorted_macros = utils.alphabetical_sort(self.macros) + for cur_macro in sorted_macros: + self.poutput("macro create {} {}".format(cur_macro, self.macros[cur_macro].value)) + + # Top-level parser for macro + macro_description = ("Manage macros\n" + "\n" + "A macro is similar to an alias, but it can take arguments when called.") + macro_epilog = ("See also:\n" + " alias") + macro_parser = ACArgumentParser(description=macro_description, epilog=macro_epilog, prog='macro') + + # Add subcommands to macro + macro_subparsers = macro_parser.add_subparsers() + + # macro -> create + macro_create_help = "create or overwrite a macro" + macro_create_description = "Create or overwrite a macro" + + macro_create_epilog = ("A macro is similar to an alias, but it can take arguments when called.\n" + "Arguments are expressed when creating a macro using {#} notation where {1}\n" + "means the first argument.\n" + "\n" + "The following creates a macro called my_macro that expects two arguments:\n" + "\n" + " macro create my_macro make_dinner -meat {1} -veggie {2}\n" + "\n" + "When the macro is called, the provided arguments are resolved and the assembled\n" + "command is run. For example:\n" + "\n" + " my_macro beef broccoli ---> make_dinner -meat beef -veggie broccoli\n" + "\n" + "Notes:\n" + " To use the literal string {1} in your command, escape it this way: {{1}}.\n" + "\n" + " An argument number can be repeated in a macro. In the following example the\n" + " first argument will populate both {1} instances.\n" + "\n" + " macro create ft file_taxes -p {1} -q {2} -r {1}\n" + "\n" + " To quote an argument in the resolved command, quote it during creation.\n" + "\n" + " macro create backup !cp \"{1}\" \"{1}.orig\"\n" + "\n" + " Be careful! Since macros can resolve into commands, aliases, and macros,\n" + " it is possible to create a macro that results in infinite recursion.\n" + "\n" + " If you want to use redirection or pipes in the macro, then quote them as in\n" + " this example to prevent the 'macro create' command from being redirected.\n" + "\n" + " macro create show_results print_results -type {1} \"|\" less\n" + "\n" + " Because macros do not resolve until after parsing (hitting Enter), tab\n" + " completion will only complete paths.") + + macro_create_parser = macro_subparsers.add_parser('create', help=macro_create_help, + description=macro_create_description, + epilog=macro_create_epilog) + macro_create_parser.add_argument('name', help='name of this macro') + setattr(macro_create_parser.add_argument('command', help='what the macro resolves to'), + ACTION_ARG_CHOICES, get_commands_aliases_and_macros_for_completion) + setattr(macro_create_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='arguments to pass to command'), + ACTION_ARG_CHOICES, ('path_complete',)) + macro_create_parser.set_defaults(func=macro_create) + + # macro -> delete + macro_delete_help = "delete macros" + macro_delete_description = "Delete specified macros or all macros if --all is used" + macro_delete_parser = macro_subparsers.add_parser('delete', help=macro_delete_help, + description=macro_delete_description) + setattr(macro_delete_parser.add_argument('name', nargs='*', help='macro to delete'), + ACTION_ARG_CHOICES, get_macro_names) + macro_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all macros") + macro_delete_parser.set_defaults(func=macro_delete) + + # macro -> list + macro_list_help = "list macros" + macro_list_description = ("List specified macros in a reusable form that can be saved to a startup script\n" + "to preserve macros across sessions\n" + "\n" + "Without arguments, all macros will be listed.") + + macro_list_parser = macro_subparsers.add_parser('list', help=macro_list_help, description=macro_list_description) + setattr(macro_list_parser.add_argument('name', nargs="*", help='macro to list'), + ACTION_ARG_CHOICES, get_macro_names) + macro_list_parser.set_defaults(func=macro_list) + + # Preserve quotes since we are passing strings to other commands + @with_argparser(macro_parser, preserve_quotes=True) + def do_macro(self, args: argparse.Namespace): + """Manage macros""" + func = getattr(args, 'func', None) + if func is not None: + # Call whatever subcommand function was selected + func(self, args) + else: + # No subcommand was provided, so call help + self.do_help('macro') + + def complete_help_command(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: + """Completes the command argument of help""" + + # Complete token against topics and visible commands + topics = set(self.get_help_topics()) + visible_commands = set(self.get_visible_commands()) + strs_to_match = list(topics | visible_commands) + return self.basic_complete(text, line, begidx, endidx, strs_to_match) + + def complete_help_subcommand(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: + """Completes the subcommand argument of help""" + + # Get all tokens through the one being completed + tokens, _ = self.tokens_for_completion(line, begidx, endidx) + + if not tokens: + return [] + + # Must have at least 3 args for 'help command subcommand' + if len(tokens) < 3: + return [] + + # Find where the command is by skipping past any flags + cmd_index = 1 + for cur_token in tokens[cmd_index:]: + if not cur_token.startswith('-'): + break + cmd_index += 1 + + if cmd_index >= len(tokens): + return [] + + command = tokens[cmd_index] + matches = [] + + # Check if this is a command with an argparse function + func = self.cmd_func(command) + if func and hasattr(func, 'argparser'): + completer = AutoCompleter(getattr(func, 'argparser'), cmd2_app=self) + matches = completer.complete_command_help(tokens[cmd_index:], text, line, begidx, endidx) + + return matches + + help_parser = ACArgumentParser() + + setattr(help_parser.add_argument('command', help="command to retrieve help for", nargs="?"), + ACTION_ARG_CHOICES, ('complete_help_command',)) + setattr(help_parser.add_argument('subcommand', help="subcommand to retrieve help for", + nargs=argparse.REMAINDER), + ACTION_ARG_CHOICES, ('complete_help_subcommand',)) + help_parser.add_argument('-v', '--verbose', action='store_true', + help="print a list of all commands with descriptions of each") + + @with_argparser(help_parser) + def do_help(self, args: argparse.Namespace) -> None: + """List available commands or provide detailed help for a specific command""" + if not args.command or args.verbose: + self._help_menu(args.verbose) + else: # Getting help for a specific command - funcname = self._func_named(arglist[0]) - if funcname: - # Check to see if this function was decorated with an argparse ArgumentParser - func = getattr(self, funcname) - if hasattr(func, 'argparser'): - # Function has an argparser, so get help based on all the arguments in case there are sub-commands - new_arglist = arglist[1:] - new_arglist.append('-h') - - # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout - with redirect_stdout(self.stdout): - with redirect_stderr(self.stdout): - func(new_arglist) - else: - # No special behavior needed, delegate to cmd base class do_help() - cmd.Cmd.do_help(self, funcname[3:]) + func = self.cmd_func(args.command) + if func and hasattr(func, 'argparser'): + completer = AutoCompleter(getattr(func, 'argparser'), cmd2_app=self) + tokens = [args.command] + args.subcommand + self.poutput(completer.format_help(tokens)) else: - # This could be a help topic - cmd.Cmd.do_help(self, arglist[0]) + # No special behavior needed, delegate to cmd base class do_help() + super().do_help(args.command) def _help_menu(self, verbose: bool=False) -> None: """Show a list of commands which help can be displayed for. @@ -2375,11 +2639,12 @@ Usage: Usage: unalias [-a] name [name ...] cmds_cats = {} for command in visible_commands: - if command in help_topics or getattr(self, self._func_named(command)).__doc__: + func = self.cmd_func(command) + if command in help_topics or func.__doc__: if command in help_topics: help_topics.remove(command) - if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY): - category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY) + if hasattr(func, HELP_CATEGORY): + category = getattr(func, HELP_CATEGORY) cmds_cats.setdefault(category, []) cmds_cats[category].append(command) else: @@ -2432,12 +2697,13 @@ Usage: Usage: unalias [-a] name [name ...] func = getattr(self, 'help_' + command) except AttributeError: # Couldn't find a help function + func = self.cmd_func(command) try: # Now see if help_summary has been set - doc = getattr(self, self._func_named(command)).help_summary + doc = func.help_summary except AttributeError: # Last, try to directly access the function's doc-string - doc = getattr(self, self._func_named(command)).__doc__ + doc = func.__doc__ else: # we found the help function result = io.StringIO() @@ -2458,13 +2724,17 @@ Usage: Usage: unalias [-a] name [name ...] doc_block = [] found_first = False for doc_line in doc.splitlines(): - str(doc_line).strip() - if len(doc_line.strip()) > 0: - doc_block.append(doc_line.strip()) - found_first = True - else: + stripped_line = doc_line.strip() + + # Don't include :param type lines + if stripped_line.startswith(':'): if found_first: break + elif stripped_line: + doc_block.append(stripped_line) + found_first = True + elif found_first: + break for doc_line in doc_block: self.stdout.write('{: <{col_width}}{doc}\n'.format(command, @@ -2473,18 +2743,21 @@ Usage: Usage: unalias [-a] name [name ...] command = '' self.stdout.write("\n") - def do_shortcuts(self, _: str) -> None: - """Lists shortcuts available""" + @with_argparser(ACArgumentParser()) + def do_shortcuts(self, _: argparse.Namespace) -> None: + """List available shortcuts""" result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) self.poutput("Shortcuts for other commands:\n{}\n".format(result)) - def do_eof(self, _: str) -> bool: + @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) + def do_eof(self, _: argparse.Namespace) -> bool: """Called when <Ctrl>-D is pressed""" # End of script should not exit app, but <Ctrl>-D should. return self._STOP_AND_EXIT - def do_quit(self, _: str) -> bool: - """Exits this application""" + @with_argparser(ACArgumentParser()) + def do_quit(self, _: argparse.Namespace) -> bool: + """Exit this application""" self._should_quit = True return self._STOP_AND_EXIT @@ -2514,7 +2787,8 @@ Usage: Usage: unalias [-a] name [name ...] for (idx, (_, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: - response = input(prompt) + safe_prompt = rl_make_safe_prompt(prompt) + response = input(safe_prompt) if rl_type != RlType.NONE: hlen = readline.get_current_history_length() @@ -2541,22 +2815,21 @@ Usage: Usage: unalias [-a] name [name ...] Output redirection and pipes allowed: {}""" return read_only_settings.format(str(self.terminators), self.allow_cli_args, self.allow_redirection) - def show(self, args: argparse.Namespace, parameter: str) -> None: + def show(self, args: argparse.Namespace, parameter: str='') -> None: """Shows current settings of parameters. :param args: argparse parsed arguments from the set command - :param parameter: - :return: + :param parameter: optional search parameter """ - param = '' - if parameter: - param = parameter.strip().lower() + param = parameter.strip().lower() result = {} maxlen = 0 + for p in self.settable: if (not param) or p.startswith(param): - result[p] = '%s: %s' % (p, str(getattr(self, p))) + result[p] = '{}: {}'.format(p, str(getattr(self, p))) maxlen = max(maxlen, len(result[p])) + if result: for p in sorted(result): if args.long: @@ -2568,58 +2841,69 @@ Usage: Usage: unalias [-a] name [name ...] if args.all: self.poutput('\nRead only settings:{}'.format(self.cmdenvironment())) else: - raise LookupError("Parameter '%s' not supported (type 'set' for list of parameters)." % param) + raise LookupError("Parameter '{}' not supported (type 'set' for list of parameters).".format(param)) - set_description = "Sets a settable parameter or shows current settings of parameters.\n" - set_description += "\n" - set_description += "Accepts abbreviated parameter names so long as there is no ambiguity.\n" - set_description += "Call without arguments for a list of settable parameters with their values." + set_description = ("Set a settable parameter or show current settings of parameters\n" + "\n" + "Accepts abbreviated parameter names so long as there is no ambiguity.\n" + "Call without arguments for a list of settable parameters with their values.") set_parser = ACArgumentParser(description=set_description) set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well') set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter') - set_parser.add_argument('settable', nargs=(0, 2), help='[param_name] [value]') + setattr(set_parser.add_argument('param', nargs='?', help='parameter to set or view'), + ACTION_ARG_CHOICES, settable) + set_parser.add_argument('value', nargs='?', help='the new value for settable') @with_argparser(set_parser) def do_set(self, args: argparse.Namespace) -> None: - """Sets a settable parameter or shows current settings of parameters""" - try: - param_name, val = args.settable - val = val.strip() - param_name = param_name.strip().lower() - if param_name not in self.settable: - hits = [p for p in self.settable if p.startswith(param_name)] - if len(hits) == 1: - param_name = hits[0] - else: - return self.show(args, param_name) - current_val = getattr(self, param_name) - if (val[0] == val[-1]) and val[0] in ("'", '"'): - val = val[1:-1] + """Set a settable parameter or show current settings of parameters""" + + # Check if param was passed in + if not args.param: + return self.show(args) + param = args.param.strip().lower() + + # Check if value was passed in + if not args.value: + return self.show(args, param) + value = args.value + + # Check if param points to just one settable + if param not in self.settable: + hits = [p for p in self.settable if p.startswith(param)] + if len(hits) == 1: + param = hits[0] else: - val = utils.cast(current_val, val) - setattr(self, param_name, val) - self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val)) - if current_val != val: - try: - onchange_hook = getattr(self, '_onchange_%s' % param_name) - onchange_hook(old=current_val, new=val) - except AttributeError: - pass - except (ValueError, AttributeError): - param = '' - if args.settable: - param = args.settable[0] - self.show(args, param) - - def do_shell(self, statement: Statement) -> None: - """Execute a command as if at the OS prompt - - Usage: shell <command> [arguments]""" + return self.show(args, param) + + # Update the settable's value + current_value = getattr(self, param) + value = utils.cast(current_value, value) + setattr(self, param, value) + + self.poutput('{} - was: {}\nnow: {}\n'.format(param, current_value, value)) + + # See if we need to call a change hook for this settable + if current_value != value: + onchange_hook = getattr(self, '_onchange_{}'.format(param), None) + if onchange_hook is not None: + onchange_hook(old=current_value, new=value) + + shell_parser = ACArgumentParser() + setattr(shell_parser.add_argument('command', help='the command to run'), + ACTION_ARG_CHOICES, ('shell_cmd_complete',)) + setattr(shell_parser.add_argument('command_args', nargs=argparse.REMAINDER, + help='arguments to pass to command'), + ACTION_ARG_CHOICES, ('path_complete',)) + + @with_argparser(shell_parser, preserve_quotes=True) + def do_shell(self, args: argparse.Namespace) -> None: + """Execute a command as if at the OS prompt""" import subprocess - # Get list of arguments to shell with quotes preserved - tokens = statement.arg_list + # Create a list of arguments to shell + tokens = [args.command] + args.command_args # Support expanding ~ in quoted paths for index, _ in enumerate(tokens): @@ -2640,18 +2924,6 @@ Usage: Usage: unalias [-a] name [name ...] proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True) proc.communicate() - def complete_shell(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Handles tab completion of executable commands and local file system paths for the shell command - - :param text: the string prefix we are attempting to match (all returned matches must begin with it) - :param line: the current input line with leading whitespace removed - :param begidx: the beginning index of the prefix text - :param endidx: the ending index of the prefix text - :return: a list of possible tab completions - """ - index_dict = {1: self.shell_cmd_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) - @staticmethod def _reset_py_display() -> None: """ @@ -2676,37 +2948,35 @@ Usage: Usage: unalias [-a] name [name ...] sys.displayhook = sys.__displayhook__ sys.excepthook = sys.__excepthook__ - def do_py(self, arg: str) -> bool: - """ - Invoke python command, shell, or script + py_parser = ACArgumentParser() + py_parser.add_argument('command', help="command to run", nargs='?') + py_parser.add_argument('remainder', help="remainder of command", nargs=argparse.REMAINDER) - py <command>: Executes a Python command. - py: Enters interactive Python mode. - End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - Non-python commands can be issued with ``pyscript_name("your command")``. - Run python code from external script files with ``run("script.py")`` - """ - from .pyscript_bridge import PyscriptBridge + @with_argparser(py_parser) + def do_py(self, args: argparse.Namespace) -> bool: + """Invoke Python command or shell""" + from .pyscript_bridge import PyscriptBridge, CommandResult if self._in_py: - self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False) + err = "Recursively entering interactive Python consoles is not allowed." + self.perror(err, traceback_war=False) + self._last_result = CommandResult('', err) return False self._in_py = True # noinspection PyBroadException try: - arg = arg.strip() - # Support the run command even if called prior to invoking an interactive interpreter - def run(filename): + def run(filename: str): """Run a Python script file in the interactive console. - :param filename: str - filename of *.py script file to run + :param filename: filename of *.py script file to run """ + expanded_filename = os.path.expanduser(filename) try: - with open(filename) as f: + with open(expanded_filename) as f: interp.runcode(f.read()) except OSError as ex: - error_msg = "Error opening script file '{}': {}".format(filename, ex) + error_msg = "Error opening script file '{}': {}".format(expanded_filename, ex) self.perror(error_msg, traceback_war=False) bridge = PyscriptBridge(self) @@ -2721,8 +2991,12 @@ Usage: Usage: unalias [-a] name [name ...] interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') - if arg: - interp.runcode(arg) + if args.command: + full_command = utils.quote_string_if_needed(args.command) + for cur_token in args.remainder: + full_command += ' ' + utils.quote_string_if_needed(cur_token) + + interp.runcode(full_command) # If there are no args, then we will open an interactive Python console else: @@ -2787,11 +3061,14 @@ Usage: Usage: unalias [-a] name [name ...] sys.stdin = self.stdin cprt = 'Type "help", "copyright", "credits" or "license" for more information.' - docstr = self.do_py.__doc__.replace('pyscript_name', self.pyscript_name) + instructions = ('End with `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()`.\n' + 'Non-Python commands can be issued with: {}("your command")\n' + 'Run Python code from external script files with: run("script.py")' + .format(self.pyscript_name)) try: - interp.interact(banner="Python {} on {}\n{}\n({})\n{}". - format(sys.version, sys.platform, cprt, self.__class__.__name__, docstr)) + interp.interact(banner="Python {} on {}\n{}\n\n{}\n". + format(sys.version, sys.platform, cprt, instructions)) except EmbeddedConsoleExit: pass @@ -2832,30 +3109,22 @@ Usage: Usage: unalias [-a] name [name ...] self._in_py = False return self._should_quit - @with_argument_list - def do_pyscript(self, arglist: List[str]) -> None: - """\nRuns a python script file inside the console - - Usage: pyscript <script_path> [script_arguments] + pyscript_parser = ACArgumentParser() + setattr(pyscript_parser.add_argument('script_path', help='path to the script file'), + ACTION_ARG_CHOICES, ('path_complete',)) + pyscript_parser.add_argument('script_arguments', nargs=argparse.REMAINDER, + help='arguments to pass to script') -Console commands can be executed inside this script with cmd("your command") -However, you cannot run nested "py" or "pyscript" commands from within this script -Paths or arguments that contain spaces must be enclosed in quotes -""" - if not arglist: - self.perror("pyscript command requires at least 1 argument ...", traceback_war=False) - self.do_help(['pyscript']) - return - - # Get the absolute path of the script - script_path = os.path.expanduser(arglist[0]) + @with_argparser(pyscript_parser) + def do_pyscript(self, args: argparse.Namespace) -> None: + """Run a Python script file inside the console""" + script_path = os.path.expanduser(args.script_path) # Save current command line arguments orig_args = sys.argv # Overwrite sys.argv to allow the script to take command line arguments - sys.argv = [script_path] - sys.argv.extend(arglist[1:]) + sys.argv = [script_path] + args.script_arguments # Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows self.do_py("run({!r})".format(script_path)) @@ -2863,33 +3132,24 @@ Paths or arguments that contain spaces must be enclosed in quotes # Restore command line arguments to original state sys.argv = orig_args - def complete_pyscript(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Enable tab-completion for pyscript command.""" - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) - # Only include the do_ipy() method if IPython is available on the system - if ipython_available: - # noinspection PyMethodMayBeStatic,PyUnusedLocal - def do_ipy(self, arg: str) -> None: - """Enters an interactive IPython shell. - - Run python code from external files with ``run filename.py`` - End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. - """ + if ipython_available: # pragma: no cover + @with_argparser(ACArgumentParser()) + def do_ipy(self, _: argparse.Namespace) -> None: + """Enter an interactive IPython shell""" from .pyscript_bridge import PyscriptBridge bridge = PyscriptBridge(self) + banner = ('Entering an embedded IPython shell. Type quit or <Ctrl>-d to exit.\n' + 'Run Python code from external files with: run filename.py\n') + exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) + if self.locals_in_py: def load_ipy(self, app): - banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...' - exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) embed(banner1=banner, exit_msg=exit_msg) load_ipy(self, bridge) else: def load_ipy(app): - banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...' - exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0]) embed(banner1=banner, exit_msg=exit_msg) load_ipy(bridge) @@ -2898,10 +3158,10 @@ Paths or arguments that contain spaces must be enclosed in quotes history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items') history_parser_group.add_argument('-e', '--edit', action='store_true', help='edit and then run selected history items') - history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines') + history_parser_group.add_argument('-s', '--script', action='store_true', help='output commands in script format') history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file') history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file') - history_parser_group.add_argument('-c', '--clear', action="store_true", help='clears all history') + history_parser_group.add_argument('-c', '--clear', action="store_true", help='clear all history') _history_arg_help = """empty all history items a one history item by number a..b, a:b, a:, ..b items by indices (inclusive) @@ -3031,7 +3291,7 @@ a..b, a:b, a:, ..b items by indices (inclusive) # get the output out of the buffer output = membuf.read() # and add the regex-escaped output to the transcript - transcript += output.replace('/', '\/') + transcript += output.replace('/', r'\/') # Restore stdout to its original state self.stdout = saved_self_stdout @@ -3053,29 +3313,28 @@ a..b, a:b, a:, ..b items by indices (inclusive) msg = '{} {} saved to transcript file {!r}' self.pfeedback(msg.format(len(history), plural, transcript_file)) - @with_argument_list - def do_edit(self, arglist: List[str]) -> None: - """Edit a file in a text editor + edit_description = ("Edit a file in a text editor\n" + "\n" + "The editor used is determined by a settable parameter. To set it:\n" + "\n" + " set editor (program-name)") -Usage: edit [file_path] - Where: - * file_path - path to a file to open in editor + edit_parser = ACArgumentParser(description=edit_description) + setattr(edit_parser.add_argument('file_path', help="path to a file to open in editor", nargs="?"), + ACTION_ARG_CHOICES, ('path_complete',)) -The editor used is determined by the ``editor`` settable parameter. -"set editor (program-name)" to change or set the EDITOR environment variable. -""" + @with_argparser(edit_parser) + def do_edit(self, args: argparse.Namespace) -> None: + """Edit a file in a text editor""" if not self.editor: raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") - filename = arglist[0] if arglist else '' - if filename: - os.system('"{}" "{}"'.format(self.editor, filename)) - else: - os.system('"{}"'.format(self.editor)) - def complete_edit(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Enable tab-completion for edit command.""" - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) + editor = utils.quote_string_if_needed(self.editor) + if args.file_path: + expanded_path = utils.quote_string_if_needed(os.path.expanduser(args.file_path)) + os.system('{} {}'.format(editor, expanded_path)) + else: + os.system('{}'.format(editor)) @property def _current_script_dir(self) -> Optional[str]: @@ -3085,54 +3344,25 @@ The editor used is determined by the ``editor`` settable parameter. else: return None - @with_argument_list - def do__relative_load(self, arglist: List[str]) -> None: - """Runs commands in script file that is encoded as either ASCII or UTF-8 text - - Usage: _relative_load <file_path> - - optional argument: - file_path a file path pointing to a script - -Script should contain one command per line, just like command would be typed in console. - -If this is called from within an already-running script, the filename will be interpreted -relative to the already-running script's directory. - -NOTE: This command is intended to only be used within text file scripts. - """ - # If arg is None or arg is an empty string this is an error - if not arglist: - self.perror('_relative_load command requires a file path:', traceback_war=False) - return - - file_path = arglist[0].strip() - # NOTE: Relative path is an absolute path, it is just relative to the current script directory - relative_path = os.path.join(self._current_script_dir or '', file_path) - self.do_load([relative_path]) - - def do_eos(self, _: str) -> None: - """Handles cleanup when a script has finished executing""" + @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) + def do_eos(self, _: argparse.Namespace) -> None: + """Handle cleanup when a script has finished executing""" if self._script_dir: self._script_dir.pop() - @with_argument_list - def do_load(self, arglist: List[str]) -> None: - """Runs commands in script file that is encoded as either ASCII or UTF-8 text + load_description = ("Run commands in script file that is encoded as either ASCII or UTF-8 text\n" + "\n" + "Script should contain one command per line, just like the command would be\n" + "typed in the console.") - Usage: load <file_path> + load_parser = ACArgumentParser(description=load_description) + setattr(load_parser.add_argument('script_path', help="path to the script file"), + ACTION_ARG_CHOICES, ('path_complete',)) - * file_path - a file path pointing to a script - -Script should contain one command per line, just like command would be typed in console. - """ - # If arg is None or arg is an empty string this is an error - if not arglist: - self.perror('load command requires a file path', traceback_war=False) - return - - file_path = arglist[0].strip() - expanded_path = os.path.abspath(os.path.expanduser(file_path)) + @with_argparser(load_parser) + def do_load(self, args: argparse.Namespace) -> None: + """Run commands in script file that is encoded as either ASCII or UTF-8 text""" + expanded_path = os.path.abspath(os.path.expanduser(args.script_path)) # Make sure the path exists and we can access it if not os.path.exists(expanded_path): @@ -3166,10 +3396,24 @@ Script should contain one command per line, just like command would be typed in self._script_dir.append(os.path.dirname(expanded_path)) - def complete_load(self, text: str, line: str, begidx: int, endidx: int) -> List[str]: - """Enable tab-completion for load command.""" - index_dict = {1: self.path_complete} - return self.index_based_complete(text, line, begidx, endidx, index_dict) + relative_load_description = load_description + relative_load_description += ("\n\n" + "If this is called from within an already-running script, the filename will be\n" + "interpreted relative to the already-running script's directory.") + + relative_load_epilog = ("Notes:\n" + " This command is intended to only be used within text file scripts.") + + relative_load_parser = ACArgumentParser(description=relative_load_description, epilog=relative_load_epilog) + relative_load_parser.add_argument('file_path', help='a file path pointing to a script') + + @with_argparser(relative_load_parser) + def do__relative_load(self, args: argparse.Namespace) -> None: + """""" + file_path = args.file_path + # NOTE: Relative path is an absolute path, it is just relative to the current script directory + relative_path = os.path.join(self._current_script_dir or '', file_path) + self.do_load(relative_path) def run_transcript_tests(self, callargs: List[str]) -> None: """Runs transcript tests for provided file(s). @@ -3191,6 +3435,125 @@ Script should contain one command per line, just like command would be typed in runner = unittest.TextTestRunner() runner.run(testcase) + def _clear_input_lines_str(self) -> str: # pragma: no cover + """ + Returns a string that if printed will clear the prompt and input lines in the terminal, + leaving the cursor at the beginning of the first input line + :return: the string to print + """ + if not (vt100_support and self.use_rawinput): + return '' + + import shutil + import colorama.ansi as ansi + from colorama import Cursor + + visible_prompt = self.visible_prompt + + # Get the size of the terminal + terminal_size = shutil.get_terminal_size() + + # Figure out how many lines the prompt and user input take up + total_str_size = len(visible_prompt) + len(readline.get_line_buffer()) + num_input_lines = int(total_str_size / terminal_size.columns) + 1 + + # Get the cursor's offset from the beginning of the first input line + cursor_input_offset = len(visible_prompt) + rl_get_point() + + # Calculate what input line the cursor is on + cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1 + + # Create a string that will clear all input lines and print the alert + terminal_str = '' + + # Move the cursor down to the last input line + if cursor_input_line != num_input_lines: + terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line) + + # Clear each input line from the bottom up so that the cursor ends up on the original first input line + terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1) + terminal_str += ansi.clear_line() + + # Move the cursor to the beginning of the first input line + terminal_str += '\r' + + return terminal_str + + def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover + """ + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param alert_msg: the message to display to the user + :param new_prompt: if you also want to change the prompt that is displayed, then include it here + see async_update_prompt() docstring for guidance on updating a prompt + :raises RuntimeError if called while another thread holds terminal_lock + """ + if not (vt100_support and self.use_rawinput): + return + + # Sanity check that can't fail if self.terminal_lock was acquired before calling this function + if self.terminal_lock.acquire(blocking=False): + + # Generate a string to clear the prompt and input lines and replace with the alert + terminal_str = self._clear_input_lines_str() + if alert_msg: + terminal_str += alert_msg + '\n' + + # Set the new prompt now that _clear_input_lines_str is done using the old prompt + if new_prompt is not None: + self.prompt = new_prompt + rl_set_prompt(self.prompt) + + # Print terminal_str to erase the lines + if rl_type == RlType.GNU: + sys.stderr.write(terminal_str) + elif rl_type == RlType.PYREADLINE: + readline.rl.mode.console.write(terminal_str) + + # Redraw the prompt and input lines + rl_force_redisplay() + + self.terminal_lock.release() + + else: + raise RuntimeError("another thread holds terminal_lock") + + def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover + """ + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. If you do alter the actual text of the prompt, + it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will + be shifted and the update will not be seamless. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param new_prompt: what to change the prompt to + """ + self.async_alert('', new_prompt) + + @staticmethod + def set_window_title(title: str) -> None: # pragma: no cover + """ + Sets the terminal window title + :param title: the new window title + """ + if not vt100_support: + return + + import colorama.ansi as ansi + try: + sys.stderr.write(ansi.set_title(title)) + except AttributeError: + # Debugging in Pycharm has issues with setting terminal title + pass + def cmdloop(self, intro: Optional[str]=None) -> None: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -3216,6 +3579,14 @@ Script should contain one command per line, just like command would be typed in if callargs: self.cmdqueue.extend(callargs) + # Register a SIGINT signal handler for Ctrl+C + import signal + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self.sigint_handler) + + # Grab terminal lock before the prompt has been drawn by readline + self.terminal_lock.acquire() + # Always run the preloop first for func in self._preloop_hooks: func() @@ -3241,6 +3612,13 @@ Script should contain one command per line, just like command would be typed in func() self.postloop() + # Release terminal lock now that postloop code should have stopped any terminal updater threads + # This will also zero the lock count in case cmdloop() is called again + self.terminal_lock.release() + + # Restore the original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) + if self.exit_code is not None: sys.exit(self.exit_code) @@ -3249,7 +3627,7 @@ Script should contain one command per line, just like command would be typed in # plugin related functions # ### - def _initialize_plugin_system(self): + def _initialize_plugin_system(self) -> None: """Initialize the plugin system""" self._preloop_hooks = [] self._postloop_hooks = [] @@ -3259,7 +3637,7 @@ Script should contain one command per line, just like command would be typed in self._cmdfinalization_hooks = [] @classmethod - def _validate_callable_param_count(cls, func: Callable, count: int): + def _validate_callable_param_count(cls, func: Callable, count: int) -> None: """Ensure a function has the given number of parameters.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3272,7 +3650,7 @@ Script should contain one command per line, just like command would be typed in )) @classmethod - def _validate_prepostloop_callable(cls, func: Callable): + def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: """Check parameter and return types for preloop and postloop hooks.""" cls._validate_callable_param_count(func, 0) # make sure there is no return notation @@ -3282,18 +3660,18 @@ Script should contain one command per line, just like command would be typed in func.__name__, )) - def register_preloop_hook(self, func: Callable): + def register_preloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the beginning of the command loop.""" self._validate_prepostloop_callable(func) self._preloop_hooks.append(func) - def register_postloop_hook(self, func: Callable): + def register_postloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the end of the command loop.""" self._validate_prepostloop_callable(func) self._postloop_hooks.append(func) @classmethod - def _validate_postparsing_callable(cls, func: Callable): + def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Check parameter and return types for postparsing hooks""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3307,13 +3685,13 @@ Script should contain one command per line, just like command would be typed in func.__name__ )) - def register_postparsing_hook(self, func: Callable): + def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Register a function to be called after parsing user input but before running the command""" self._validate_postparsing_callable(func) self._postparsing_hooks.append(func) @classmethod - def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None: """Check parameter and return types for pre and post command hooks.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3340,18 +3718,19 @@ Script should contain one command per line, just like command would be typed in data_type, )) - def register_precmd_hook(self, func: Callable): + def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: """Register a hook to be called before the command function.""" self._validate_prepostcmd_hook(func, plugin.PrecommandData) self._precmd_hooks.append(func) - def register_postcmd_hook(self, func: Callable): + def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None: """Register a hook to be called after the command function.""" self._validate_prepostcmd_hook(func, plugin.PostcommandData) self._postcmd_hooks.append(func) @classmethod - def _validate_cmdfinalization_callable(cls, func: Callable): + def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Check parameter and return types for command finalization hooks.""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3363,7 +3742,8 @@ Script should contain one command per line, just like command would be typed in raise TypeError("{} must declare return a return type of " "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) - def register_cmdfinalization_hook(self, func: Callable): + def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Register a hook to be called after a command is completed, whether it completes successfully or not.""" self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) @@ -3429,7 +3809,7 @@ class History(list): def get(self, getme: Optional[Union[int, str]]=None) -> List[HistoryItem]: """Get an item or items from the History list using 1-based indexing. - :param getme: item(s) to get - either an integer index or string to search for + :param getme: optional item(s) to get (either an integer index or string to search for) :return: list of HistoryItems matching the retrieval criteria """ if not getme: diff --git a/cmd2/constants.py b/cmd2/constants.py index d3e8a125..3c133b70 100644 --- a/cmd2/constants.py +++ b/cmd2/constants.py @@ -17,3 +17,8 @@ REDIRECTION_TOKENS = [REDIRECTION_PIPE, REDIRECTION_OUTPUT, REDIRECTION_APPEND] ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m') LINE_FEED = '\n' + +# values for colors setting +COLORS_NEVER = 'Never' +COLORS_TERMINAL = 'Terminal' +COLORS_ALWAYS = 'Always' diff --git a/cmd2/parsing.py b/cmd2/parsing.py index 8edfacb9..e90eac43 100644 --- a/cmd2/parsing.py +++ b/cmd2/parsing.py @@ -14,6 +14,55 @@ from . import utils @attr.s(frozen=True) +class MacroArg: + """ + Information used to replace or unescape arguments in a macro value when the macro is resolved + Normal argument syntax : {5} + Escaped argument syntax: {{5}} + """ + # The starting index of this argument in the macro value + start_index = attr.ib(validator=attr.validators.instance_of(int)) + + # The number string that appears between the braces + # This is a string instead of an int because we support unicode digits and must be able + # to reproduce this string later + number_str = attr.ib(validator=attr.validators.instance_of(str)) + + # Tells if this argument is escaped and therefore needs to be unescaped + is_escaped = attr.ib(validator=attr.validators.instance_of(bool)) + + # Pattern used to find normal argument + # Digits surrounded by exactly 1 brace on a side and 1 or more braces on the opposite side + # Match strings like: {5}, {{{{{4}, {2}}}}} + macro_normal_arg_pattern = re.compile(r'(?<!{){\d+}|{\d+}(?!})') + + # Pattern used to find escaped arguments + # Digits surrounded by 2 or more braces on both sides + # Match strings like: {{5}}, {{{{{4}}, {{2}}}}} + macro_escaped_arg_pattern = re.compile(r'{{2}\d+}{2}') + + # Finds a string of digits + digit_pattern = re.compile(r'\d+') + + +@attr.s(frozen=True) +class Macro: + """Defines a cmd2 macro""" + + # Name of the macro + name = attr.ib(validator=attr.validators.instance_of(str)) + + # The string the macro resolves to + value = attr.ib(validator=attr.validators.instance_of(str)) + + # The required number of args the user has to pass to this macro + required_arg_count = attr.ib(validator=attr.validators.instance_of(int)) + + # Used to fill in argument placeholders in the macro + arg_list = attr.ib(default=attr.Factory(list), validator=attr.validators.instance_of(list)) + + +@attr.s(frozen=True) class Statement(str): """String subclass with additional attributes to store the results of parsing. @@ -81,34 +130,34 @@ class Statement(str): argv[1:], which strips them all off for you. """ # the arguments, but not the command, nor the output redirection clauses. - args = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + args = attr.ib(default='', validator=attr.validators.instance_of(str)) # string containing exactly what we input by the user - raw = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + raw = attr.ib(default='', validator=attr.validators.instance_of(str)) # the command, i.e. the first whitespace delimited word - command = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + command = attr.ib(default='', validator=attr.validators.instance_of(str)) # list of arguments to the command, not including any output redirection or terminators; quoted args remain quoted - arg_list = attr.ib(factory=list, validator=attr.validators.instance_of(list), type=List[str]) + arg_list = attr.ib(default=attr.Factory(list), validator=attr.validators.instance_of(list)) # if the command is a multiline command, the name of the command, otherwise empty - multiline_command = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + multiline_command = attr.ib(default='', validator=attr.validators.instance_of(str)) # the character which terminated the multiline command, if there was one - terminator = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + terminator = attr.ib(default='', validator=attr.validators.instance_of(str)) # characters appearing after the terminator but before output redirection, if any - suffix = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + suffix = attr.ib(default='', validator=attr.validators.instance_of(str)) # if output was piped to a shell command, the shell command as a list of tokens - pipe_to = attr.ib(factory=list, validator=attr.validators.instance_of(list), type=List[str]) + pipe_to = attr.ib(default=attr.Factory(list), validator=attr.validators.instance_of(list)) # if output was redirected, the redirection token, i.e. '>>' - output = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + output = attr.ib(default='', validator=attr.validators.instance_of(str)) # if output was redirected, the destination file - output_to = attr.ib(default='', validator=attr.validators.instance_of(str), type=str) + output_to = attr.ib(default='', validator=attr.validators.instance_of(str)) def __new__(cls, value: object, *pos_args, **kw_args): """Create a new instance of Statement. @@ -247,23 +296,33 @@ class StatementParser: self._command_pattern = re.compile(expr) def is_valid_command(self, word: str) -> Tuple[bool, str]: - """Determine whether a word is a valid alias. + """Determine whether a word is a valid name for a command. - Aliases can not include redirection characters, whitespace, - or termination characters. + Commands can not include redirection characters, whitespace, + or termination characters. They also cannot start with a + shortcut. - If word is not a valid command, return False and a comma - separated string of characters that can not appear in a command. + If word is not a valid command, return False and error text This string is suitable for inclusion in an error message of your choice: - valid, invalidchars = statement_parser.is_valid_command('>') + valid, errmsg = statement_parser.is_valid_command('>') if not valid: - errmsg = "Aliases can not contain: {}".format(invalidchars) + errmsg = "Alias {}".format(errmsg) """ valid = False - errmsg = 'whitespace, quotes, ' + if not word: + return False, 'cannot be an empty string' + + for (shortcut, _) in self.shortcuts: + if word.startswith(shortcut): + # Build an error string with all shortcuts listed + errmsg = 'cannot start with a shortcut: ' + errmsg += ', '.join(shortcut for (shortcut, _) in self.shortcuts) + return False, errmsg + + errmsg = 'cannot contain: whitespace, quotes, ' errchars = [] errchars.extend(constants.REDIRECTION_CHARS) errchars.extend(self.terminators) diff --git a/cmd2/pyscript_bridge.py b/cmd2/pyscript_bridge.py index 3f58ab84..f03b530f 100644 --- a/cmd2/pyscript_bridge.py +++ b/cmd2/pyscript_bridge.py @@ -12,15 +12,15 @@ import functools import sys from typing import List, Callable +from .argparse_completer import _RangeAction +from .utils import namedtuple_with_defaults, StdSim + # Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout if sys.version_info < (3, 5): from contextlib2 import redirect_stdout, redirect_stderr else: from contextlib import redirect_stdout, redirect_stderr -from .argparse_completer import _RangeAction -from .utils import namedtuple_with_defaults - class CommandResult(namedtuple_with_defaults('CommandResult', ['stdout', 'stderr', 'data'])): """Encapsulates the results from a command. @@ -38,37 +38,12 @@ class CommandResult(namedtuple_with_defaults('CommandResult', ['stdout', 'stderr return not self.stderr and self.data is not None -class CopyStream(object): - """Copies all data written to a stream""" - def __init__(self, inner_stream, echo: bool = False) -> None: - self.buffer = '' - self.inner_stream = inner_stream - self.echo = echo - - def write(self, s): - self.buffer += s - if self.echo: - self.inner_stream.write(s) - - def read(self): - raise NotImplementedError - - def clear(self): - self.buffer = '' - - def __getattr__(self, item: str): - if item in self.__dict__: - return self.__dict__[item] - else: - return getattr(self.inner_stream, item) - - def _exec_cmd(cmd2_app, func: Callable, echo: bool): """Helper to encapsulate executing a command and capturing the results""" - copy_stdout = CopyStream(sys.stdout, echo) - copy_stderr = CopyStream(sys.stderr, echo) + copy_stdout = StdSim(sys.stdout, echo) + copy_stderr = StdSim(sys.stderr, echo) - copy_cmd_stdout = CopyStream(cmd2_app.stdout, echo) + copy_cmd_stdout = StdSim(cmd2_app.stdout, echo) cmd2_app._last_result = None @@ -81,9 +56,9 @@ def _exec_cmd(cmd2_app, func: Callable, echo: bool): cmd2_app.stdout = copy_cmd_stdout.inner_stream # if stderr is empty, set it to None - stderr = copy_stderr.buffer if copy_stderr.buffer else None + stderr = copy_stderr.getvalue() if copy_stderr.getvalue() else None - outbuf = copy_cmd_stdout.buffer if copy_cmd_stdout.buffer else copy_stdout.buffer + outbuf = copy_cmd_stdout.getvalue() if copy_cmd_stdout.getvalue() else copy_stdout.getvalue() result = CommandResult(stdout=outbuf, stderr=stderr, data=cmd2_app._last_result) return result @@ -204,7 +179,10 @@ class ArgparseFunctor: def _run(self): # look up command function - func = getattr(self._cmd2_app, 'do_' + self._command_name) + func = self._cmd2_app.cmd_func(self._command_name) + if func is None: + raise AttributeError("{!r} object has no command called {!r}".format(self._cmd2_app.__class__.__name__, + self._command_name)) # reconstruct the cmd2 command from the python call cmd_str = [''] @@ -273,23 +251,20 @@ class PyscriptBridge(object): def __getattr__(self, item: str): """Check if the attribute is a command. If so, return a callable.""" - commands = self._cmd2_app.get_all_commands() - if item in commands: - func = getattr(self._cmd2_app, 'do_' + item) - - try: - # See if the command uses argparse - parser = getattr(func, 'argparser') - except AttributeError: - # Command doesn't, we will accept parameters in the form of a command string + func = self._cmd2_app.cmd_func(item) + + if func: + if hasattr(func, 'argparser'): + # Command uses argparse, return an object that can traverse the argparse subcommands and arguments + return ArgparseFunctor(self.cmd_echo, self._cmd2_app, item, getattr(func, 'argparser')) + else: + # Command doesn't use argparse, we will accept parameters in the form of a command string def wrap_func(args=''): return _exec_cmd(self._cmd2_app, functools.partial(func, args), self.cmd_echo) - return wrap_func - else: - # Command does use argparse, return an object that can traverse the argparse subcommands and arguments - return ArgparseFunctor(self.cmd_echo, self._cmd2_app, item, parser) - return super().__getattr__(item) + return wrap_func + else: + return super().__getattr__(item) def __dir__(self): """Return a custom set of attribute names to match the available commands""" diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py index 7e49ea47..0819232d 100644 --- a/cmd2/rl_utils.py +++ b/cmd2/rl_utils.py @@ -26,13 +26,54 @@ class RlType(Enum): # Check what implementation of readline we are using - rl_type = RlType.NONE +# Tells if the terminal we are running in supports vt100 control characters +vt100_support = False + # The order of this check matters since importing pyreadline will also show readline in the modules list if 'pyreadline' in sys.modules: rl_type = RlType.PYREADLINE + from ctypes import byref + from ctypes.wintypes import DWORD, HANDLE + import atexit + + # Check if we are running in a terminal + if sys.stdout.isatty(): # pragma: no cover + # noinspection PyPep8Naming + def enable_win_vt100(handle: HANDLE) -> bool: + """ + Enables VT100 character sequences in a Windows console + This only works on Windows 10 and up + :param handle: the handle on which to enable vt100 + :return: True if vt100 characters are enabled for the handle + """ + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + + # Get the current mode for this handle in the console + cur_mode = DWORD(0) + readline.rl.console.GetConsoleMode(handle, byref(cur_mode)) + + retVal = False + + # Check if ENABLE_VIRTUAL_TERMINAL_PROCESSING is already enabled + if (cur_mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) != 0: + retVal = True + + elif readline.rl.console.SetConsoleMode(handle, cur_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING): + # Restore the original mode when we exit + atexit.register(readline.rl.console.SetConsoleMode, handle, cur_mode) + retVal = True + + return retVal + + # Enable VT100 sequences for stdout and stderr + STD_OUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + vt100_support = (enable_win_vt100(readline.rl.console.GetStdHandle(STD_OUT_HANDLE)) and + enable_win_vt100(readline.rl.console.GetStdHandle(STD_ERROR_HANDLE))) + ############################################################################################################ # pyreadline is incomplete in terms of the Python readline API. Add the missing functions we need. ############################################################################################################ @@ -74,9 +115,13 @@ elif 'gnureadline' in sys.modules or 'readline' in sys.modules: import ctypes readline_lib = ctypes.CDLL(readline.__file__) + # Check if we are running in a terminal + if sys.stdout.isatty(): + vt100_support = True + # noinspection PyProtectedMember -def rl_force_redisplay() -> None: +def rl_force_redisplay() -> None: # pragma: no cover """ Causes readline to display the prompt and input text wherever the cursor is and start reading input from this location. This is the proper way to restore the input line after @@ -85,14 +130,77 @@ def rl_force_redisplay() -> None: if not sys.stdout.isatty(): return - if rl_type == RlType.GNU: # pragma: no cover + if rl_type == RlType.GNU: readline_lib.rl_forced_update_display() # After manually updating the display, readline asks that rl_display_fixed be set to 1 for efficiency display_fixed = ctypes.c_int.in_dll(readline_lib, "rl_display_fixed") display_fixed.value = 1 - elif rl_type == RlType.PYREADLINE: # pragma: no cover + elif rl_type == RlType.PYREADLINE: # Call _print_prompt() first to set the new location of the prompt readline.rl.mode._print_prompt() readline.rl.mode._update_line() + + +# noinspection PyProtectedMember +def rl_get_point() -> int: # pragma: no cover + """ + Returns the offset of the current cursor position in rl_line_buffer + """ + if rl_type == RlType.GNU: + return ctypes.c_int.in_dll(readline_lib, "rl_point").value + + elif rl_type == RlType.PYREADLINE: + return readline.rl.mode.l_buffer.point + + else: + return 0 + + +# noinspection PyProtectedMember +def rl_set_prompt(prompt: str) -> None: # pragma: no cover + """ + Sets readline's prompt + :param prompt: the new prompt value + """ + safe_prompt = rl_make_safe_prompt(prompt) + + if rl_type == RlType.GNU: + encoded_prompt = bytes(safe_prompt, encoding='utf-8') + readline_lib.rl_set_prompt(encoded_prompt) + + elif rl_type == RlType.PYREADLINE: + readline.rl._set_prompt(safe_prompt) + + +def rl_make_safe_prompt(prompt: str) -> str: # pragma: no cover + """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. + + :param prompt: original prompt + :return: prompt safe to pass to GNU Readline + """ + if rl_type == RlType.GNU: + # start code to tell GNU Readline about beginning of invisible characters + start = "\x01" + + # end code to tell GNU Readline about end of invisible characters + end = "\x02" + + escaped = False + result = "" + + for c in prompt: + if c == "\x1b" and not escaped: + result += start + c + escaped = True + elif c.isalpha() and escaped: + result += c + end + escaped = False + else: + result += c + + return result + + else: + return prompt diff --git a/cmd2/transcript.py b/cmd2/transcript.py index 5ba8d20d..2d94f4e4 100644 --- a/cmd2/transcript.py +++ b/cmd2/transcript.py @@ -44,7 +44,7 @@ class Cmd2TestCase(unittest.TestCase): # Trap stdout self._orig_stdout = self.cmdapp.stdout - self.cmdapp.stdout = OutputTrap() + self.cmdapp.stdout = utils.StdSim(self.cmdapp.stdout) def runTest(self): # was testall if self.cmdapp: @@ -106,7 +106,7 @@ class Cmd2TestCase(unittest.TestCase): self.assertTrue(re.match(expected, result, re.MULTILINE | re.DOTALL), message) def _transform_transcript_expected(self, s: str) -> str: - """Parse the string with slashed regexes into a valid regex. + r"""Parse the string with slashed regexes into a valid regex. Given a string like: @@ -203,24 +203,3 @@ class Cmd2TestCase(unittest.TestCase): if self.cmdapp: # Restore stdout self.cmdapp.stdout = self._orig_stdout - -class OutputTrap(object): - """Instantiate an OutputTrap to divert/capture ALL stdout output. - For use in transcript testing. - """ - - def __init__(self): - self.contents = '' - - def write(self, txt: str): - """Add text to the internal contents.""" - self.contents += txt - - def read(self) -> str: - """Read from the internal contents and then clear them out. - - :return: str - text from the internal contents - """ - result = self.contents - self.contents = '' - return result diff --git a/cmd2/utils.py b/cmd2/utils.py index 02956f6b..ddd43507 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -4,8 +4,9 @@ import collections import os -from typing import Any, List, Optional, Union +import re import unicodedata +from typing import Any, Iterable, List, Optional, Union from . import constants @@ -19,6 +20,28 @@ def strip_ansi(text: str) -> str: return constants.ANSI_ESCAPE_RE.sub('', text) +def is_quoted(arg: str) -> bool: + """ + Checks if a string is quoted + :param arg: the string being checked for quotes + :return: True if a string is quoted + """ + return len(arg) > 1 and arg[0] == arg[-1] and arg[0] in constants.QUOTES + + +def quote_string_if_needed(arg: str) -> str: + """ Quotes a string if it contains spaces and isn't already quoted """ + if is_quoted(arg) or ' ' not in arg: + return arg + + if '"' in arg: + quote = "'" + else: + quote = '"' + + return quote + arg + quote + + def strip_quotes(arg: str) -> str: """ Strip outer quotes from a string. @@ -27,7 +50,7 @@ def strip_quotes(arg: str) -> str: :param arg: string to strip outer quotes from :return: same string with potentially outer quotes stripped """ - if len(arg) > 1 and arg[0] == arg[-1] and arg[0] in constants.QUOTES: + if is_quoted(arg): arg = arg[1:-1] return arg @@ -66,10 +89,12 @@ def cast(current: Any, new: str) -> Any: """Tries to force a new value into the same type as the current when trying to set the value for a parameter. :param current: current value for the parameter, type varies - :param new: str - new value + :param new: new value :return: new value with same type as current, or the current value if there was an error casting """ typ = type(current) + orig_new = new + if typ == bool: try: return bool(int(new)) @@ -77,18 +102,18 @@ def cast(current: Any, new: str) -> Any: pass try: new = new.lower() + if (new == 'on') or (new[0] in ('y', 't')): + return True + if (new == 'off') or (new[0] in ('n', 'f')): + return False except AttributeError: pass - if (new == 'on') or (new[0] in ('y', 't')): - return True - if (new == 'off') or (new[0] in ('n', 'f')): - return False else: try: return typ(new) except (ValueError, TypeError): pass - print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new)) + print("Problem setting parameter (now {}) to {}; incorrect type?".format(current, orig_new)) return current @@ -169,10 +194,125 @@ def norm_fold(astr: str) -> str: return unicodedata.normalize('NFC', astr).casefold() -def alphabetical_sort(list_to_sort: List[str]) -> List[str]: +def alphabetical_sort(list_to_sort: Iterable[str]) -> List[str]: """Sorts a list of strings alphabetically. + For example: ['a1', 'A11', 'A2', 'a22', 'a3'] + + To sort a list in place, don't call this method, which makes a copy. Instead, do this: + + my_list.sort(key=norm_fold) + :param list_to_sort: the list being sorted :return: the sorted list """ return sorted(list_to_sort, key=norm_fold) + + +def try_int_or_force_to_lower_case(input_str: str) -> Union[int, str]: + """ + Tries to convert the passed-in string to an integer. If that fails, it converts it to lower case using norm_fold. + :param input_str: string to convert + :return: the string as an integer or a lower case version of the string + """ + try: + return int(input_str) + except ValueError: + return norm_fold(input_str) + + +def natural_keys(input_str: str) -> List[Union[int, str]]: + """ + Converts a string into a list of integers and strings to support natural sorting (see natural_sort). + + For example: natural_keys('abc123def') -> ['abc', '123', 'def'] + :param input_str: string to convert + :return: list of strings and integers + """ + return [try_int_or_force_to_lower_case(substr) for substr in re.split(r'(\d+)', input_str)] + + +def natural_sort(list_to_sort: Iterable[str]) -> List[str]: + """ + Sorts a list of strings case insensitively as well as numerically. + + For example: ['a1', 'A2', 'a3', 'A11', 'a22'] + + To sort a list in place, don't call this method, which makes a copy. Instead, do this: + + my_list.sort(key=natural_keys) + + :param list_to_sort: the list being sorted + :return: the list sorted naturally + """ + return sorted(list_to_sort, key=natural_keys) + + +class StdSim(object): + """Class to simulate behavior of sys.stdout or sys.stderr. + + Stores contents in internal buffer and optionally echos to the inner stream it is simulating. + """ + class ByteBuf(object): + """Inner class which stores an actual bytes buffer and does the actual output if echo is enabled.""" + def __init__(self, inner_stream, echo: bool = False) -> None: + self.byte_buf = b'' + self.inner_stream = inner_stream + self.echo = echo + + def write(self, b: bytes) -> None: + """Add bytes to internal bytes buffer and if echo is True, echo contents to inner stream.""" + if not isinstance(b, bytes): + raise TypeError('a bytes-like object is required, not {}'.format(type(b))) + self.byte_buf += b + if self.echo: + self.inner_stream.buffer.write(b) + + def __init__(self, inner_stream, echo: bool = False) -> None: + self.buffer = self.ByteBuf(inner_stream, echo) + self.inner_stream = inner_stream + + def write(self, s: str) -> None: + """Add str to internal bytes buffer and if echo is True, echo contents to inner stream.""" + if not isinstance(s, str): + raise TypeError('write() argument must be str, not {}'.format(type(s))) + b = s.encode() + self.buffer.write(b) + + def getvalue(self) -> str: + """Get the internal contents as a str. + + :return string from the internal contents + """ + return self.buffer.byte_buf.decode() + + def read(self) -> str: + """Read from the internal contents as a str and then clear them out. + + :return: string from the internal contents + """ + result = self.getvalue() + self.clear() + return result + + def clear(self) -> None: + """Clear the internal contents.""" + self.buffer.byte_buf = b'' + + def __getattr__(self, item: str): + if item in self.__dict__: + return self.__dict__[item] + else: + return getattr(self.inner_stream, item) + + +def unquote_redirection_tokens(args: List[str]) -> None: + """ + Unquote redirection tokens in a list of command-line arguments + This is used when redirection tokens have to be passed to another command + :param args: the command line args + """ + for i, arg in enumerate(args): + unquoted_arg = strip_quotes(arg) + if unquoted_arg in constants.REDIRECTION_TOKENS: + args[i] = unquoted_arg |