diff options
-rw-r--r-- | cmd2/cmd2.py | 76 | ||||
-rw-r--r-- | cmd2/rl_utils.py | 56 |
2 files changed, 98 insertions, 34 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index e22ff9d6..d3e674de 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -40,6 +40,7 @@ import platform import re import shlex import sys +import threading from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union from . import constants @@ -50,7 +51,7 @@ from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement # Set up readline -from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt +from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ @@ -526,6 +527,11 @@ class Cmd(cmd.Cmd): # This determines if a non-zero exit code should be used when exiting the application self.exit_code = None + # This lock should be acquired before doing any asynchronous changes to the terminal to + # ensure the updates to the terminal don't interfere with the input being typed. It can be + # acquired any time there is a readline prompt on screen. + self._terminal_lock = threading.RLock() + # ----- Methods related to presenting output to the user ----- @property @@ -1643,6 +1649,9 @@ class Cmd(cmd.Cmd): # Register a default SIGINT signal handler for Ctrl+C signal.signal(signal.SIGINT, self.sigint_handler) + # Grab the terminal lock before the prompt has been drawn by readline + self._terminal_lock.acquire() + def precmd(self, statement: Statement) -> Statement: """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. @@ -2114,6 +2123,8 @@ class Cmd(cmd.Cmd): if self.use_rawinput: try: if sys.stdin.isatty(): + # A prompt is about to be drawn. Allow asynchronous changes to the terminal. + self._terminal_lock.release() line = input(safe_prompt) else: line = input() @@ -2121,6 +2132,10 @@ class Cmd(cmd.Cmd): sys.stdout.write('{}{}\n'.format(safe_prompt, line)) except EOFError: line = 'eof' + finally: + if sys.stdin.isatty(): + # The prompt is gone. Do not allow asynchronous changes to the terminal. + self._terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line @@ -3197,8 +3212,8 @@ Script should contain one command per line, just like command would be typed in leaving the cursor at the beginning of the first input line :return: the string to print """ - if rl_type == RlType.NONE: - return + if not (vt100_support and self.use_rawinput): + return '' import shutil import colorama.ansi as ansi @@ -3235,35 +3250,52 @@ Script should contain one command per line, just like command would be typed in return terminal_str - def print_alert(self, alert_msg: str) -> None: + def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: """ - Used to display an important message to the user while they are at the prompt. - To the user it appears as if an alert message is printed above the prompt. + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + + IMPORTANT: Do not call this unless you have acquired self._terminal_lock + first, which ensures a prompt is onscreen :param alert_msg: the message to display to the user + :param new_prompt: if you also want to change the prompt that is displayed, then include it here + see async_update_prompt() docstring for guidance on updating a prompt """ - if rl_type == RlType.NONE: + if not (vt100_support and self.use_rawinput): return + if new_prompt is not None: + rl_set_prompt(new_prompt) + self.prompt = new_prompt + # Clear the prompt and input lines and replace with the alert terminal_str = self._clear_input_lines_str() terminal_str += alert_msg + '\n' if rl_type == RlType.GNU: - sys.stdout.write(terminal_str) + sys.stderr.write(terminal_str) elif rl_type == RlType.PYREADLINE: readline.rl.mode.console.write(terminal_str) # Redraw the prompt and input lines rl_force_redisplay() - def update_prompt(self, new_prompt: str) -> None: + def async_update_prompt(self, new_prompt: str) -> None: """ - Dynamically alters the prompt in the terminal + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. If you do alter the actual text of the prompt, + it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will + be shifted and the update will not be seamless. + + IMPORTANT: Do not call this unless you have acquired self._terminal_lock + first, which ensures a prompt is onscreen :param new_prompt: what to change the prompt to """ - if rl_type == RlType.NONE: + if not (vt100_support and self.use_rawinput): return # Set the new prompt @@ -3273,13 +3305,29 @@ Script should contain one command per line, just like command would be typed in # Clear the prompt and input lines terminal_str = self._clear_input_lines_str() if rl_type == RlType.GNU: - sys.stdout.write(terminal_str) + sys.stderr.write(terminal_str) elif rl_type == RlType.PYREADLINE: readline.rl.mode.console.write(terminal_str) # Redraw the prompt and input lines rl_force_redisplay() + @staticmethod + def set_window_title(title: str) -> None: + """ + Sets the terminal window title + :param title: the new window title + """ + if not vt100_support: + return + + import colorama.ansi as ansi + try: + sys.stderr.write(ansi.set_title(title)) + except AttributeError: + # Debugging in Pycharm has issues with setting terminal title + pass + def alerter(self): import time while True: @@ -3287,7 +3335,7 @@ Script should contain one command per line, just like command would be typed in " Message failed to send\n" \ "***********************************************" time.sleep(5) - self.print_alert(alert_msg) + self.async_alert(alert_msg) def do_alert(self, args): import threading @@ -3299,7 +3347,7 @@ Script should contain one command per line, just like command would be typed in counter = 1 while True: time.sleep(1) - self.update_prompt("({:>3}) ".format(counter)) + self.async_update_prompt("({:>3}) ".format(counter)) counter += 1 def do_update_prompt(self, args): diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py index 58bd6377..96a74b67 100644 --- a/cmd2/rl_utils.py +++ b/cmd2/rl_utils.py @@ -27,6 +27,8 @@ class RlType(Enum): # Check what implementation of readline we are using rl_type = RlType.NONE + +# Tells if the terminal we are running in supports vt100 control characters vt100_support = False # The order of this check matters since importing pyreadline will also show readline in the modules list @@ -37,30 +39,40 @@ if 'pyreadline' in sys.modules: from ctypes.wintypes import DWORD, HANDLE import atexit - # noinspection PyPep8Naming - def enable_win_vt100(handle: HANDLE) -> None: - """ - Enables VT100 character sequences in a Windows console - This only works on Windows 10 and up - """ - ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + # Check if we are running in a terminal + if sys.stdout.isatty(): + # noinspection PyPep8Naming + def enable_win_vt100(handle: HANDLE) -> bool: + """ + Enables VT100 character sequences in a Windows console + This only works on Windows 10 and up + :param handle: the handle on which to enable vt100 + :return: True if vt100 characters are enabled for the handle + """ + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + + # Get the current mode for this handle in the console + cur_mode = DWORD(0) + readline.rl.console.GetConsoleMode(handle, byref(cur_mode)) + + retVal = False - # Get the current mode for this handle in the console - cur_mode = DWORD(0) - readline.rl.console.GetConsoleMode(handle, byref(cur_mode)) + # Check if ENABLE_VIRTUAL_TERMINAL_PROCESSING is already enabled + if (cur_mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) != 0: + retVal = True - # If ENABLE_VIRTUAL_TERMINAL_PROCESSING is not enabled, then enable it now - if (cur_mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0: - readline.rl.console.SetConsoleMode(handle, cur_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING) + elif readline.rl.console.SetConsoleMode(handle, cur_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING): + # Restore the original mode when we exit + atexit.register(readline.rl.console.SetConsoleMode, handle, cur_mode) + retVal = True - # Restore the original mode when we exit - atexit.register(readline.rl.console.SetConsoleMode, handle, cur_mode) + return retVal - # Enable VT100 sequences for stdout and stderr - STD_OUT_HANDLE = -11 - STD_ERROR_HANDLE = -12 - enable_win_vt100(readline.rl.console.GetStdHandle(STD_OUT_HANDLE)) - enable_win_vt100(readline.rl.console.GetStdHandle(STD_ERROR_HANDLE)) + # Enable VT100 sequences for stdout and stderr + STD_OUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + vt100_support = enable_win_vt100(readline.rl.console.GetStdHandle(STD_OUT_HANDLE)) and \ + enable_win_vt100(readline.rl.console.GetStdHandle(STD_ERROR_HANDLE)) ############################################################################################################ # pyreadline is incomplete in terms of the Python readline API. Add the missing functions we need. @@ -103,6 +115,10 @@ elif 'gnureadline' in sys.modules or 'readline' in sys.modules: import ctypes readline_lib = ctypes.CDLL(readline.__file__) + # Check if we are running in a terminal + if sys.stdout.isatty(): + vt100_support = True + # noinspection PyProtectedMember def rl_force_redisplay() -> None: |