diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 233 |
1 files changed, 180 insertions, 53 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 0ea3d8cd..ed478b0d 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -40,6 +40,7 @@ import platform import re import shlex import sys +import threading from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union from . import constants @@ -50,7 +51,7 @@ from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement # Set up readline -from .rl_utils import rl_type, RlType +from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ @@ -527,6 +528,11 @@ class Cmd(cmd.Cmd): # This determines if a non-zero exit code should be used when exiting the application self.exit_code = None + # This lock should be acquired before doing any asynchronous changes to the terminal to + # ensure the updates to the terminal don't interfere with the input being typed. It can be + # acquired any time there is a readline prompt on screen. + self.terminal_lock = threading.RLock() + # ----- Methods related to presenting output to the user ----- @property @@ -1636,12 +1642,6 @@ class Cmd(cmd.Cmd): # Re-raise a KeyboardInterrupt so other parts of the code can catch it raise KeyboardInterrupt("Got a keyboard interrupt") - def preloop(self) -> None: - """Hook method executed once when the cmdloop() method is called.""" - import signal - # Register a default SIGINT signal handler for Ctrl+C - signal.signal(signal.SIGINT, self.sigint_handler) - def precmd(self, statement: Statement) -> Statement: """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. @@ -2011,34 +2011,6 @@ class Cmd(cmd.Cmd): # Print out a message stating this is an unknown command self.poutput('*** Unknown syntax: {}\n'.format(arg)) - @staticmethod - def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str: - """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. - - :param prompt: original prompt - :param start: start code to tell GNU Readline about beginning of invisible characters - :param end: end code to tell GNU Readline about end of invisible characters - :return: prompt safe to pass to GNU Readline - """ - # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline - if sys.platform == "win32": - return prompt - - escaped = False - result = "" - - for c in prompt: - if c == "\x1b" and not escaped: - result += start + c - escaped = True - elif c.isalpha() and escaped: - result += c + end - escaped = False - else: - result += c - - return result - def pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2047,23 +2019,37 @@ class Cmd(cmd.Cmd): to decide whether to print the prompt and the input """ - # Deal with the vagaries of readline and ANSI escape codes - safe_prompt = self._surround_ansi_escapes(prompt) + # Temporarily save over self.prompt to reflect what will be on screen + orig_prompt = self.prompt + self.prompt = prompt if self.use_rawinput: try: if sys.stdin.isatty(): + # Wrap in try since terminal_lock may not be locked when this function is called from unit tests + try: + # A prompt is about to be drawn. Allow asynchronous changes to the terminal. + self.terminal_lock.release() + except RuntimeError: + pass + + # Deal with the vagaries of readline and ANSI escape codes + safe_prompt = rl_make_safe_prompt(prompt) line = input(safe_prompt) else: line = input() if self.echo: - sys.stdout.write('{}{}\n'.format(safe_prompt, line)) + sys.stdout.write('{}{}\n'.format(self.prompt, line)) except EOFError: line = 'eof' + finally: + if sys.stdin.isatty(): + # The prompt is gone. Do not allow asynchronous changes to the terminal. + self.terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line - self.poutput(safe_prompt, end='') + self.poutput(self.prompt, end='') self.stdout.flush() line = self.stdin.readline() if len(line) == 0: @@ -2076,9 +2062,13 @@ class Cmd(cmd.Cmd): if len(line): # we read something, output the prompt and the something if self.echo: - self.poutput('{}{}'.format(safe_prompt, line)) + self.poutput('{}{}'.format(self.prompt, line)) else: line = 'eof' + + # Restore prompt + self.prompt = orig_prompt + return line.strip() def _cmdloop(self) -> bool: @@ -2453,7 +2443,8 @@ class Cmd(cmd.Cmd): for (idx, (_, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: - response = input(prompt) + safe_prompt = rl_make_safe_prompt(prompt) + response = input(safe_prompt) if rl_type != RlType.NONE: hlen = readline.get_current_history_length() @@ -3136,6 +3127,125 @@ Script should contain one command per line, just like command would be typed in runner = unittest.TextTestRunner() runner.run(testcase) + def _clear_input_lines_str(self) -> str: # pragma: no cover + """ + Returns a string that if printed will clear the prompt and input lines in the terminal, + leaving the cursor at the beginning of the first input line + :return: the string to print + """ + if not (vt100_support and self.use_rawinput): + return '' + + import shutil + import colorama.ansi as ansi + from colorama import Cursor + + visible_prompt = self.visible_prompt + + # Get the size of the terminal + terminal_size = shutil.get_terminal_size() + + # Figure out how many lines the prompt and user input take up + total_str_size = len(visible_prompt) + len(readline.get_line_buffer()) + num_input_lines = int(total_str_size / terminal_size.columns) + 1 + + # Get the cursor's offset from the beginning of the first input line + cursor_input_offset = len(visible_prompt) + rl_get_point() + + # Calculate what input line the cursor is on + cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1 + + # Create a string that will clear all input lines and print the alert + terminal_str = '' + + # Move the cursor down to the last input line + if cursor_input_line != num_input_lines: + terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line) + + # Clear each input line from the bottom up so that the cursor ends up on the original first input line + terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1) + terminal_str += ansi.clear_line() + + # Move the cursor to the beginning of the first input line + terminal_str += '\r' + + return terminal_str + + def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover + """ + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param alert_msg: the message to display to the user + :param new_prompt: if you also want to change the prompt that is displayed, then include it here + see async_update_prompt() docstring for guidance on updating a prompt + :raises RuntimeError if called while another thread holds terminal_lock + """ + if not (vt100_support and self.use_rawinput): + return + + # Sanity check that can't fail if self.terminal_lock was acquired before calling this function + if self.terminal_lock.acquire(blocking=False): + + # Generate a string to clear the prompt and input lines and replace with the alert + terminal_str = self._clear_input_lines_str() + if alert_msg: + terminal_str += alert_msg + '\n' + + # Set the new prompt now that _clear_input_lines_str is done using the old prompt + if new_prompt is not None: + self.prompt = new_prompt + rl_set_prompt(self.prompt) + + # Print terminal_str to erase the lines + if rl_type == RlType.GNU: + sys.stderr.write(terminal_str) + elif rl_type == RlType.PYREADLINE: + readline.rl.mode.console.write(terminal_str) + + # Redraw the prompt and input lines + rl_force_redisplay() + + self.terminal_lock.release() + + else: + raise RuntimeError("another thread holds terminal_lock") + + def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover + """ + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. If you do alter the actual text of the prompt, + it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will + be shifted and the update will not be seamless. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param new_prompt: what to change the prompt to + """ + self.async_alert('', new_prompt) + + @staticmethod + def set_window_title(title: str) -> None: # pragma: no cover + """ + Sets the terminal window title + :param title: the new window title + """ + if not vt100_support: + return + + import colorama.ansi as ansi + try: + sys.stderr.write(ansi.set_title(title)) + except AttributeError: + # Debugging in Pycharm has issues with setting terminal title + pass + def cmdloop(self, intro: Optional[str]=None) -> None: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -3161,6 +3271,14 @@ Script should contain one command per line, just like command would be typed in if callargs: self.cmdqueue.extend(callargs) + # Register a SIGINT signal handler for Ctrl+C + import signal + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self.sigint_handler) + + # Grab terminal lock before the prompt has been drawn by readline + self.terminal_lock.acquire() + # Always run the preloop first for func in self._preloop_hooks: func() @@ -3186,6 +3304,13 @@ Script should contain one command per line, just like command would be typed in func() self.postloop() + # Release terminal lock now that postloop code should have stopped any terminal updater threads + # This will also zero the lock count in case cmdloop() is called again + self.terminal_lock.release() + + # Restore the original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) + if self.exit_code is not None: sys.exit(self.exit_code) @@ -3194,7 +3319,7 @@ Script should contain one command per line, just like command would be typed in # plugin related functions # ### - def _initialize_plugin_system(self): + def _initialize_plugin_system(self) -> None: """Initialize the plugin system""" self._preloop_hooks = [] self._postloop_hooks = [] @@ -3204,7 +3329,7 @@ Script should contain one command per line, just like command would be typed in self._cmdfinalization_hooks = [] @classmethod - def _validate_callable_param_count(cls, func: Callable, count: int): + def _validate_callable_param_count(cls, func: Callable, count: int) -> None: """Ensure a function has the given number of parameters.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3217,7 +3342,7 @@ Script should contain one command per line, just like command would be typed in )) @classmethod - def _validate_prepostloop_callable(cls, func: Callable): + def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: """Check parameter and return types for preloop and postloop hooks.""" cls._validate_callable_param_count(func, 0) # make sure there is no return notation @@ -3227,18 +3352,18 @@ Script should contain one command per line, just like command would be typed in func.__name__, )) - def register_preloop_hook(self, func: Callable): + def register_preloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the beginning of the command loop.""" self._validate_prepostloop_callable(func) self._preloop_hooks.append(func) - def register_postloop_hook(self, func: Callable): + def register_postloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the end of the command loop.""" self._validate_prepostloop_callable(func) self._postloop_hooks.append(func) @classmethod - def _validate_postparsing_callable(cls, func: Callable): + def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Check parameter and return types for postparsing hooks""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3252,13 +3377,13 @@ Script should contain one command per line, just like command would be typed in func.__name__ )) - def register_postparsing_hook(self, func: Callable): + def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Register a function to be called after parsing user input but before running the command""" self._validate_postparsing_callable(func) self._postparsing_hooks.append(func) @classmethod - def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None: """Check parameter and return types for pre and post command hooks.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3285,18 +3410,19 @@ Script should contain one command per line, just like command would be typed in data_type, )) - def register_precmd_hook(self, func: Callable): + def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: """Register a hook to be called before the command function.""" self._validate_prepostcmd_hook(func, plugin.PrecommandData) self._precmd_hooks.append(func) - def register_postcmd_hook(self, func: Callable): + def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None: """Register a hook to be called after the command function.""" self._validate_prepostcmd_hook(func, plugin.PostcommandData) self._postcmd_hooks.append(func) @classmethod - def _validate_cmdfinalization_callable(cls, func: Callable): + def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Check parameter and return types for command finalization hooks.""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3308,7 +3434,8 @@ Script should contain one command per line, just like command would be typed in raise TypeError("{} must declare return a return type of " "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) - def register_cmdfinalization_hook(self, func: Callable): + def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Register a hook to be called after a command is completed, whether it completes successfully or not.""" self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) |