diff options
-rw-r--r-- | cmd2/cmd2.py | 258 | ||||
-rw-r--r-- | cmd2/rl_utils.py | 106 | ||||
-rw-r--r-- | docs/unfreefeatures.rst | 23 | ||||
-rwxr-xr-x | examples/async_printing.py | 197 | ||||
-rw-r--r-- | tests/test_cmd2.py | 6 |
5 files changed, 534 insertions, 56 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index c64966a1..704773cc 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -40,6 +40,7 @@ import platform import re import shlex import sys +import threading from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union from . import constants @@ -50,7 +51,7 @@ from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement # Set up readline -from .rl_utils import rl_type, RlType +from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ @@ -526,6 +527,11 @@ class Cmd(cmd.Cmd): # This determines if a non-zero exit code should be used when exiting the application self.exit_code = None + # This lock should be acquired before doing any asynchronous changes to the terminal to + # ensure the updates to the terminal don't interfere with the input being typed. It can be + # acquired any time there is a readline prompt on screen. + self.terminal_lock = threading.RLock() + # ----- Methods related to presenting output to the user ----- @property @@ -1635,12 +1641,6 @@ class Cmd(cmd.Cmd): # Re-raise a KeyboardInterrupt so other parts of the code can catch it raise KeyboardInterrupt("Got a keyboard interrupt") - def preloop(self) -> None: - """Hook method executed once when the cmdloop() method is called.""" - import signal - # Register a default SIGINT signal handler for Ctrl+C - signal.signal(signal.SIGINT, self.sigint_handler) - def precmd(self, statement: Statement) -> Statement: """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. @@ -2010,34 +2010,6 @@ class Cmd(cmd.Cmd): # Print out a message stating this is an unknown command self.poutput('*** Unknown syntax: {}\n'.format(arg)) - @staticmethod - def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str: - """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. - - :param prompt: original prompt - :param start: start code to tell GNU Readline about beginning of invisible characters - :param end: end code to tell GNU Readline about end of invisible characters - :return: prompt safe to pass to GNU Readline - """ - # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline - if sys.platform == "win32": - return prompt - - escaped = False - result = "" - - for c in prompt: - if c == "\x1b" and not escaped: - result += start + c - escaped = True - elif c.isalpha() and escaped: - result += c + end - escaped = False - else: - result += c - - return result - def pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2046,23 +2018,37 @@ class Cmd(cmd.Cmd): to decide whether to print the prompt and the input """ - # Deal with the vagaries of readline and ANSI escape codes - safe_prompt = self._surround_ansi_escapes(prompt) + # Temporarily save over self.prompt to reflect what will be on screen + orig_prompt = self.prompt + self.prompt = prompt if self.use_rawinput: try: if sys.stdin.isatty(): + # Wrap in try since terminal_lock may not be locked when this function is called from unit tests + try: + # A prompt is about to be drawn. Allow asynchronous changes to the terminal. + self.terminal_lock.release() + except RuntimeError: + pass + + # Deal with the vagaries of readline and ANSI escape codes + safe_prompt = rl_make_safe_prompt(prompt) line = input(safe_prompt) else: line = input() if self.echo: - sys.stdout.write('{}{}\n'.format(safe_prompt, line)) + sys.stdout.write('{}{}\n'.format(self.prompt, line)) except EOFError: line = 'eof' + finally: + if sys.stdin.isatty(): + # The prompt is gone. Do not allow asynchronous changes to the terminal. + self.terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line - self.poutput(safe_prompt, end='') + self.poutput(self.prompt, end='') self.stdout.flush() line = self.stdin.readline() if len(line) == 0: @@ -2075,9 +2061,13 @@ class Cmd(cmd.Cmd): if len(line): # we read something, output the prompt and the something if self.echo: - self.poutput('{}{}'.format(safe_prompt, line)) + self.poutput('{}{}'.format(self.prompt, line)) else: line = 'eof' + + # Restore prompt + self.prompt = orig_prompt + return line.strip() def _cmdloop(self) -> bool: @@ -2457,7 +2447,8 @@ class Cmd(cmd.Cmd): for (idx, (_, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: - response = input(prompt) + safe_prompt = rl_make_safe_prompt(prompt) + response = input(safe_prompt) if rl_type != RlType.NONE: hlen = readline.get_current_history_length() @@ -3140,6 +3131,150 @@ Script should contain one command per line, just like command would be typed in runner = unittest.TextTestRunner() runner.run(testcase) + def _clear_input_lines_str(self) -> str: + """ + Returns a string that if printed will clear the prompt and input lines in the terminal, + leaving the cursor at the beginning of the first input line + :return: the string to print + """ + if not (vt100_support and self.use_rawinput): + return '' + + import shutil + import colorama.ansi as ansi + from colorama import Cursor + + visible_prompt = self.visible_prompt + + # Get the size of the terminal + terminal_size = shutil.get_terminal_size() + + # Figure out how many lines the prompt and user input take up + total_str_size = len(visible_prompt) + len(readline.get_line_buffer()) + num_input_lines = int(total_str_size / terminal_size.columns) + 1 + + # Get the cursor's offset from the beginning of the first input line + cursor_input_offset = len(visible_prompt) + rl_get_point() + + # Calculate what input line the cursor is on + cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1 + + # Create a string that will clear all input lines and print the alert + terminal_str = '' + + # Move the cursor down to the last input line + if cursor_input_line != num_input_lines: + terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line) + + # Clear each input line from the bottom up so that the cursor ends up on the original first input line + terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1) + terminal_str += ansi.clear_line() + + # Move the cursor to the beginning of the first input line + terminal_str += '\r' + + return terminal_str + + def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: + """ + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param alert_msg: the message to display to the user + :param new_prompt: if you also want to change the prompt that is displayed, then include it here + see async_update_prompt() docstring for guidance on updating a prompt + :raises RuntimeError if called while another thread holds terminal_lock + """ + if not (vt100_support and self.use_rawinput): + return + + # Sanity check that can't fail if self.terminal_lock was acquired before calling this function + if self.terminal_lock.acquire(blocking=False): + + # Generate a string to clear the prompt and input lines and replace with the alert + terminal_str = self._clear_input_lines_str() + terminal_str += alert_msg + '\n' + + # Set the new prompt now that _clear_input_lines_str is done using the old prompt + if new_prompt is not None: + self.prompt = new_prompt + rl_set_prompt(self.prompt) + + # Print terminal_str to erase the lines + if rl_type == RlType.GNU: + sys.stderr.write(terminal_str) + elif rl_type == RlType.PYREADLINE: + readline.rl.mode.console.write(terminal_str) + + # Redraw the prompt and input lines + rl_force_redisplay() + + self.terminal_lock.release() + + else: + raise RuntimeError("another thread holds terminal_lock") + + def async_update_prompt(self, new_prompt: str) -> None: + """ + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. If you do alter the actual text of the prompt, + it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will + be shifted and the update will not be seamless. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param new_prompt: what to change the prompt to + :raises RuntimeError if called while another thread holds terminal_lock + """ + if not (vt100_support and self.use_rawinput): + return + + # Sanity check that can't fail if self.terminal_lock was acquired before calling this function + if self.terminal_lock.acquire(blocking=False): + + # Generate a string to clear the prompt and input lines + terminal_str = self._clear_input_lines_str() + + # Set the new prompt now that _clear_input_lines_str is done using the old prompt + self.prompt = new_prompt + rl_set_prompt(self.prompt) + + # Print terminal_str to erase the lines + if rl_type == RlType.GNU: + sys.stderr.write(terminal_str) + elif rl_type == RlType.PYREADLINE: + readline.rl.mode.console.write(terminal_str) + + # Redraw the prompt and input lines + rl_force_redisplay() + + self.terminal_lock.release() + + else: + raise RuntimeError("another thread holds terminal_lock") + + @staticmethod + def set_window_title(title: str) -> None: + """ + Sets the terminal window title + :param title: the new window title + """ + if not vt100_support: + return + + import colorama.ansi as ansi + try: + sys.stderr.write(ansi.set_title(title)) + except AttributeError: + # Debugging in Pycharm has issues with setting terminal title + pass + def cmdloop(self, intro: Optional[str]=None) -> None: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -3165,6 +3300,14 @@ Script should contain one command per line, just like command would be typed in if callargs: self.cmdqueue.extend(callargs) + # Register a SIGINT signal handler for Ctrl+C + import signal + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self.sigint_handler) + + # Grab terminal lock before the prompt has been drawn by readline + self.terminal_lock.acquire() + # Always run the preloop first for func in self._preloop_hooks: func() @@ -3190,6 +3333,13 @@ Script should contain one command per line, just like command would be typed in func() self.postloop() + # Release terminal lock now that postloop code should have stopped any terminal updater threads + # This will also zero the lock count in case cmdloop() is called again + self.terminal_lock.release() + + # Restore the original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) + if self.exit_code is not None: sys.exit(self.exit_code) @@ -3198,7 +3348,7 @@ Script should contain one command per line, just like command would be typed in # plugin related functions # ### - def _initialize_plugin_system(self): + def _initialize_plugin_system(self) -> None: """Initialize the plugin system""" self._preloop_hooks = [] self._postloop_hooks = [] @@ -3208,7 +3358,7 @@ Script should contain one command per line, just like command would be typed in self._cmdfinalization_hooks = [] @classmethod - def _validate_callable_param_count(cls, func: Callable, count: int): + def _validate_callable_param_count(cls, func: Callable, count: int) -> None: """Ensure a function has the given number of parameters.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3221,7 +3371,7 @@ Script should contain one command per line, just like command would be typed in )) @classmethod - def _validate_prepostloop_callable(cls, func: Callable): + def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: """Check parameter and return types for preloop and postloop hooks.""" cls._validate_callable_param_count(func, 0) # make sure there is no return notation @@ -3231,18 +3381,18 @@ Script should contain one command per line, just like command would be typed in func.__name__, )) - def register_preloop_hook(self, func: Callable): + def register_preloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the beginning of the command loop.""" self._validate_prepostloop_callable(func) self._preloop_hooks.append(func) - def register_postloop_hook(self, func: Callable): + def register_postloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the end of the command loop.""" self._validate_prepostloop_callable(func) self._postloop_hooks.append(func) @classmethod - def _validate_postparsing_callable(cls, func: Callable): + def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Check parameter and return types for postparsing hooks""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3256,13 +3406,13 @@ Script should contain one command per line, just like command would be typed in func.__name__ )) - def register_postparsing_hook(self, func: Callable): + def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Register a function to be called after parsing user input but before running the command""" self._validate_postparsing_callable(func) self._postparsing_hooks.append(func) @classmethod - def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None: """Check parameter and return types for pre and post command hooks.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3289,18 +3439,19 @@ Script should contain one command per line, just like command would be typed in data_type, )) - def register_precmd_hook(self, func: Callable): + def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: """Register a hook to be called before the command function.""" self._validate_prepostcmd_hook(func, plugin.PrecommandData) self._precmd_hooks.append(func) - def register_postcmd_hook(self, func: Callable): + def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None: """Register a hook to be called after the command function.""" self._validate_prepostcmd_hook(func, plugin.PostcommandData) self._postcmd_hooks.append(func) @classmethod - def _validate_cmdfinalization_callable(cls, func: Callable): + def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Check parameter and return types for command finalization hooks.""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3312,7 +3463,8 @@ Script should contain one command per line, just like command would be typed in raise TypeError("{} must declare return a return type of " "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) - def register_cmdfinalization_hook(self, func: Callable): + def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Register a hook to be called after a command is completed, whether it completes successfully or not.""" self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py index 7e49ea47..569ba8cf 100644 --- a/cmd2/rl_utils.py +++ b/cmd2/rl_utils.py @@ -26,13 +26,54 @@ class RlType(Enum): # Check what implementation of readline we are using - rl_type = RlType.NONE +# Tells if the terminal we are running in supports vt100 control characters +vt100_support = False + # The order of this check matters since importing pyreadline will also show readline in the modules list if 'pyreadline' in sys.modules: rl_type = RlType.PYREADLINE + from ctypes import byref + from ctypes.wintypes import DWORD, HANDLE + import atexit + + # Check if we are running in a terminal + if sys.stdout.isatty(): + # noinspection PyPep8Naming + def enable_win_vt100(handle: HANDLE) -> bool: + """ + Enables VT100 character sequences in a Windows console + This only works on Windows 10 and up + :param handle: the handle on which to enable vt100 + :return: True if vt100 characters are enabled for the handle + """ + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + + # Get the current mode for this handle in the console + cur_mode = DWORD(0) + readline.rl.console.GetConsoleMode(handle, byref(cur_mode)) + + retVal = False + + # Check if ENABLE_VIRTUAL_TERMINAL_PROCESSING is already enabled + if (cur_mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) != 0: + retVal = True + + elif readline.rl.console.SetConsoleMode(handle, cur_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING): + # Restore the original mode when we exit + atexit.register(readline.rl.console.SetConsoleMode, handle, cur_mode) + retVal = True + + return retVal + + # Enable VT100 sequences for stdout and stderr + STD_OUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + vt100_support = enable_win_vt100(readline.rl.console.GetStdHandle(STD_OUT_HANDLE)) and \ + enable_win_vt100(readline.rl.console.GetStdHandle(STD_ERROR_HANDLE)) + ############################################################################################################ # pyreadline is incomplete in terms of the Python readline API. Add the missing functions we need. ############################################################################################################ @@ -74,6 +115,10 @@ elif 'gnureadline' in sys.modules or 'readline' in sys.modules: import ctypes readline_lib = ctypes.CDLL(readline.__file__) + # Check if we are running in a terminal + if sys.stdout.isatty(): + vt100_support = True + # noinspection PyProtectedMember def rl_force_redisplay() -> None: @@ -96,3 +141,62 @@ def rl_force_redisplay() -> None: # Call _print_prompt() first to set the new location of the prompt readline.rl.mode._print_prompt() readline.rl.mode._update_line() + + +# noinspection PyProtectedMember +def rl_get_point() -> int: + """ + Returns the offset of the current cursor position in rl_line_buffer + """ + if rl_type == RlType.GNU: # pragma: no cover + return ctypes.c_int.in_dll(readline_lib, "rl_point").value + + elif rl_type == RlType.PYREADLINE: # pragma: no cover + return readline.rl.mode.l_buffer.point + + else: # pragma: no cover + return 0 + + +# noinspection PyProtectedMember +def rl_set_prompt(prompt: str) -> None: + """ + Sets readline's prompt + :param prompt: the new prompt value + """ + safe_prompt = rl_make_safe_prompt(prompt) + + if rl_type == RlType.GNU: # pragma: no cover + encoded_prompt = bytes(safe_prompt, encoding='utf-8') + readline_lib.rl_set_prompt(encoded_prompt) + + elif rl_type == RlType.PYREADLINE: # pragma: no cover + readline.rl._set_prompt(safe_prompt) + + +def rl_make_safe_prompt(prompt: str, start: str = "\x01", end: str = "\x02") -> str: + """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. + + :param prompt: original prompt + :param start: start code to tell GNU Readline about beginning of invisible characters + :param end: end code to tell GNU Readline about end of invisible characters + :return: prompt safe to pass to GNU Readline + """ + if rl_type == RlType.GNU: + escaped = False + result = "" + + for c in prompt: + if c == "\x1b" and not escaped: + result += start + c + escaped = True + elif c.isalpha() and escaped: + result += c + end + escaped = False + else: + result += c + + return result + + else: + return prompt diff --git a/docs/unfreefeatures.rst b/docs/unfreefeatures.rst index cd27745d..1b8adb8d 100644 --- a/docs/unfreefeatures.rst +++ b/docs/unfreefeatures.rst @@ -189,3 +189,26 @@ Exit code to shell The ``self.exit_code`` attribute of your ``cmd2`` application controls what exit code is sent to the shell when your application exits from ``cmdloop()``. + + +Asynchronous printing +===================== +``cmd2`` provides two functions to provide asynchronous feedback to the user without interfering with +the command line. This means the feedback is provided to the user when they are still entering text at +the prompt. To use this functionality, the application must be running in any terminal that supports +VT100 control characters and readline. Linux, Mac, and Windows 10 and greater all support these. + +async_alert() + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + +async_update_prompt() + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. + + +The easiest way to understand these function is to see the AsyncPrinting_ example for a demonstration. + +.. _AsyncPrinting: https://github.com/python-cmd2/cmd2/blob/master/examples/async_printing.py diff --git a/examples/async_printing.py b/examples/async_printing.py new file mode 100755 index 00000000..feda24c0 --- /dev/null +++ b/examples/async_printing.py @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# coding=utf-8 +"""A simple example demonstrating an application that asynchronously prints alerts and updates the prompt""" + +import random +import threading +import time +from typing import List + +from colorama import Fore + +import cmd2 + +ALERTS = ["Watch as this application prints alerts and updates the prompt", + "This will only happen when the prompt is present", + "Notice how it doesn't interfere with your typing or cursor location", + "Go ahead and type some stuff and move the cursor throughout the line", + "Keep typing...", + "Move that cursor...", + "Pretty seamless, eh?", + "You can stop and start the alerts by typing stop_alerts and start_alerts", + "This demo will now continue to print alerts at random intervals" + ] + + +class AlerterApp(cmd2.Cmd): + """ An app that shows off async_alert() and async_update_prompt() """ + + def __init__(self, *args, **kwargs) -> None: + """ Initializer """ + + super().__init__(*args, **kwargs) + + self.prompt = "(APR)> " + + # The thread that will asynchronously alert the user of events + self._stop_thread = False + self._alerter_thread = threading.Thread() + self._alert_count = 0 + self._next_alert_time = 0 + + # Create some hooks to handle the starting and stopping of our thread + self.register_preloop_hook(self._preloop_hook) + self.register_postloop_hook(self._postloop_hook) + + def _preloop_hook(self) -> None: + """ Start the alerter thread """ + # This runs after cmdloop() acquires self.terminal_lock, which will be locked until the prompt appears. + # Therefore this is the best place to start the alerter thread since there is no risk of it alerting + # before the prompt is displayed. You can also start it via a command if its not something that should + # be running during the entire application. See do_start_alerts(). + self._stop_thread = False + + self._alerter_thread = threading.Thread(name='alerter', target=self._alerter_thread_func) + self._alerter_thread.start() + + def _postloop_hook(self) -> None: + """ Stops the alerter thread """ + + # After this function returns, cmdloop() releases self.terminal_lock which could make the alerter + # thread think the prompt is on screen. Therefore this is the best place to stop the alerter thread. + # You can also stop it via a command. See do_stop_alerts(). + self._stop_thread = True + if self._alerter_thread.is_alive(): + self._alerter_thread.join() + + def do_start_alerts(self, _): + """ Starts the alerter thread """ + if self._alerter_thread.is_alive(): + print("The alert thread is already started") + else: + self._stop_thread = False + self._alerter_thread = threading.Thread(name='alerter', target=self._alerter_thread_func) + self._alerter_thread.start() + + def do_stop_alerts(self, _): + """ Stops the alerter thread """ + self._stop_thread = True + if self._alerter_thread.is_alive(): + self._alerter_thread.join() + else: + print("The alert thread is already stopped") + + def _get_alerts(self) -> List[str]: + """ + Reports alerts + :return: the list of alerts + """ + global ALERTS + + cur_time = time.monotonic() + if cur_time < self._next_alert_time: + return [] + + alerts = [] + + if self._alert_count < len(ALERTS): + alerts.append(ALERTS[self._alert_count]) + self._alert_count += 1 + self._next_alert_time = cur_time + 4 + + else: + rand_num = random.randint(1, 20) + if rand_num > 2: + return [] + + for i in range(0, rand_num): + alerts.append("Alert {}".format(self._alert_count)) + self._alert_count += 1 + + self._next_alert_time = 0 + + return alerts + + def _generate_alert_str(self) -> str: + """ + Combines alerts into one string that can be printed to the terminal + :return: the alert string + """ + global ALERTS + + alert_str = '' + alerts = self._get_alerts() + + longest_alert = max(ALERTS, key=len) + num_astericks = len(longest_alert) + 8 + + for i, cur_alert in enumerate(alerts): + # Use padding to center the alert + padding = ' ' * int((num_astericks - len(cur_alert)) / 2) + + if i > 0: + alert_str += '\n' + alert_str += '*' * num_astericks + '\n' + alert_str += padding + cur_alert + padding + '\n' + alert_str += '*' * num_astericks + '\n' + + return alert_str + + def _generate_colored_prompt(self) -> str: + """ + Randomly generates a colored the prompt + :return: the new prompt + """ + rand_num = random.randint(1, 20) + + status_color = Fore.RESET + + if rand_num == 1: + status_color = Fore.LIGHTRED_EX + elif rand_num == 2: + status_color = Fore.LIGHTYELLOW_EX + elif rand_num == 3: + status_color = Fore.CYAN + elif rand_num == 4: + status_color = Fore.LIGHTGREEN_EX + elif rand_num == 5: + status_color = Fore.LIGHTBLUE_EX + + return status_color + self.visible_prompt + Fore.RESET + + def _alerter_thread_func(self) -> None: + """ Prints alerts and updates the prompt any time the prompt is showing """ + + self._alert_count = 0 + self._next_alert_time = 0 + + while not self._stop_thread: + # Always acquire terminal_lock before printing alerts or updating the prompt + # To keep the app responsive, do not block on this call + if self.terminal_lock.acquire(blocking=False): + + # Get any alerts that need to be printed + alert_str = self._generate_alert_str() + + # Generate a new prompt + new_prompt = self._generate_colored_prompt() + + # Check if we have alerts to print + if alert_str: + # new_prompt is an optional parameter to async_alert() + self.async_alert(alert_str, new_prompt) + + # No alerts needed to be printed, check if the prompt changed + elif new_prompt != self.prompt: + self.async_update_prompt(new_prompt) + + # Don't forget to release the lock + self.terminal_lock.release() + + time.sleep(0.5) + + +if __name__ == '__main__': + app = AlerterApp() + app.set_window_title("Asynchronous Printer Test") + app.cmdloop() diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index e3992c7b..c6a90fdf 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1101,11 +1101,13 @@ def test_default_to_shell_failure(capsys): def test_ansi_prompt_not_esacped(base_app): + from cmd2.rl_utils import rl_make_safe_prompt prompt = '(Cmd) ' - assert base_app._surround_ansi_escapes(prompt) == prompt + assert rl_make_safe_prompt(prompt) == prompt def test_ansi_prompt_escaped(): + from cmd2.rl_utils import rl_make_safe_prompt app = cmd2.Cmd() color = 'cyan' prompt = 'InColor' @@ -1114,7 +1116,7 @@ def test_ansi_prompt_escaped(): readline_hack_start = "\x01" readline_hack_end = "\x02" - readline_safe_prompt = app._surround_ansi_escapes(color_prompt) + readline_safe_prompt = rl_make_safe_prompt(color_prompt) if sys.platform.startswith('win'): # colorize() does nothing on Windows due to lack of ANSI color support assert prompt == color_prompt |