diff options
author | Eric Lin <anselor@gmail.com> | 2018-05-16 10:39:49 -0400 |
---|---|---|
committer | Eric Lin <anselor@gmail.com> | 2018-05-16 10:39:49 -0400 |
commit | 371284d20370a8e85dd8527d9bbcc6267b335cde (patch) | |
tree | 002ac709eca15522a0b1abaa235aae04975b08e6 /cmd2/cmd2.py | |
parent | ab8194e92b9c3728d8f86cb9c81de180b6884eee (diff) | |
parent | a9b712108e5af49937b0af3aa51db2ebe5c159e4 (diff) | |
download | cmd2-git-371284d20370a8e85dd8527d9bbcc6267b335cde.tar.gz |
Merge branch 'master' into pyscript
Diffstat (limited to 'cmd2/cmd2.py')
-rwxr-xr-x | cmd2/cmd2.py | 526 |
1 files changed, 132 insertions, 394 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 63a3cbe3..a2d67def 100755 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -27,6 +27,7 @@ import atexit import cmd import codecs import collections +from colorama import Fore import copy import datetime import functools @@ -51,26 +52,34 @@ import pyperclip from . import constants from . import utils +from cmd2.parsing import StatementParser, Statement + # Set up readline -from .rl_utils import rl_force_redisplay, readline, rl_type, RlType -from .argparse_completer import AutoCompleter, ACArgumentParser +from .rl_utils import rl_type, RlType +if rl_type == RlType.NONE: + rl_warning = "Readline features including tab completion have been disabled since no \n" \ + "supported version of readline was found. To resolve this, install \n" \ + "pyreadline on Windows or gnureadline on Mac.\n\n" + sys.stderr.write(Fore.LIGHTYELLOW_EX + rl_warning + Fore.RESET) +else: + from .rl_utils import rl_force_redisplay, readline -from cmd2.parsing import StatementParser, Statement + if rl_type == RlType.PYREADLINE: -if rl_type == RlType.PYREADLINE: + # Save the original pyreadline display completion function since we need to override it and restore it + # noinspection PyProtectedMember + orig_pyreadline_display = readline.rl.mode._display_completions - # Save the original pyreadline display completion function since we need to override it and restore it - # noinspection PyProtectedMember - orig_pyreadline_display = readline.rl.mode._display_completions + elif rl_type == RlType.GNU: -elif rl_type == RlType.GNU: + # We need wcswidth to calculate display width of tab completions + from wcwidth import wcswidth - # We need wcswidth to calculate display width of tab completions - from wcwidth import wcswidth + # Get the readline lib so we can make changes to it + import ctypes + from .rl_utils import readline_lib - # Get the readline lib so we can make changes to it - import ctypes - from .rl_utils import readline_lib +from .argparse_completer import AutoCompleter, ACArgumentParser # Newer versions of pyperclip are released as a single file, but older versions had a more complicated structure try: @@ -318,266 +327,6 @@ class EmptyStatement(Exception): pass -def _pop_readline_history(clear_history: bool=True) -> List[str]: - """Returns a copy of readline's history and optionally clears it (default)""" - # noinspection PyArgumentList - history = [ - readline.get_history_item(i) - for i in range(1, 1 + readline.get_current_history_length()) - ] - if clear_history: - readline.clear_history() - return history - - -def _push_readline_history(history, clear_history=True): - """Restores readline's history and optionally clears it first (default)""" - if clear_history: - readline.clear_history() - for line in history: - readline.add_history(line) - - -def _complete_from_cmd(cmd_obj, text, line, begidx, endidx): - """Complete as though the user was typing inside cmd's cmdloop()""" - from itertools import takewhile - command_subcommand_params = line.split(None, 3) - - if len(command_subcommand_params) < (3 if text else 2): - n = len(command_subcommand_params[0]) - n += sum(1 for _ in takewhile(str.isspace, line[n:])) - return cmd_obj.completenames(text, line[n:], begidx - n, endidx - n) - - command, subcommand = command_subcommand_params[:2] - n = len(command) + sum(1 for _ in takewhile(str.isspace, line)) - cfun = getattr(cmd_obj, 'complete_' + subcommand, cmd_obj.complete) - return cfun(text, line[n:], begidx - n, endidx - n) - - -class AddSubmenu(object): - """Conveniently add a submenu (Cmd-like class) to a Cmd - - e.g. given "class SubMenu(Cmd): ..." then - - @AddSubmenu(SubMenu(), 'sub') - class MyCmd(cmd.Cmd): - .... - - will have the following effects: - 1. 'sub' will interactively enter the cmdloop of a SubMenu instance - 2. 'sub cmd args' will call do_cmd(args) in a SubMenu instance - 3. 'sub ... [TAB]' will have the same behavior as [TAB] in a SubMenu cmdloop - i.e., autocompletion works the way you think it should - 4. 'help sub [cmd]' will print SubMenu's help (calls its do_help()) - """ - - class _Nonexistent(object): - """ - Used to mark missing attributes. - Disable __dict__ creation since this class does nothing - """ - __slots__ = () # - - def __init__(self, - submenu, - command, - aliases=(), - reformat_prompt="{super_prompt}>> {sub_prompt}", - shared_attributes=None, - require_predefined_shares=True, - create_subclass=False, - preserve_shares=False, - persistent_history_file=None - ): - """Set up the class decorator - - submenu (Cmd): Instance of something cmd.Cmd-like - - command (str): The command the user types to access the SubMenu instance - - aliases (iterable): More commands that will behave like "command" - - reformat_prompt (str): Format str or None to disable - if it's a string, it should contain one or more of: - {super_prompt}: The current cmd's prompt - {command}: The command in the current cmd with which it was called - {sub_prompt}: The subordinate cmd's original prompt - the default is "{super_prompt}{command} {sub_prompt}" - - shared_attributes (dict): dict of the form {'subordinate_attr': 'parent_attr'} - the attributes are copied to the submenu at the last moment; the submenu's - attributes are backed up before this and restored afterward - - require_predefined_shares: The shared attributes above must be independently - defined in the subordinate Cmd (default: True) - - create_subclass: put the modifications in a subclass rather than modifying - the existing class (default: False) - """ - self.submenu = submenu - self.command = command - self.aliases = aliases - if persistent_history_file: - self.persistent_history_file = os.path.expanduser(persistent_history_file) - else: - self.persistent_history_file = None - - if reformat_prompt is not None and not isinstance(reformat_prompt, str): - raise ValueError("reformat_prompt should be either a format string or None") - self.reformat_prompt = reformat_prompt - - self.shared_attributes = {} if shared_attributes is None else shared_attributes - if require_predefined_shares: - for attr in self.shared_attributes.keys(): - if not hasattr(submenu, attr): - raise AttributeError("The shared attribute '{attr}' is not defined in {cmd}. Either define {attr} " - "in {cmd} or set require_predefined_shares=False." - .format(cmd=submenu.__class__.__name__, attr=attr)) - - self.create_subclass = create_subclass - self.preserve_shares = preserve_shares - - def _get_original_attributes(self): - return { - attr: getattr(self.submenu, attr, AddSubmenu._Nonexistent) - for attr in self.shared_attributes.keys() - } - - def _copy_in_shared_attrs(self, parent_cmd): - for sub_attr, par_attr in self.shared_attributes.items(): - setattr(self.submenu, sub_attr, getattr(parent_cmd, par_attr)) - - def _copy_out_shared_attrs(self, parent_cmd, original_attributes): - if self.preserve_shares: - for sub_attr, par_attr in self.shared_attributes.items(): - setattr(parent_cmd, par_attr, getattr(self.submenu, sub_attr)) - else: - for attr, value in original_attributes.items(): - if attr is not AddSubmenu._Nonexistent: - setattr(self.submenu, attr, value) - else: - delattr(self.submenu, attr) - - def __call__(self, cmd_obj): - """Creates a subclass of Cmd wherein the given submenu can be accessed via the given command""" - def enter_submenu(parent_cmd, statement): - """ - This function will be bound to do_<submenu> and will change the scope of the CLI to that of the - submenu. - """ - submenu = self.submenu - original_attributes = self._get_original_attributes() - history = _pop_readline_history() - - if self.persistent_history_file: - try: - readline.read_history_file(self.persistent_history_file) - except FileNotFoundError: - pass - - try: - # copy over any shared attributes - self._copy_in_shared_attrs(parent_cmd) - - if statement.args: - # Remove the menu argument and execute the command in the submenu - submenu.onecmd_plus_hooks(statement.args) - else: - if self.reformat_prompt is not None: - prompt = submenu.prompt - submenu.prompt = self.reformat_prompt.format( - super_prompt=parent_cmd.prompt, - command=self.command, - sub_prompt=prompt, - ) - submenu.cmdloop() - if self.reformat_prompt is not None: - # noinspection PyUnboundLocalVariable - self.submenu.prompt = prompt - finally: - # copy back original attributes - self._copy_out_shared_attrs(parent_cmd, original_attributes) - - # write submenu history - if self.persistent_history_file: - readline.write_history_file(self.persistent_history_file) - # reset main app history before exit - _push_readline_history(history) - - def complete_submenu(_self, text, line, begidx, endidx): - """ - This function will be bound to complete_<submenu> and will perform the complete commands of the submenu. - """ - submenu = self.submenu - original_attributes = self._get_original_attributes() - try: - # copy over any shared attributes - self._copy_in_shared_attrs(_self) - - # Reset the submenu's tab completion parameters - submenu.allow_appended_space = True - submenu.allow_closing_quote = True - submenu.display_matches = [] - - return _complete_from_cmd(submenu, text, line, begidx, endidx) - finally: - # copy back original attributes - self._copy_out_shared_attrs(_self, original_attributes) - - # Pass the submenu's tab completion parameters back up to the menu that called complete() - _self.allow_appended_space = submenu.allow_appended_space - _self.allow_closing_quote = submenu.allow_closing_quote - _self.display_matches = copy.copy(submenu.display_matches) - - original_do_help = cmd_obj.do_help - original_complete_help = cmd_obj.complete_help - - def help_submenu(_self, line): - """ - This function will be bound to help_<submenu> and will call the help commands of the submenu. - """ - tokens = line.split(None, 1) - if tokens and (tokens[0] == self.command or tokens[0] in self.aliases): - self.submenu.do_help(tokens[1] if len(tokens) == 2 else '') - else: - original_do_help(_self, line) - - def _complete_submenu_help(_self, text, line, begidx, endidx): - """autocomplete to match help_submenu()'s behavior""" - tokens = line.split(None, 1) - if len(tokens) == 2 and ( - not (not tokens[1].startswith(self.command) and not any( - tokens[1].startswith(alias) for alias in self.aliases)) - ): - return self.submenu.complete_help( - text, - tokens[1], - begidx - line.index(tokens[1]), - endidx - line.index(tokens[1]), - ) - else: - return original_complete_help(_self, text, line, begidx, endidx) - - if self.create_subclass: - class _Cmd(cmd_obj): - do_help = help_submenu - complete_help = _complete_submenu_help - else: - _Cmd = cmd_obj - _Cmd.do_help = help_submenu - _Cmd.complete_help = _complete_submenu_help - - # Create bindings in the parent command to the submenus commands. - setattr(_Cmd, 'do_' + self.command, enter_submenu) - setattr(_Cmd, 'complete_' + self.command, complete_submenu) - - # Create additional bindings for aliases - for _alias in self.aliases: - setattr(_Cmd, 'do_' + _alias, enter_submenu) - setattr(_Cmd, 'complete_' + _alias, complete_submenu) - return _Cmd - - class Cmd(cmd.Cmd): """An easy but powerful framework for writing line-oriented command interpreters. @@ -589,7 +338,6 @@ class Cmd(cmd.Cmd): # Attributes used to configure the StatementParser, best not to change these at runtime blankLinesAllowed = False multiline_commands = [] - redirector = '>' # for sending output to file shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} aliases = dict() terminators = [';'] @@ -654,7 +402,7 @@ class Cmd(cmd.Cmd): pass # If persistent readline history is enabled, then read history from file and register to write to file at exit - if persistent_history_file: + if persistent_history_file and rl_type != RlType.NONE: persistent_history_file = os.path.expanduser(persistent_history_file) try: readline.read_history_file(persistent_history_file) @@ -1400,29 +1148,26 @@ class Cmd(cmd.Cmd): if len(raw_tokens) > 1: - # Build a list of all redirection tokens - all_redirects = constants.REDIRECTION_CHARS + ['>>'] - # Check if there are redirection strings prior to the token being completed seen_pipe = False has_redirection = False for cur_token in raw_tokens[:-1]: - if cur_token in all_redirects: + if cur_token in constants.REDIRECTION_TOKENS: has_redirection = True - if cur_token == '|': + if cur_token == constants.REDIRECTION_PIPE: seen_pipe = True # Get token prior to the one being completed prior_token = raw_tokens[-2] # If a pipe is right before the token being completed, complete a shell command as the piped process - if prior_token == '|': + if prior_token == constants.REDIRECTION_PIPE: return self.shell_cmd_complete(text, line, begidx, endidx) # Otherwise do path completion either as files to redirectors or arguments to the piped process - elif prior_token in all_redirects or seen_pipe: + elif prior_token in constants.REDIRECTION_TOKENS or seen_pipe: return self.path_complete(text, line, begidx, endidx) # If there were redirection strings anywhere on the command line, then we @@ -1544,7 +1289,7 @@ class Cmd(cmd.Cmd): :param text: str - the current word that user is typing :param state: int - non-negative integer """ - if state == 0: + if state == 0 and rl_type != RlType.NONE: unclosed_quote = '' self.set_completion_defaults() @@ -1577,12 +1322,9 @@ class Cmd(cmd.Cmd): if begidx > 0: # Parse the command line - command, args, expanded_line = self.parseline(line) - - # use these lines instead of the one above - # statement = self.command_parser.parse_command_only(line) - # command = statement.command - # expanded_line = statement.command_and_args + statement = self.statement_parser.parse_command_only(line) + command = statement.command + expanded_line = statement.command_and_args # We overwrote line with a properly formatted but fully stripped version # Restore the end spaces since line is only supposed to be lstripped when @@ -1603,8 +1345,7 @@ class Cmd(cmd.Cmd): tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx) # Either had a parsing error or are trying to complete the command token - # The latter can happen if default_to_shell is True and parseline() allowed - # assumed something like " or ' was a command. + # The latter can happen if " or ' was entered as the command if tokens is None or len(tokens) == 1: self.completion_matches = [] return None @@ -1924,66 +1665,16 @@ class Cmd(cmd.Cmd): def parseline(self, line): """Parse the line into a command name and a string containing the arguments. - NOTE: This is an override of a parent class method. It is only used by other parent class methods. But - we do need to override it here so that the additional shortcuts present in cmd2 get properly expanded for - purposes of tab completion. + NOTE: This is an override of a parent class method. It is only used by other parent class methods. - Used for command tab completion. Returns a tuple containing (command, args, line). - 'command' and 'args' may be None if the line couldn't be parsed. + Different from the parent class method, this ignores self.identchars. :param line: str - line read by readline :return: (str, str, str) - tuple containing (command, args, line) """ - line = line.strip() - - if not line: - # Deal with empty line or all whitespace line - return None, None, line - # Make a copy of aliases so we can edit it - tmp_aliases = list(self.aliases.keys()) - keep_expanding = len(tmp_aliases) > 0 - - # Expand aliases - while keep_expanding: - for cur_alias in tmp_aliases: - keep_expanding = False - - if line == cur_alias or line.startswith(cur_alias + ' '): - line = line.replace(cur_alias, self.aliases[cur_alias], 1) - - # Do not expand the same alias more than once - tmp_aliases.remove(cur_alias) - keep_expanding = len(tmp_aliases) > 0 - break - - # Expand command shortcut to its full command name - for (shortcut, expansion) in self.shortcuts: - if line.startswith(shortcut): - # If the next character after the shortcut isn't a space, then insert one - shortcut_len = len(shortcut) - if len(line) == shortcut_len or line[shortcut_len] != ' ': - expansion += ' ' - - # Expand the shortcut - line = line.replace(shortcut, expansion, 1) - break - - i, n = 0, len(line) - - # If we are allowing shell commands, then allow any character in the command - if self.default_to_shell: - while i < n and line[i] != ' ': - i += 1 - - # Otherwise only allow those in identchars - else: - while i < n and line[i] in self.identchars: - i += 1 - - command, arg = line[:i], line[i:].strip() - - return command, arg, line + statement = self.statement_parser.parse_command_only(line) + return statement.command, statement.args, statement.command_and_args def onecmd_plus_hooks(self, line): """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks. @@ -2125,7 +1816,7 @@ class Cmd(cmd.Cmd): # We want Popen to raise an exception if it fails to open the process. Thus we don't set shell to True. try: - self.pipe_proc = subprocess.Popen(shlex.split(statement.pipe_to), stdin=subproc_stdin) + self.pipe_proc = subprocess.Popen(statement.pipe_to, stdin=subproc_stdin) except Exception as ex: # Restore stdout to what it was and close the pipe self.stdout.close() @@ -2139,24 +1830,30 @@ class Cmd(cmd.Cmd): raise ex elif statement.output: if (not statement.output_to) and (not can_clip): - raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable') + raise EnvironmentError("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable") self.kept_state = Statekeeper(self, ('stdout',)) self.kept_sys = Statekeeper(sys, ('stdout',)) self.redirecting = True if statement.output_to: + # going to a file mode = 'w' - if statement.output == 2 * self.redirector: + # statement.output can only contain + # REDIRECTION_APPEND or REDIRECTION_OUTPUT + if statement.output == constants.REDIRECTION_APPEND: mode = 'a' sys.stdout = self.stdout = open(os.path.expanduser(statement.output_to), mode) else: + # going to a paste buffer sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+") - if statement.output == '>>': + if statement.output == constants.REDIRECTION_APPEND: self.poutput(get_paste_buffer()) def _restore_output(self, statement): - """Handles restoring state after output redirection as well as the actual pipe operation if present. + """Handles restoring state after output redirection as well as + the actual pipe operation if present. - :param statement: Statement object which contains the parsed input from the user + :param statement: Statement object which contains the parsed + input from the user """ # If we have redirected output to a file or the clipboard or piped it to a shell command, then restore state if self.kept_state is not None: @@ -2434,23 +2131,26 @@ Usage: Usage: alias [name] | [<name> <value>] name = arglist[0] value = ' '.join(arglist[1:]) - # Check for a valid name - for cur_char in name: - if cur_char not in self.identchars: - self.perror("Alias names can only contain the following characters: {}".format(self.identchars), - traceback_war=False) - return - - # Set the alias - self.aliases[name] = value - self.poutput("Alias {!r} created".format(name)) + # Validate the alias to ensure it doesn't include weird characters + # like terminators, output redirection, or whitespace + valid, invalidchars = self.statement_parser.is_valid_command(name) + if valid: + # Set the alias + self.aliases[name] = value + self.poutput("Alias {!r} created".format(name)) + else: + errmsg = "Aliases can not contain: {}".format(invalidchars) + self.perror(errmsg, traceback_war=False) def complete_alias(self, text, line, begidx, endidx): """ Tab completion for alias """ + alias_names = set(self.aliases.keys()) + visible_commands = set(self.get_visible_commands()) + index_dict = \ { - 1: self.aliases, - 2: self.get_visible_commands() + 1: alias_names, + 2: list(alias_names | visible_commands) } return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete) @@ -2670,9 +2370,12 @@ Usage: Usage: unalias [-a] name [name ...] self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: response = input(prompt) - hlen = readline.get_current_history_length() - if hlen >= 1 and response != '': - readline.remove_history_item(hlen - 1) + + if rl_type != RlType.NONE: + hlen = readline.get_current_history_length() + if hlen >= 1 and response != '': + readline.remove_history_item(hlen - 1) + try: response = int(response) result = fulloptions[response - 1][0] @@ -3014,34 +2717,7 @@ a..b, a:b, a:, ..b items by indices (inclusive) except Exception as e: self.perror('Saving {!r} - {}'.format(args.output_file, e), traceback_war=False) elif args.transcript: - # Make sure echo is on so commands print to standard out - saved_echo = self.echo - self.echo = True - - # Redirect stdout to the transcript file - saved_self_stdout = self.stdout - self.stdout = open(args.transcript, 'w') - - # Run all of the commands in the history with output redirected to transcript and echo on - self.runcmds_plus_hooks(history) - - # Restore stdout to its original state - self.stdout.close() - self.stdout = saved_self_stdout - - # Set echo back to its original state - self.echo = saved_echo - - # Post-process the file to escape un-escaped "/" regex escapes - with open(args.transcript, 'r') as fin: - data = fin.read() - post_processed_data = data.replace('/', '\/') - with open(args.transcript, 'w') as fout: - fout.write(post_processed_data) - - plural = 's' if len(history) > 1 else '' - self.pfeedback('{} command{} and outputs saved to transcript file {!r}'.format(len(history), plural, - args.transcript)) + self._generate_transcript(history, args.transcript) else: # Display the history items retrieved for hi in history: @@ -3050,6 +2726,68 @@ a..b, a:b, a:, ..b items by indices (inclusive) else: self.poutput(hi.pr()) + def _generate_transcript(self, history, transcript_file): + """Generate a transcript file from a given history of commands.""" + # Save the current echo state, and turn it off. We inject commands into the + # output using a different mechanism + saved_echo = self.echo + self.echo = False + + # Redirect stdout to the transcript file + saved_self_stdout = self.stdout + + # The problem with supporting regular expressions in transcripts + # is that they shouldn't be processed in the command, just the output. + # In addition, when we generate a transcript, any slashes in the output + # are not really intended to indicate regular expressions, so they should + # be escaped. + # + # We have to jump through some hoops here in order to catch the commands + # separately from the output and escape the slashes in the output. + transcript = '' + for history_item in history: + # build the command, complete with prompts. When we replay + # the transcript, we look for the prompts to separate + # the command from the output + first = True + command = '' + for line in history_item.splitlines(): + if first: + command += '{}{}\n'.format(self.prompt, line) + first = False + else: + command += '{}{}\n'.format(self.continuation_prompt, line) + transcript += command + # create a new string buffer and set it to stdout to catch the output + # of the command + membuf = io.StringIO() + self.stdout = membuf + # then run the command and let the output go into our buffer + self.onecmd_plus_hooks(history_item) + # rewind the buffer to the beginning + membuf.seek(0) + # get the output out of the buffer + output = membuf.read() + # and add the regex-escaped output to the transcript + transcript += output.replace('/', '\/') + + # Restore stdout to its original state + self.stdout = saved_self_stdout + # Set echo back to its original state + self.echo = saved_echo + + # finally, we can write the transcript out to the file + with open(transcript_file, 'w') as fout: + fout.write(transcript) + + # and let the user know what we did + if len(history) > 1: + plural = 'commands and their outputs' + else: + plural = 'command and its output' + msg = '{} {} saved to transcript file {!r}' + self.pfeedback(msg.format(len(history), plural, transcript_file)) + @with_argument_list def do_edit(self, arglist): """Edit a file in a text editor. |