diff options
author | kmvanbrunt <kmvanbrunt@gmail.com> | 2018-09-25 23:03:32 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-25 23:03:32 -0400 |
commit | dcd79f34466f0fa31734bbbcfcd0fb7b44bca98d (patch) | |
tree | bbf9a315b4c36a8f3789bc07f700e11cf2194424 | |
parent | d63f5673cfa5441cc4dddfb02c1bda89978d4f26 (diff) | |
parent | 151c67e74211465aa93e31e130fea19a4e32f320 (diff) | |
download | cmd2-git-dcd79f34466f0fa31734bbbcfcd0fb7b44bca98d.tar.gz |
Merge pull request #539 from python-cmd2/alert_printer
Alert printer
-rw-r--r-- | CHANGELOG.md | 4 | ||||
-rwxr-xr-x | README.md | 1 | ||||
-rw-r--r-- | cmd2/cmd2.py | 233 | ||||
-rw-r--r-- | cmd2/rl_utils.py | 116 | ||||
-rw-r--r-- | docs/unfreefeatures.rst | 30 | ||||
-rwxr-xr-x | examples/async_printing.py | 203 | ||||
-rw-r--r-- | tests/test_cmd2.py | 6 |
7 files changed, 534 insertions, 59 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index e48c1d8d..f5dff203 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ for formatting help/description text * Aliases are now sorted alphabetically * The **set** command now tab-completes settable parameter names + * Added ``async_alert``, ``async_update_prompt``, and ``set_window_title`` functions + * These allow you to provide feedback to the user in an asychronous fashion, meaning alerts can + display when the user is still entering text at the prompt. See [async_printing.py](https://github.com/python-cmd2/cmd2/blob/master/examples/async_printing.py) + for an example. * Deletions * The ``preparse``, ``postparsing_precmd``, and ``postparsing_postcmd`` methods *deprecated* in the previous release have been deleted @@ -40,6 +40,7 @@ Main Features - Trivial to provide built-in help for all commands - Built-in regression testing framework for your applications (transcript-based testing) - Transcripts for use with built-in regression can be automatically generated from `history -t` +- Alerts that seamlessly print while user enters text at prompt Python 2.7 support is EOL ------------------------- diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 0ea3d8cd..ed478b0d 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -40,6 +40,7 @@ import platform import re import shlex import sys +import threading from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union from . import constants @@ -50,7 +51,7 @@ from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement # Set up readline -from .rl_utils import rl_type, RlType +from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt if rl_type == RlType.NONE: # pragma: no cover rl_warning = "Readline features including tab completion have been disabled since no \n" \ "supported version of readline was found. To resolve this, install \n" \ @@ -527,6 +528,11 @@ class Cmd(cmd.Cmd): # This determines if a non-zero exit code should be used when exiting the application self.exit_code = None + # This lock should be acquired before doing any asynchronous changes to the terminal to + # ensure the updates to the terminal don't interfere with the input being typed. It can be + # acquired any time there is a readline prompt on screen. + self.terminal_lock = threading.RLock() + # ----- Methods related to presenting output to the user ----- @property @@ -1636,12 +1642,6 @@ class Cmd(cmd.Cmd): # Re-raise a KeyboardInterrupt so other parts of the code can catch it raise KeyboardInterrupt("Got a keyboard interrupt") - def preloop(self) -> None: - """Hook method executed once when the cmdloop() method is called.""" - import signal - # Register a default SIGINT signal handler for Ctrl+C - signal.signal(signal.SIGINT, self.sigint_handler) - def precmd(self, statement: Statement) -> Statement: """Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history. @@ -2011,34 +2011,6 @@ class Cmd(cmd.Cmd): # Print out a message stating this is an unknown command self.poutput('*** Unknown syntax: {}\n'.format(arg)) - @staticmethod - def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str: - """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. - - :param prompt: original prompt - :param start: start code to tell GNU Readline about beginning of invisible characters - :param end: end code to tell GNU Readline about end of invisible characters - :return: prompt safe to pass to GNU Readline - """ - # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline - if sys.platform == "win32": - return prompt - - escaped = False - result = "" - - for c in prompt: - if c == "\x1b" and not escaped: - result += start + c - escaped = True - elif c.isalpha() and escaped: - result += c + end - escaped = False - else: - result += c - - return result - def pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2047,23 +2019,37 @@ class Cmd(cmd.Cmd): to decide whether to print the prompt and the input """ - # Deal with the vagaries of readline and ANSI escape codes - safe_prompt = self._surround_ansi_escapes(prompt) + # Temporarily save over self.prompt to reflect what will be on screen + orig_prompt = self.prompt + self.prompt = prompt if self.use_rawinput: try: if sys.stdin.isatty(): + # Wrap in try since terminal_lock may not be locked when this function is called from unit tests + try: + # A prompt is about to be drawn. Allow asynchronous changes to the terminal. + self.terminal_lock.release() + except RuntimeError: + pass + + # Deal with the vagaries of readline and ANSI escape codes + safe_prompt = rl_make_safe_prompt(prompt) line = input(safe_prompt) else: line = input() if self.echo: - sys.stdout.write('{}{}\n'.format(safe_prompt, line)) + sys.stdout.write('{}{}\n'.format(self.prompt, line)) except EOFError: line = 'eof' + finally: + if sys.stdin.isatty(): + # The prompt is gone. Do not allow asynchronous changes to the terminal. + self.terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line - self.poutput(safe_prompt, end='') + self.poutput(self.prompt, end='') self.stdout.flush() line = self.stdin.readline() if len(line) == 0: @@ -2076,9 +2062,13 @@ class Cmd(cmd.Cmd): if len(line): # we read something, output the prompt and the something if self.echo: - self.poutput('{}{}'.format(safe_prompt, line)) + self.poutput('{}{}'.format(self.prompt, line)) else: line = 'eof' + + # Restore prompt + self.prompt = orig_prompt + return line.strip() def _cmdloop(self) -> bool: @@ -2453,7 +2443,8 @@ class Cmd(cmd.Cmd): for (idx, (_, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx + 1, text)) while True: - response = input(prompt) + safe_prompt = rl_make_safe_prompt(prompt) + response = input(safe_prompt) if rl_type != RlType.NONE: hlen = readline.get_current_history_length() @@ -3136,6 +3127,125 @@ Script should contain one command per line, just like command would be typed in runner = unittest.TextTestRunner() runner.run(testcase) + def _clear_input_lines_str(self) -> str: # pragma: no cover + """ + Returns a string that if printed will clear the prompt and input lines in the terminal, + leaving the cursor at the beginning of the first input line + :return: the string to print + """ + if not (vt100_support and self.use_rawinput): + return '' + + import shutil + import colorama.ansi as ansi + from colorama import Cursor + + visible_prompt = self.visible_prompt + + # Get the size of the terminal + terminal_size = shutil.get_terminal_size() + + # Figure out how many lines the prompt and user input take up + total_str_size = len(visible_prompt) + len(readline.get_line_buffer()) + num_input_lines = int(total_str_size / terminal_size.columns) + 1 + + # Get the cursor's offset from the beginning of the first input line + cursor_input_offset = len(visible_prompt) + rl_get_point() + + # Calculate what input line the cursor is on + cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1 + + # Create a string that will clear all input lines and print the alert + terminal_str = '' + + # Move the cursor down to the last input line + if cursor_input_line != num_input_lines: + terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line) + + # Clear each input line from the bottom up so that the cursor ends up on the original first input line + terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1) + terminal_str += ansi.clear_line() + + # Move the cursor to the beginning of the first input line + terminal_str += '\r' + + return terminal_str + + def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover + """ + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param alert_msg: the message to display to the user + :param new_prompt: if you also want to change the prompt that is displayed, then include it here + see async_update_prompt() docstring for guidance on updating a prompt + :raises RuntimeError if called while another thread holds terminal_lock + """ + if not (vt100_support and self.use_rawinput): + return + + # Sanity check that can't fail if self.terminal_lock was acquired before calling this function + if self.terminal_lock.acquire(blocking=False): + + # Generate a string to clear the prompt and input lines and replace with the alert + terminal_str = self._clear_input_lines_str() + if alert_msg: + terminal_str += alert_msg + '\n' + + # Set the new prompt now that _clear_input_lines_str is done using the old prompt + if new_prompt is not None: + self.prompt = new_prompt + rl_set_prompt(self.prompt) + + # Print terminal_str to erase the lines + if rl_type == RlType.GNU: + sys.stderr.write(terminal_str) + elif rl_type == RlType.PYREADLINE: + readline.rl.mode.console.write(terminal_str) + + # Redraw the prompt and input lines + rl_force_redisplay() + + self.terminal_lock.release() + + else: + raise RuntimeError("another thread holds terminal_lock") + + def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover + """ + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. If you do alter the actual text of the prompt, + it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will + be shifted and the update will not be seamless. + + IMPORTANT: Do not call this unless you have acquired self.terminal_lock + first, which ensures a prompt is onscreen + + :param new_prompt: what to change the prompt to + """ + self.async_alert('', new_prompt) + + @staticmethod + def set_window_title(title: str) -> None: # pragma: no cover + """ + Sets the terminal window title + :param title: the new window title + """ + if not vt100_support: + return + + import colorama.ansi as ansi + try: + sys.stderr.write(ansi.set_title(title)) + except AttributeError: + # Debugging in Pycharm has issues with setting terminal title + pass + def cmdloop(self, intro: Optional[str]=None) -> None: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -3161,6 +3271,14 @@ Script should contain one command per line, just like command would be typed in if callargs: self.cmdqueue.extend(callargs) + # Register a SIGINT signal handler for Ctrl+C + import signal + original_sigint_handler = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self.sigint_handler) + + # Grab terminal lock before the prompt has been drawn by readline + self.terminal_lock.acquire() + # Always run the preloop first for func in self._preloop_hooks: func() @@ -3186,6 +3304,13 @@ Script should contain one command per line, just like command would be typed in func() self.postloop() + # Release terminal lock now that postloop code should have stopped any terminal updater threads + # This will also zero the lock count in case cmdloop() is called again + self.terminal_lock.release() + + # Restore the original signal handler + signal.signal(signal.SIGINT, original_sigint_handler) + if self.exit_code is not None: sys.exit(self.exit_code) @@ -3194,7 +3319,7 @@ Script should contain one command per line, just like command would be typed in # plugin related functions # ### - def _initialize_plugin_system(self): + def _initialize_plugin_system(self) -> None: """Initialize the plugin system""" self._preloop_hooks = [] self._postloop_hooks = [] @@ -3204,7 +3329,7 @@ Script should contain one command per line, just like command would be typed in self._cmdfinalization_hooks = [] @classmethod - def _validate_callable_param_count(cls, func: Callable, count: int): + def _validate_callable_param_count(cls, func: Callable, count: int) -> None: """Ensure a function has the given number of parameters.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3217,7 +3342,7 @@ Script should contain one command per line, just like command would be typed in )) @classmethod - def _validate_prepostloop_callable(cls, func: Callable): + def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: """Check parameter and return types for preloop and postloop hooks.""" cls._validate_callable_param_count(func, 0) # make sure there is no return notation @@ -3227,18 +3352,18 @@ Script should contain one command per line, just like command would be typed in func.__name__, )) - def register_preloop_hook(self, func: Callable): + def register_preloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the beginning of the command loop.""" self._validate_prepostloop_callable(func) self._preloop_hooks.append(func) - def register_postloop_hook(self, func: Callable): + def register_postloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the end of the command loop.""" self._validate_prepostloop_callable(func) self._postloop_hooks.append(func) @classmethod - def _validate_postparsing_callable(cls, func: Callable): + def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Check parameter and return types for postparsing hooks""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3252,13 +3377,13 @@ Script should contain one command per line, just like command would be typed in func.__name__ )) - def register_postparsing_hook(self, func: Callable): + def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Register a function to be called after parsing user input but before running the command""" self._validate_postparsing_callable(func) self._postparsing_hooks.append(func) @classmethod - def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None: """Check parameter and return types for pre and post command hooks.""" signature = inspect.signature(func) # validate that the callable has the right number of parameters @@ -3285,18 +3410,19 @@ Script should contain one command per line, just like command would be typed in data_type, )) - def register_precmd_hook(self, func: Callable): + def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: """Register a hook to be called before the command function.""" self._validate_prepostcmd_hook(func, plugin.PrecommandData) self._precmd_hooks.append(func) - def register_postcmd_hook(self, func: Callable): + def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None: """Register a hook to be called after the command function.""" self._validate_prepostcmd_hook(func, plugin.PostcommandData) self._postcmd_hooks.append(func) @classmethod - def _validate_cmdfinalization_callable(cls, func: Callable): + def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Check parameter and return types for command finalization hooks.""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) @@ -3308,7 +3434,8 @@ Script should contain one command per line, just like command would be typed in raise TypeError("{} must declare return a return type of " "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) - def register_cmdfinalization_hook(self, func: Callable): + def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData], + plugin.CommandFinalizationData]) -> None: """Register a hook to be called after a command is completed, whether it completes successfully or not.""" self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) diff --git a/cmd2/rl_utils.py b/cmd2/rl_utils.py index 7e49ea47..0819232d 100644 --- a/cmd2/rl_utils.py +++ b/cmd2/rl_utils.py @@ -26,13 +26,54 @@ class RlType(Enum): # Check what implementation of readline we are using - rl_type = RlType.NONE +# Tells if the terminal we are running in supports vt100 control characters +vt100_support = False + # The order of this check matters since importing pyreadline will also show readline in the modules list if 'pyreadline' in sys.modules: rl_type = RlType.PYREADLINE + from ctypes import byref + from ctypes.wintypes import DWORD, HANDLE + import atexit + + # Check if we are running in a terminal + if sys.stdout.isatty(): # pragma: no cover + # noinspection PyPep8Naming + def enable_win_vt100(handle: HANDLE) -> bool: + """ + Enables VT100 character sequences in a Windows console + This only works on Windows 10 and up + :param handle: the handle on which to enable vt100 + :return: True if vt100 characters are enabled for the handle + """ + ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004 + + # Get the current mode for this handle in the console + cur_mode = DWORD(0) + readline.rl.console.GetConsoleMode(handle, byref(cur_mode)) + + retVal = False + + # Check if ENABLE_VIRTUAL_TERMINAL_PROCESSING is already enabled + if (cur_mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) != 0: + retVal = True + + elif readline.rl.console.SetConsoleMode(handle, cur_mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING): + # Restore the original mode when we exit + atexit.register(readline.rl.console.SetConsoleMode, handle, cur_mode) + retVal = True + + return retVal + + # Enable VT100 sequences for stdout and stderr + STD_OUT_HANDLE = -11 + STD_ERROR_HANDLE = -12 + vt100_support = (enable_win_vt100(readline.rl.console.GetStdHandle(STD_OUT_HANDLE)) and + enable_win_vt100(readline.rl.console.GetStdHandle(STD_ERROR_HANDLE))) + ############################################################################################################ # pyreadline is incomplete in terms of the Python readline API. Add the missing functions we need. ############################################################################################################ @@ -74,9 +115,13 @@ elif 'gnureadline' in sys.modules or 'readline' in sys.modules: import ctypes readline_lib = ctypes.CDLL(readline.__file__) + # Check if we are running in a terminal + if sys.stdout.isatty(): + vt100_support = True + # noinspection PyProtectedMember -def rl_force_redisplay() -> None: +def rl_force_redisplay() -> None: # pragma: no cover """ Causes readline to display the prompt and input text wherever the cursor is and start reading input from this location. This is the proper way to restore the input line after @@ -85,14 +130,77 @@ def rl_force_redisplay() -> None: if not sys.stdout.isatty(): return - if rl_type == RlType.GNU: # pragma: no cover + if rl_type == RlType.GNU: readline_lib.rl_forced_update_display() # After manually updating the display, readline asks that rl_display_fixed be set to 1 for efficiency display_fixed = ctypes.c_int.in_dll(readline_lib, "rl_display_fixed") display_fixed.value = 1 - elif rl_type == RlType.PYREADLINE: # pragma: no cover + elif rl_type == RlType.PYREADLINE: # Call _print_prompt() first to set the new location of the prompt readline.rl.mode._print_prompt() readline.rl.mode._update_line() + + +# noinspection PyProtectedMember +def rl_get_point() -> int: # pragma: no cover + """ + Returns the offset of the current cursor position in rl_line_buffer + """ + if rl_type == RlType.GNU: + return ctypes.c_int.in_dll(readline_lib, "rl_point").value + + elif rl_type == RlType.PYREADLINE: + return readline.rl.mode.l_buffer.point + + else: + return 0 + + +# noinspection PyProtectedMember +def rl_set_prompt(prompt: str) -> None: # pragma: no cover + """ + Sets readline's prompt + :param prompt: the new prompt value + """ + safe_prompt = rl_make_safe_prompt(prompt) + + if rl_type == RlType.GNU: + encoded_prompt = bytes(safe_prompt, encoding='utf-8') + readline_lib.rl_set_prompt(encoded_prompt) + + elif rl_type == RlType.PYREADLINE: + readline.rl._set_prompt(safe_prompt) + + +def rl_make_safe_prompt(prompt: str) -> str: # pragma: no cover + """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes. + + :param prompt: original prompt + :return: prompt safe to pass to GNU Readline + """ + if rl_type == RlType.GNU: + # start code to tell GNU Readline about beginning of invisible characters + start = "\x01" + + # end code to tell GNU Readline about end of invisible characters + end = "\x02" + + escaped = False + result = "" + + for c in prompt: + if c == "\x1b" and not escaped: + result += start + c + escaped = True + elif c.isalpha() and escaped: + result += c + end + escaped = False + else: + result += c + + return result + + else: + return prompt diff --git a/docs/unfreefeatures.rst b/docs/unfreefeatures.rst index cd27745d..b5f9415d 100644 --- a/docs/unfreefeatures.rst +++ b/docs/unfreefeatures.rst @@ -189,3 +189,33 @@ Exit code to shell The ``self.exit_code`` attribute of your ``cmd2`` application controls what exit code is sent to the shell when your application exits from ``cmdloop()``. + + +Asynchronous Feedback +===================== +``cmd2`` provides two functions to provide asynchronous feedback to the user without interfering with +the command line. This means the feedback is provided to the user when they are still entering text at +the prompt. To use this functionality, the application must be running in a terminal that supports +VT100 control characters and readline. Linux, Mac, and Windows 10 and greater all support these. + +async_alert() + Used to display an important message to the user while they are at the prompt in between commands. + To the user it appears as if an alert message is printed above the prompt and their current input + text and cursor location is left alone. + +async_update_prompt() + Updates the prompt while the user is still typing at it. This is good for alerting the user to system + changes dynamically in between commands. For instance you could alter the color of the prompt to indicate + a system status or increase a counter to report an event. + +``cmd2`` also provides a function to change the title of the terminal window. This feature requires the +application be running in a terminal that supports VT100 control characters. Linux, Mac, and Windows 10 and +greater all support these. + +set_window_title() + Sets the terminal window title + + +The easiest way to understand these functions is to see the AsyncPrinting_ example for a demonstration. + +.. _AsyncPrinting: https://github.com/python-cmd2/cmd2/blob/master/examples/async_printing.py diff --git a/examples/async_printing.py b/examples/async_printing.py new file mode 100755 index 00000000..a4165ae8 --- /dev/null +++ b/examples/async_printing.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python +# coding=utf-8 +""" +A simple example demonstrating an application that asynchronously prints alerts, updates the prompt +and changes the window title +""" + +import random +import threading +import time +from typing import List + +from colorama import Fore + +import cmd2 + +ALERTS = ["Watch as this application prints alerts and updates the prompt", + "This will only happen when the prompt is present", + "Notice how it doesn't interfere with your typing or cursor location", + "Go ahead and type some stuff and move the cursor throughout the line", + "Keep typing...", + "Move that cursor...", + "Pretty seamless, eh?", + "Feedback can also be given in the window title. Notice the arg count up there?", + "You can stop and start the alerts by typing stop_alerts and start_alerts", + "This demo will now continue to print alerts at random intervals" + ] + + +class AlerterApp(cmd2.Cmd): + """ An app that shows off async_alert() and async_update_prompt() """ + + def __init__(self, *args, **kwargs) -> None: + """ Initializer """ + + super().__init__(*args, **kwargs) + + self.prompt = "(APR)> " + + # The thread that will asynchronously alert the user of events + self._stop_thread = False + self._alerter_thread = threading.Thread() + self._alert_count = 0 + self._next_alert_time = 0 + + # Create some hooks to handle the starting and stopping of our thread + self.register_preloop_hook(self._preloop_hook) + self.register_postloop_hook(self._postloop_hook) + + def _preloop_hook(self) -> None: + """ Start the alerter thread """ + # This runs after cmdloop() acquires self.terminal_lock, which will be locked until the prompt appears. + # Therefore this is the best place to start the alerter thread since there is no risk of it alerting + # before the prompt is displayed. You can also start it via a command if its not something that should + # be running during the entire application. See do_start_alerts(). + self._stop_thread = False + + self._alerter_thread = threading.Thread(name='alerter', target=self._alerter_thread_func) + self._alerter_thread.start() + + def _postloop_hook(self) -> None: + """ Stops the alerter thread """ + + # After this function returns, cmdloop() releases self.terminal_lock which could make the alerter + # thread think the prompt is on screen. Therefore this is the best place to stop the alerter thread. + # You can also stop it via a command. See do_stop_alerts(). + self._stop_thread = True + if self._alerter_thread.is_alive(): + self._alerter_thread.join() + + def do_start_alerts(self, _): + """ Starts the alerter thread """ + if self._alerter_thread.is_alive(): + print("The alert thread is already started") + else: + self._stop_thread = False + self._alerter_thread = threading.Thread(name='alerter', target=self._alerter_thread_func) + self._alerter_thread.start() + + def do_stop_alerts(self, _): + """ Stops the alerter thread """ + self._stop_thread = True + if self._alerter_thread.is_alive(): + self._alerter_thread.join() + else: + print("The alert thread is already stopped") + + def _get_alerts(self) -> List[str]: + """ + Reports alerts + :return: the list of alerts + """ + global ALERTS + + cur_time = time.monotonic() + if cur_time < self._next_alert_time: + return [] + + alerts = [] + + if self._alert_count < len(ALERTS): + alerts.append(ALERTS[self._alert_count]) + self._alert_count += 1 + self._next_alert_time = cur_time + 4 + + else: + rand_num = random.randint(1, 20) + if rand_num > 2: + return [] + + for i in range(0, rand_num): + self._alert_count += 1 + alerts.append("Alert {}".format(self._alert_count)) + + self._next_alert_time = 0 + + return alerts + + def _generate_alert_str(self) -> str: + """ + Combines alerts into one string that can be printed to the terminal + :return: the alert string + """ + global ALERTS + + alert_str = '' + alerts = self._get_alerts() + + longest_alert = max(ALERTS, key=len) + num_astericks = len(longest_alert) + 8 + + for i, cur_alert in enumerate(alerts): + # Use padding to center the alert + padding = ' ' * int((num_astericks - len(cur_alert)) / 2) + + if i > 0: + alert_str += '\n' + alert_str += '*' * num_astericks + '\n' + alert_str += padding + cur_alert + padding + '\n' + alert_str += '*' * num_astericks + '\n' + + return alert_str + + def _generate_colored_prompt(self) -> str: + """ + Randomly generates a colored the prompt + :return: the new prompt + """ + rand_num = random.randint(1, 20) + + status_color = Fore.RESET + + if rand_num == 1: + status_color = Fore.LIGHTRED_EX + elif rand_num == 2: + status_color = Fore.LIGHTYELLOW_EX + elif rand_num == 3: + status_color = Fore.CYAN + elif rand_num == 4: + status_color = Fore.LIGHTGREEN_EX + elif rand_num == 5: + status_color = Fore.LIGHTBLUE_EX + + return status_color + self.visible_prompt + Fore.RESET + + def _alerter_thread_func(self) -> None: + """ Prints alerts and updates the prompt any time the prompt is showing """ + + self._alert_count = 0 + self._next_alert_time = 0 + + while not self._stop_thread: + # Always acquire terminal_lock before printing alerts or updating the prompt + # To keep the app responsive, do not block on this call + if self.terminal_lock.acquire(blocking=False): + + # Get any alerts that need to be printed + alert_str = self._generate_alert_str() + + # Generate a new prompt + new_prompt = self._generate_colored_prompt() + + # Check if we have alerts to print + if alert_str: + # new_prompt is an optional parameter to async_alert() + self.async_alert(alert_str, new_prompt) + new_title = "Alerts Printed: {}".format(self._alert_count) + self.set_window_title(new_title) + + # No alerts needed to be printed, check if the prompt changed + elif new_prompt != self.prompt: + self.async_update_prompt(new_prompt) + + # Don't forget to release the lock + self.terminal_lock.release() + + time.sleep(0.5) + + +if __name__ == '__main__': + app = AlerterApp() + app.set_window_title("Asynchronous Printer Test") + app.cmdloop() diff --git a/tests/test_cmd2.py b/tests/test_cmd2.py index e3992c7b..c6a90fdf 100644 --- a/tests/test_cmd2.py +++ b/tests/test_cmd2.py @@ -1101,11 +1101,13 @@ def test_default_to_shell_failure(capsys): def test_ansi_prompt_not_esacped(base_app): + from cmd2.rl_utils import rl_make_safe_prompt prompt = '(Cmd) ' - assert base_app._surround_ansi_escapes(prompt) == prompt + assert rl_make_safe_prompt(prompt) == prompt def test_ansi_prompt_escaped(): + from cmd2.rl_utils import rl_make_safe_prompt app = cmd2.Cmd() color = 'cyan' prompt = 'InColor' @@ -1114,7 +1116,7 @@ def test_ansi_prompt_escaped(): readline_hack_start = "\x01" readline_hack_end = "\x02" - readline_safe_prompt = app._surround_ansi_escapes(color_prompt) + readline_safe_prompt = rl_make_safe_prompt(color_prompt) if sys.platform.startswith('win'): # colorize() does nothing on Windows due to lack of ANSI color support assert prompt == color_prompt |