diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 362 |
1 files changed, 193 insertions, 169 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index e5c2ac44..eb2d4c15 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -27,7 +27,7 @@ Git repository on GitHub at https://github.com/python-cmd2/cmd2 # import this module, many of these imports are lazy-loaded # i.e. we only import the module when we use it # For example, we don't import the 'traceback' module -# until the perror() function is called and the debug +# until the pexcept() function is called and the debug # setting is True import argparse import cmd @@ -40,11 +40,11 @@ import sys import threading from collections import namedtuple from contextlib import redirect_stdout -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union, IO +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union import colorama -from colorama import Fore +from . import ansi from . import constants from . import plugin from . import utils @@ -60,7 +60,7 @@ if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ "pyreadline on Windows or gnureadline on Mac.\n\n" - sys.stderr.write(Fore.LIGHTYELLOW_EX + rl_warning + Fore.RESET) + sys.stderr.write(ansi.style_warning(rl_warning)) else: from .rl_utils import rl_force_redisplay, readline @@ -337,20 +337,20 @@ class Cmd(cmd.Cmd): terminators: Optional[List[str]] = None, shortcuts: Optional[Dict[str, str]] = None) -> None: """An easy but powerful framework for writing line-oriented command interpreters, extends Python's cmd package. - :param completekey: (optional) readline name of a completion key, default to Tab - :param stdin: (optional) alternate input file object, if not specified, sys.stdin is used - :param stdout: (optional) alternate output file object, if not specified, sys.stdout is used - :param persistent_history_file: (optional) file path to load a persistent cmd2 command history from - :param persistent_history_length: (optional) max number of history items to write to the persistent history file - :param startup_script: (optional) file path to a script to execute at startup - :param use_ipython: (optional) should the "ipy" command be included for an embedded IPython shell - :param allow_cli_args: (optional) if True, then cmd2 will process command line arguments as either - commands to be run or, if -t is specified, transcript files to run. - This should be set to False if your application parses its own arguments. - :param transcript_files: (optional) allows running transcript tests when allow_cli_args is False - :param allow_redirection: (optional) should output redirection and pipes be allowed - :param multiline_commands: (optional) list of commands allowed to accept multi-line input - :param shortcuts: (optional) dictionary containing shortcuts for commands + :param completekey: readline name of a completion key, default to Tab + :param stdin: alternate input file object, if not specified, sys.stdin is used + :param stdout: alternate output file object, if not specified, sys.stdout is used + :param persistent_history_file: file path to load a persistent cmd2 command history from + :param persistent_history_length: max number of history items to write to the persistent history file + :param startup_script: file path to a script to execute at startup + :param use_ipython: should the "ipy" command be included for an embedded IPython shell + :param allow_cli_args: if True, then cmd2 will process command line arguments as either + commands to be run or, if -t is specified, transcript files to run. + This should be set to False if your application parses its own arguments. + :param transcript_files: allow running transcript tests when allow_cli_args is False + :param allow_redirection: should output redirection and pipes be allowed + :param multiline_commands: list of commands allowed to accept multi-line input + :param shortcuts: dictionary containing shortcuts for commands """ # If use_ipython is False, make sure the do_ipy() method doesn't exit if not use_ipython: @@ -374,7 +374,6 @@ class Cmd(cmd.Cmd): self.quit_on_sigint = False # Quit the loop on interrupt instead of just resetting prompt # Attributes which ARE dynamically settable at runtime - self.colors = constants.COLORS_TERMINAL self.continuation_prompt = '> ' self.debug = False self.echo = False @@ -385,16 +384,23 @@ class Cmd(cmd.Cmd): self.timing = False # Prints elapsed time for each command # To make an attribute settable with the "do_set" command, add it to this ... - self.settable = {'colors': 'Allow colorized output (valid values: Terminal, Always, Never)', - 'continuation_prompt': 'On 2nd+ line of input', - 'debug': 'Show full error stack on error', - 'echo': 'Echo command issued into output', - 'editor': 'Program used by ``edit``', - 'feedback_to_output': 'Include nonessentials in `|`, `>` results', - 'locals_in_py': 'Allow access to your application in py via self', - 'prompt': 'The prompt issued to solicit input', - 'quiet': "Don't print nonessential feedback", - 'timing': 'Report execution times'} + self.settable = \ + { + # allow_ansi is a special case in which it's an application-wide setting defined in ansi.py + 'allow_ansi': ('Allow ANSI escape sequences in output ' + '(valid values: {}, {}, {})'.format(ansi.ANSI_TERMINAL, + ansi.ANSI_ALWAYS, + ansi.ANSI_NEVER)), + 'continuation_prompt': 'On 2nd+ line of input', + 'debug': 'Show full error stack on error', + 'echo': 'Echo command issued into output', + 'editor': 'Program used by ``edit``', + 'feedback_to_output': 'Include nonessentials in `|`, `>` results', + 'locals_in_py': 'Allow access to your application in py via self', + 'prompt': 'The prompt issued to solicit input', + 'quiet': "Don't print nonessential feedback", + 'timing': 'Report execution times' + } # Commands to exclude from the help menu and tab completion self.hidden_commands = ['eof', '_relative_load', '_relative_run_script'] @@ -551,6 +557,25 @@ class Cmd(cmd.Cmd): # ----- Methods related to presenting output to the user ----- @property + def allow_ansi(self) -> str: + """Read-only property needed to support do_set when it reads allow_ansi""" + return ansi.allow_ansi + + @allow_ansi.setter + def allow_ansi(self, new_val: str) -> None: + """Setter property needed to support do_set when it updates allow_ansi""" + new_val = new_val.lower() + if new_val == ansi.ANSI_TERMINAL.lower(): + ansi.allow_ansi = ansi.ANSI_TERMINAL + elif new_val == ansi.ANSI_ALWAYS.lower(): + ansi.allow_ansi = ansi.ANSI_ALWAYS + elif new_val == ansi.ANSI_NEVER.lower(): + ansi.allow_ansi = ansi.ANSI_NEVER + else: + self.perror('Invalid value: {} (valid values: {}, {}, {})'.format(new_val, ansi.ANSI_TERMINAL, + ansi.ANSI_ALWAYS, ansi.ANSI_NEVER)) + + @property def visible_prompt(self) -> str: """Read-only property to get the visible prompt with any ANSI escape codes stripped. @@ -559,7 +584,7 @@ class Cmd(cmd.Cmd): :return: prompt stripped of any ANSI escape codes """ - return utils.strip_ansi(self.prompt) + return ansi.strip_ansi(self.prompt) @property def aliases(self) -> Dict[str, str]: @@ -576,69 +601,67 @@ class Cmd(cmd.Cmd): """Setter for the allow_redirection property that determines whether or not redirection of stdout is allowed.""" self._statement_parser.allow_redirection = value - def _decolorized_write(self, fileobj: IO, msg: str) -> None: - """Write a string to a fileobject, stripping ANSI escape sequences if necessary - - Honor the current colors setting, which requires us to check whether the - fileobject is a tty. - """ - if self.colors.lower() == constants.COLORS_NEVER.lower() or \ - (self.colors.lower() == constants.COLORS_TERMINAL.lower() and not fileobj.isatty()): - msg = utils.strip_ansi(msg) - fileobj.write(msg) - - def poutput(self, msg: Any, end: str = '\n', color: str = '') -> None: - """Smarter self.stdout.write(); color aware and adds newline of not present. + def poutput(self, msg: Any, *, end: str = '\n') -> None: + """Print message to self.stdout and appends a newline by default Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and that process terminates before the cmd2 command is finished executing. - :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK) - :param end: (optional) string appended after the end of the message if not already present, default a newline - :param color: (optional) color escape to output this message with + :param msg: message to print (anything convertible to a str with '{}'.format() is OK) + :param end: string appended after the end of the message, default a newline """ - if msg is not None and msg != '': - try: - msg_str = '{}'.format(msg) - if not msg_str.endswith(end): - msg_str += end - if color: - msg_str = color + msg_str + Fore.RESET - self._decolorized_write(self.stdout, msg_str) - except BrokenPipeError: - # This occurs if a command's output is being piped to another - # process and that process closes before the command is - # finished. If you would like your application to print a - # warning message, then set the broken_pipe_warning attribute - # to the message you want printed. - if self.broken_pipe_warning: - sys.stderr.write(self.broken_pipe_warning) + try: + ansi.ansi_aware_write(self.stdout, "{}{}".format(msg, end)) + except BrokenPipeError: + # This occurs if a command's output is being piped to another + # process and that process closes before the command is + # finished. If you would like your application to print a + # warning message, then set the broken_pipe_warning attribute + # to the message you want printed. + if self.broken_pipe_warning: + sys.stderr.write(self.broken_pipe_warning) + + def perror(self, msg: Any, *, end: str = '\n', apply_style: bool = True) -> None: + """Print message to sys.stderr + + :param msg: message to print (anything convertible to a str with '{}'.format() is OK) + :param end: string appended after the end of the message, default a newline + :param apply_style: If True, then ansi.style_error will be applied to the message text. Set to False in cases + where the message text already has the desired style. Defaults to True. + """ + if apply_style: + final_msg = ansi.style_error(msg) + else: + final_msg = "{}".format(msg) + ansi.ansi_aware_write(sys.stderr, final_msg + end) - def perror(self, err: Union[str, Exception], traceback_war: bool = True, err_color: str = Fore.LIGHTRED_EX, - war_color: str = Fore.LIGHTYELLOW_EX) -> None: - """ Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists. + def pexcept(self, msg: Any, *, end: str = '\n', apply_style: bool = True) -> None: + """Print Exception message to sys.stderr. If debug is true, print exception traceback if one exists. - :param err: an Exception or error message to print out - :param traceback_war: (optional) if True, print a message to let user know they can enable debug - :param err_color: (optional) color escape to output error with - :param war_color: (optional) color escape to output warning with + :param msg: message or Exception to print + :param end: string appended after the end of the message, default a newline + :param apply_style: If True, then ErrorStyle will be applied to the message text. Set to False in cases + where the message text already has the desired style. Defaults to True. """ if self.debug and sys.exc_info() != (None, None, None): import traceback traceback.print_exc() - if isinstance(err, Exception): - err_msg = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(type(err).__name__, err) + if isinstance(msg, Exception): + final_msg = "EXCEPTION of type '{}' occurred with message: '{}'".format(type(msg).__name__, msg) else: - err_msg = "{}\n".format(err) - err_msg = err_color + err_msg + Fore.RESET - self._decolorized_write(sys.stderr, err_msg) + final_msg = "{}".format(msg) + + if apply_style: + final_msg = ansi.style_error(final_msg) + + if not self.debug: + warning = "\nTo enable full traceback, run the following command: 'set debug true'" + final_msg += ansi.style_warning(warning) - if traceback_war and not self.debug: - war = "To enable full traceback, run the following command: 'set debug true'\n" - war = war_color + war + Fore.RESET - self._decolorized_write(sys.stderr, war) + # Set apply_style to False since style has already been applied + self.perror(final_msg, end=end, apply_style=False) def pfeedback(self, msg: str) -> None: """For printing nonessential feedback. Can be silenced with `quiet`. @@ -647,7 +670,7 @@ class Cmd(cmd.Cmd): if self.feedback_to_output: self.poutput(msg) else: - self._decolorized_write(sys.stderr, "{}\n".format(msg)) + ansi.ansi_aware_write(sys.stderr, "{}\n".format(msg)) def ppaged(self, msg: str, end: str = '\n', chop: bool = False) -> None: """Print output using a pager if it would go off screen and stdout isn't currently being redirected. @@ -683,8 +706,8 @@ class Cmd(cmd.Cmd): # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) # Also only attempt to use a pager if actually running in a real fully functional terminal if functional_terminal and not self._redirecting and not self._in_py and not self._script_dir: - if self.colors.lower() == constants.COLORS_NEVER.lower(): - msg_str = utils.strip_ansi(msg_str) + if ansi.allow_ansi.lower() == ansi.ANSI_NEVER.lower(): + msg_str = ansi.strip_ansi(msg_str) pager = self.pager if chop: @@ -696,7 +719,7 @@ class Cmd(cmd.Cmd): pipe_proc = subprocess.Popen(pager, shell=True, stdin=subprocess.PIPE) pipe_proc.communicate(msg_str.encode('utf-8', 'replace')) else: - self._decolorized_write(self.stdout, msg_str) + ansi.ansi_aware_write(self.stdout, msg_str) except BrokenPipeError: # This occurs if a command's output is being piped to another process and that process closes before the # command is finished. If you would like your application to print a warning message, then set the @@ -1269,7 +1292,7 @@ class Cmd(cmd.Cmd): longest_match_length = 0 for cur_match in matches_to_display: - cur_length = utils.ansi_safe_wcswidth(cur_match) + cur_length = ansi.ansi_safe_wcswidth(cur_match) if cur_length > longest_match_length: longest_match_length = cur_length else: @@ -1562,7 +1585,7 @@ class Cmd(cmd.Cmd): try: return self._complete_worker(text, state) except Exception as e: - self.perror(e) + self.pexcept(e) return None def _autocomplete_default(self, text: str, line: str, begidx: int, endidx: int, @@ -1676,8 +1699,8 @@ class Cmd(cmd.Cmd): except EmptyStatement: return self._run_cmdfinalization_hooks(stop, None) except ValueError as ex: - # If shlex.split failed on syntax, let user know whats going on - self.perror("Invalid syntax: {}".format(ex), traceback_war=False) + # If shlex.split failed on syntax, let user know what's going on + self.pexcept("Invalid syntax: {}".format(ex)) return stop # now that we have a statement, run it with all the hooks @@ -1762,7 +1785,7 @@ class Cmd(cmd.Cmd): # don't do anything, but do allow command finalization hooks to run pass except Exception as ex: - self.perror(ex) + self.pexcept(ex) finally: return self._run_cmdfinalization_hooks(stop, statement) @@ -1785,7 +1808,7 @@ class Cmd(cmd.Cmd): # modifications to the statement return data.stop except Exception as ex: - self.perror(ex) + self.pexcept(ex) def runcmds_plus_hooks(self, cmds: List[Union[HistoryItem, str]]) -> bool: """ @@ -1925,9 +1948,12 @@ class Cmd(cmd.Cmd): # Make sure enough arguments were passed in if len(statement.arg_list) < macro.minimum_arg_count: - self.perror("The macro '{}' expects at least {} argument(s)".format(statement.command, - macro.minimum_arg_count), - traceback_war=False) + self.perror( + "The macro '{}' expects at least {} argument(s)".format( + statement.command, + macro.minimum_arg_count + ) + ) return None # Resolve the arguments in reverse and read their values from statement.argv since those @@ -2020,8 +2046,7 @@ class Cmd(cmd.Cmd): elif statement.output: import tempfile if (not statement.output_to) and (not self._can_clip): - self.perror("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable", - traceback_war=False) + self.perror("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable") redir_error = True elif statement.output_to: @@ -2036,7 +2061,7 @@ class Cmd(cmd.Cmd): saved_state.redirecting = True sys.stdout = self.stdout = new_stdout except OSError as ex: - self.perror('Failed to redirect because - {}'.format(ex), traceback_war=False) + self.pexcept('Failed to redirect because - {}'.format(ex)) redir_error = True else: # going to a paste buffer @@ -2139,7 +2164,7 @@ class Cmd(cmd.Cmd): return self.do_shell(statement.command_and_args) else: err_msg = self.default_error.format(statement.command) - self._decolorized_write(sys.stderr, "{}\n".format(err_msg)) + ansi.ansi_aware_write(sys.stderr, "{}\n".format(err_msg)) def _pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2268,11 +2293,11 @@ class Cmd(cmd.Cmd): # Validate the alias name valid, errmsg = self._statement_parser.is_valid_command(args.name) if not valid: - self.perror("Invalid alias name: {}".format(errmsg), traceback_war=False) + self.perror("Invalid alias name: {}".format(errmsg)) return if args.name in self.macros: - self.perror("Alias cannot have the same name as a macro", traceback_war=False) + self.perror("Alias cannot have the same name as a macro") return # Unquote redirection and terminator tokens @@ -2303,7 +2328,7 @@ class Cmd(cmd.Cmd): del self.aliases[cur_name] self.poutput("Alias '{}' deleted".format(cur_name)) else: - self.perror("Alias '{}' does not exist".format(cur_name), traceback_war=False) + self.perror("Alias '{}' does not exist".format(cur_name)) def _alias_list(self, args: argparse.Namespace) -> None: """List some or all aliases""" @@ -2312,7 +2337,7 @@ class Cmd(cmd.Cmd): if cur_name in self.aliases: self.poutput("alias create {} {}".format(cur_name, self.aliases[cur_name])) else: - self.perror("Alias '{}' not found".format(cur_name), traceback_war=False) + self.perror("Alias '{}' not found".format(cur_name)) else: sorted_aliases = utils.alphabetical_sort(self.aliases) for cur_alias in sorted_aliases: @@ -2399,15 +2424,15 @@ class Cmd(cmd.Cmd): # Validate the macro name valid, errmsg = self._statement_parser.is_valid_command(args.name) if not valid: - self.perror("Invalid macro name: {}".format(errmsg), traceback_war=False) + self.perror("Invalid macro name: {}".format(errmsg)) return if args.name in self.get_all_commands(): - self.perror("Macro cannot have the same name as a command", traceback_war=False) + self.perror("Macro cannot have the same name as a command") return if args.name in self.aliases: - self.perror("Macro cannot have the same name as an alias", traceback_war=False) + self.perror("Macro cannot have the same name as an alias") return # Unquote redirection and terminator tokens @@ -2434,7 +2459,7 @@ class Cmd(cmd.Cmd): cur_num_str = (re.findall(MacroArg.digit_pattern, cur_match.group())[0]) cur_num = int(cur_num_str) if cur_num < 1: - self.perror("Argument numbers must be greater than 0", traceback_war=False) + self.perror("Argument numbers must be greater than 0") return arg_nums.add(cur_num) @@ -2448,8 +2473,9 @@ class Cmd(cmd.Cmd): # Make sure the argument numbers are continuous if len(arg_nums) != max_arg_num: - self.perror("Not all numbers between 1 and {} are present " - "in the argument placeholders".format(max_arg_num), traceback_war=False) + self.perror( + "Not all numbers between 1 and {} are present " + "in the argument placeholders".format(max_arg_num)) return # Find all escaped arguments @@ -2484,7 +2510,7 @@ class Cmd(cmd.Cmd): del self.macros[cur_name] self.poutput("Macro '{}' deleted".format(cur_name)) else: - self.perror("Macro '{}' does not exist".format(cur_name), traceback_war=False) + self.perror("Macro '{}' does not exist".format(cur_name)) def _macro_list(self, args: argparse.Namespace) -> None: """List some or all macros""" @@ -2493,7 +2519,7 @@ class Cmd(cmd.Cmd): if cur_name in self.macros: self.poutput("macro create {} {}".format(cur_name, self.macros[cur_name].value)) else: - self.perror("Macro '{}' not found".format(cur_name), traceback_war=False) + self.perror("Macro '{}' not found".format(cur_name)) else: sorted_macros = utils.alphabetical_sort(self.macros) for cur_macro in sorted_macros: @@ -2671,7 +2697,7 @@ class Cmd(cmd.Cmd): # If there is no help information then print an error elif help_func is None and (func is None or not func.__doc__): err_msg = self.help_error.format(args.command) - self._decolorized_write(sys.stderr, "{}\n".format(err_msg)) + ansi.ansi_aware_write(sys.stderr, "{}\n".format(err_msg)) # Otherwise delegate to cmd base class do_help() else: @@ -2713,12 +2739,12 @@ class Cmd(cmd.Cmd): if len(cmds_cats) == 0: # No categories found, fall back to standard behavior - self.poutput("{}\n".format(str(self.doc_leader))) + self.poutput("{}".format(str(self.doc_leader))) self._print_topics(self.doc_header, cmds_doc, verbose) else: # Categories found, Organize all commands by category - self.poutput('{}\n'.format(str(self.doc_leader))) - self.poutput('{}\n\n'.format(str(self.doc_header))) + self.poutput('{}'.format(str(self.doc_leader))) + self.poutput('{}'.format(str(self.doc_header)), end="\n\n") for category in sorted(cmds_cats.keys()): self._print_topics(category, cmds_cats[category], verbose) self._print_topics('Other', cmds_doc, verbose) @@ -2738,7 +2764,7 @@ class Cmd(cmd.Cmd): widest = 0 # measure the commands for command in cmds: - width = utils.ansi_safe_wcswidth(command) + width = ansi.ansi_safe_wcswidth(command) if width > widest: widest = width # add a 4-space pad @@ -2806,7 +2832,7 @@ class Cmd(cmd.Cmd): def do_shortcuts(self, _: argparse.Namespace) -> None: """List available shortcuts""" result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self._statement_parser.shortcuts)) - self.poutput("Shortcuts for other commands:\n{}\n".format(result)) + self.poutput("Shortcuts for other commands:\n{}".format(result)) @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) def do_eof(self, _: argparse.Namespace) -> bool: @@ -2845,7 +2871,7 @@ class Cmd(cmd.Cmd): except IndexError: fulloptions.append((opt[0], opt[0])) for (idx, (_, text)) in enumerate(fulloptions): - self.poutput(' %2d. %s\n' % (idx + 1, text)) + self.poutput(' %2d. %s' % (idx + 1, text)) while True: safe_prompt = rl_make_safe_prompt(prompt) response = input(safe_prompt) @@ -2862,8 +2888,8 @@ class Cmd(cmd.Cmd): result = fulloptions[choice - 1][0] break except (ValueError, IndexError): - self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:\n".format(response, - len(fulloptions))) + self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:".format( + response, len(fulloptions))) return result def _cmdenvironment(self) -> str: @@ -2902,8 +2928,7 @@ class Cmd(cmd.Cmd): if args.all: self.poutput('\nRead only settings:{}'.format(self._cmdenvironment())) else: - self.perror("Parameter '{}' not supported (type 'set' for list of parameters).".format(param), - traceback_war=False) + self.perror("Parameter '{}' not supported (type 'set' for list of parameters).".format(param)) set_description = ("Set a settable parameter or show current settings of parameters\n" "\n" @@ -2940,17 +2965,20 @@ class Cmd(cmd.Cmd): return self._show(args, param) # Update the settable's value - current_value = getattr(self, param) - value = utils.cast(current_value, value) - setattr(self, param, value) + orig_value = getattr(self, param) + setattr(self, param, utils.cast(orig_value, value)) - self.poutput('{} - was: {}\nnow: {}\n'.format(param, current_value, value)) + # In cases where a Python property is used to validate and update a settable's value, its value will not + # change if the passed in one is invalid. Therefore we should read its actual value back and not assume. + new_value = getattr(self, param) + + self.poutput('{} - was: {}\nnow: {}'.format(param, orig_value, new_value)) # See if we need to call a change hook for this settable - if current_value != value: + if orig_value != new_value: onchange_hook = getattr(self, '_onchange_{}'.format(param), None) if onchange_hook is not None: - onchange_hook(old=current_value, new=value) + onchange_hook(old=orig_value, new=new_value) shell_parser = ACArgumentParser() setattr(shell_parser.add_argument('command', help='the command to run'), @@ -3028,7 +3056,7 @@ class Cmd(cmd.Cmd): from .pyscript_bridge import PyscriptBridge if self._in_py: err = "Recursively entering interactive Python consoles is not allowed." - self.perror(err, traceback_war=False) + self.perror(err) return False bridge = PyscriptBridge(self) @@ -3050,8 +3078,7 @@ class Cmd(cmd.Cmd): with open(expanded_filename) as f: interp.runcode(f.read()) except OSError as ex: - error_msg = "Error opening script file '{}': {}".format(expanded_filename, ex) - self.perror(error_msg, traceback_war=False) + self.pexcept("Error opening script file '{}': {}".format(expanded_filename, ex)) def py_quit(): """Function callable from the interactive Python console to exit that environment""" @@ -3235,9 +3262,9 @@ class Cmd(cmd.Cmd): # Restore command line arguments to original state sys.argv = orig_args if args.__statement__.command == "pyscript": - self.perror("pyscript has been renamed and will be removed in the next release, " - "please use run_pyscript instead\n", - traceback_war=False, err_color=Fore.LIGHTYELLOW_EX) + warning = ("pyscript has been renamed and will be removed in the next release, " + "please use run_pyscript instead\n") + self.perror(ansi.style_warning(warning)) return py_return @@ -3367,9 +3394,8 @@ class Cmd(cmd.Cmd): if args.run: if cowardly_refuse_to_run: - self.perror("Cowardly refusing to run all previously entered commands.", traceback_war=False) - self.perror("If this is what you want to do, specify '1:' as the range of history.", - traceback_war=False) + self.perror("Cowardly refusing to run all previously entered commands.") + self.perror("If this is what you want to do, specify '1:' as the range of history.") else: return self.runcmds_plus_hooks(history) elif args.edit: @@ -3395,9 +3421,10 @@ class Cmd(cmd.Cmd): else: fobj.write('{}\n'.format(item.raw)) plural = 's' if len(history) > 1 else '' + except OSError as e: + self.pexcept('Error saving {!r} - {}'.format(args.output_file, e)) + else: self.pfeedback('{} command{} saved to {}'.format(len(history), plural, args.output_file)) - except Exception as e: - self.perror('Saving {!r} - {}'.format(args.output_file, e), traceback_war=False) elif args.transcript: self._generate_transcript(history, args.transcript) else: @@ -3442,7 +3469,7 @@ class Cmd(cmd.Cmd): pass except OSError as ex: msg = "can not read persistent history file '{}': {}" - self.perror(msg.format(hist_file, ex), traceback_war=False) + self.pexcept(msg.format(hist_file, ex)) return self.history = history @@ -3478,7 +3505,7 @@ class Cmd(cmd.Cmd): pickle.dump(self.history, fobj) except OSError as ex: msg = "can not write persistent history file '{}': {}" - self.perror(msg.format(self.persistent_history_file, ex), traceback_war=False) + self.pexcept(msg.format(self.persistent_history_file, ex)) def _generate_transcript(self, history: List[Union[HistoryItem, str]], transcript_file: str) -> None: """ @@ -3488,8 +3515,7 @@ class Cmd(cmd.Cmd): transcript_path = os.path.abspath(os.path.expanduser(transcript_file)) transcript_dir = os.path.dirname(transcript_path) if not os.path.isdir(transcript_dir) or not os.access(transcript_dir, os.W_OK): - self.perror("{!r} is not a directory or you don't have write access".format(transcript_dir), - traceback_war=False) + self.perror("{!r} is not a directory or you don't have write access".format(transcript_dir)) return commands_run = 0 @@ -3547,14 +3573,14 @@ class Cmd(cmd.Cmd): # Check if all commands ran if commands_run < len(history): warning = "Command {} triggered a stop and ended transcript generation early".format(commands_run) - self.perror(warning, err_color=Fore.LIGHTYELLOW_EX, traceback_war=False) + self.perror(ansi.style_warning(warning)) # finally, we can write the transcript out to the file try: with open(transcript_file, 'w') as fout: fout.write(transcript) except OSError as ex: - self.perror('Failed to save transcript: {}'.format(ex), traceback_war=False) + self.pexcept('Failed to save transcript: {}'.format(ex)) else: # and let the user know what we did if commands_run > 1: @@ -3622,22 +3648,22 @@ class Cmd(cmd.Cmd): try: # Make sure the path exists and we can access it if not os.path.exists(expanded_path): - self.perror("'{}' does not exist or cannot be accessed".format(expanded_path), traceback_war=False) + self.perror("'{}' does not exist or cannot be accessed".format(expanded_path)) return # Make sure expanded_path points to a file if not os.path.isfile(expanded_path): - self.perror("'{}' is not a file".format(expanded_path), traceback_war=False) + self.perror("'{}' is not a file".format(expanded_path)) return # Make sure the file is not empty if os.path.getsize(expanded_path) == 0: - self.perror("'{}' is empty".format(expanded_path), traceback_war=False) + self.perror("'{}' is empty".format(expanded_path)) return # Make sure the file is ASCII or UTF-8 encoded text if not utils.is_text_file(expanded_path): - self.perror("'{}' is not an ASCII or UTF-8 encoded text file".format(expanded_path), traceback_war=False) + self.perror("'{}' is not an ASCII or UTF-8 encoded text file".format(expanded_path)) return try: @@ -3645,7 +3671,7 @@ class Cmd(cmd.Cmd): with open(expanded_path, encoding='utf-8') as target: script_commands = target.read().splitlines() except OSError as ex: # pragma: no cover - self.perror("Problem accessing script from '{}': {}".format(expanded_path, ex)) + self.pexcept("Problem accessing script from '{}': {}".format(expanded_path, ex)) return orig_script_dir_count = len(self._script_dir) @@ -3665,9 +3691,9 @@ class Cmd(cmd.Cmd): self._script_dir.pop() finally: if args.__statement__.command == "load": - self.perror("load has been renamed and will be removed in the next release, " - "please use run_script instead\n", - traceback_war=False, err_color=Fore.LIGHTYELLOW_EX) + warning = ("load has been renamed and will be removed in the next release, " + "please use run_script instead\n") + self.perror(ansi.style_warning(warning)) # load has been deprecated do_load = do_run_script @@ -3692,9 +3718,9 @@ class Cmd(cmd.Cmd): :return: True if running of commands should stop """ if args.__statement__.command == "_relative_load": - self.perror("_relative_load has been renamed and will be removed in the next release, " - "please use _relative_run_script instead\n", - traceback_war=False, err_color=Fore.LIGHTYELLOW_EX) + warning = ("_relative_load has been renamed and will be removed in the next release, " + "please use _relative_run_script instead\n") + self.perror(ansi.style_warning(warning)) file_path = args.file_path # NOTE: Relative path is an absolute path, it is just relative to the current script directory @@ -3715,7 +3741,6 @@ class Cmd(cmd.Cmd): import time import unittest import cmd2 - from colorama import Style from .transcript import Cmd2TestCase class TestMyAppCase(Cmd2TestCase): @@ -3724,19 +3749,19 @@ class Cmd(cmd.Cmd): # Validate that there is at least one transcript file transcripts_expanded = utils.files_from_glob_patterns(transcript_paths, access=os.R_OK) if not transcripts_expanded: - self.perror('No test files found - nothing to test', traceback_war=False) + self.perror('No test files found - nothing to test') self.exit_code = -1 return verinfo = ".".join(map(str, sys.version_info[:3])) num_transcripts = len(transcripts_expanded) plural = '' if len(transcripts_expanded) == 1 else 's' - self.poutput(Style.BRIGHT + utils.center_text('cmd2 transcript test', pad='=') + Style.RESET_ALL) + self.poutput(ansi.style(utils.center_text('cmd2 transcript test', pad='='), bold=True)) self.poutput('platform {} -- Python {}, cmd2-{}, readline-{}'.format(sys.platform, verinfo, cmd2.__version__, rl_type)) self.poutput('cwd: {}'.format(os.getcwd())) self.poutput('cmd2 app: {}'.format(sys.argv[0])) - self.poutput(Style.BRIGHT + 'collected {} transcript{}\n'.format(num_transcripts, plural) + Style.RESET_ALL) + self.poutput(ansi.style('collected {} transcript{}'.format(num_transcripts, plural), bold=True)) self.__class__.testfiles = transcripts_expanded sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() @@ -3747,9 +3772,10 @@ class Cmd(cmd.Cmd): test_results = runner.run(testcase) execution_time = time.time() - start_time if test_results.wasSuccessful(): - self._decolorized_write(sys.stderr, stream.read()) + ansi.ansi_aware_write(sys.stderr, stream.read()) finish_msg = '{0} transcript{1} passed in {2:.3f} seconds'.format(num_transcripts, plural, execution_time) - self.poutput(Style.BRIGHT + utils.center_text(finish_msg, pad='=') + Style.RESET_ALL, color=Fore.GREEN) + finish_msg = ansi.style_success(utils.center_text(finish_msg, pad='=')) + self.poutput(finish_msg) else: # Strip off the initial traceback which isn't particularly useful for end users error_str = stream.read() @@ -3758,7 +3784,7 @@ class Cmd(cmd.Cmd): start = end_of_trace + file_offset # But print the transcript file name and line number followed by what was expected and what was observed - self.perror(error_str[start:], traceback_war=False) + self.perror(error_str[start:]) # Return a failure error code to support automated transcript-based testing self.exit_code = -1 @@ -3782,7 +3808,6 @@ class Cmd(cmd.Cmd): return import shutil - import colorama.ansi as ansi from colorama import Cursor # Sanity check that can't fail if self.terminal_lock was acquired before calling this function @@ -3818,14 +3843,14 @@ class Cmd(cmd.Cmd): # That will be included in the input lines calculations since that is where the cursor is. num_prompt_terminal_lines = 0 for line in prompt_lines[:-1]: - line_width = utils.ansi_safe_wcswidth(line) + line_width = ansi.ansi_safe_wcswidth(line) num_prompt_terminal_lines += int(line_width / terminal_size.columns) + 1 # Now calculate how many terminal lines are take up by the input last_prompt_line = prompt_lines[-1] - last_prompt_line_width = utils.ansi_safe_wcswidth(last_prompt_line) + last_prompt_line_width = ansi.ansi_safe_wcswidth(last_prompt_line) - input_width = last_prompt_line_width + utils.ansi_safe_wcswidth(readline.get_line_buffer()) + input_width = last_prompt_line_width + ansi.ansi_safe_wcswidth(readline.get_line_buffer()) num_input_terminal_lines = int(input_width / terminal_size.columns) + 1 @@ -3844,10 +3869,10 @@ class Cmd(cmd.Cmd): # Clear each line from the bottom up so that the cursor ends up on the first prompt line total_lines = num_prompt_terminal_lines + num_input_terminal_lines - terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (total_lines - 1) + terminal_str += (colorama.ansi.clear_line() + Cursor.UP(1)) * (total_lines - 1) # Clear the first prompt line - terminal_str += ansi.clear_line() + terminal_str += colorama.ansi.clear_line() # Move the cursor to the beginning of the first prompt line and print the alert terminal_str += '\r' + alert_msg @@ -3904,8 +3929,7 @@ class Cmd(cmd.Cmd): # Sanity check that can't fail if self.terminal_lock was acquired before calling this function if self.terminal_lock.acquire(blocking=False): try: - import colorama.ansi as ansi - sys.stderr.write(ansi.set_title(title)) + sys.stderr.write(colorama.ansi.set_title(title)) except AttributeError: # Debugging in Pycharm has issues with setting terminal title pass @@ -4007,7 +4031,7 @@ class Cmd(cmd.Cmd): :param message_to_print: the message reporting that the command is disabled :param kwargs: not used """ - self._decolorized_write(sys.stderr, "{}\n".format(message_to_print)) + ansi.ansi_aware_write(sys.stderr, "{}\n".format(message_to_print)) def cmdloop(self, intro: Optional[str] = None) -> int: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -4048,7 +4072,7 @@ class Cmd(cmd.Cmd): # Print the intro, if there is one, right after the preloop if self.intro is not None: - self.poutput(str(self.intro) + "\n") + self.poutput(self.intro) # And then call _cmdloop() to enter the main loop self._cmdloop() |