summaryrefslogtreecommitdiff
path: root/cmd2/cmd2.py
diff options
context:
space:
mode:
authorkmvanbrunt <kmvanbrunt@gmail.com>2018-09-25 23:03:32 -0400
committerGitHub <noreply@github.com>2018-09-25 23:03:32 -0400
commitdcd79f34466f0fa31734bbbcfcd0fb7b44bca98d (patch)
treebbf9a315b4c36a8f3789bc07f700e11cf2194424 /cmd2/cmd2.py
parentd63f5673cfa5441cc4dddfb02c1bda89978d4f26 (diff)
parent151c67e74211465aa93e31e130fea19a4e32f320 (diff)
downloadcmd2-git-dcd79f34466f0fa31734bbbcfcd0fb7b44bca98d.tar.gz
Merge pull request #539 from python-cmd2/alert_printer
Alert printer
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r--cmd2/cmd2.py233
1 files changed, 180 insertions, 53 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 0ea3d8cd..ed478b0d 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -40,6 +40,7 @@ import platform
import re
import shlex
import sys
+import threading
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union
from . import constants
@@ -50,7 +51,7 @@ from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .parsing import StatementParser, Statement
# Set up readline
-from .rl_utils import rl_type, RlType
+from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt
if rl_type == RlType.NONE: # pragma: no cover
rl_warning = "Readline features including tab completion have been disabled since no \n" \
"supported version of readline was found. To resolve this, install \n" \
@@ -527,6 +528,11 @@ class Cmd(cmd.Cmd):
# This determines if a non-zero exit code should be used when exiting the application
self.exit_code = None
+ # This lock should be acquired before doing any asynchronous changes to the terminal to
+ # ensure the updates to the terminal don't interfere with the input being typed. It can be
+ # acquired any time there is a readline prompt on screen.
+ self.terminal_lock = threading.RLock()
+
# ----- Methods related to presenting output to the user -----
@property
@@ -1636,12 +1642,6 @@ class Cmd(cmd.Cmd):
# Re-raise a KeyboardInterrupt so other parts of the code can catch it
raise KeyboardInterrupt("Got a keyboard interrupt")
- def preloop(self) -> None:
- """Hook method executed once when the cmdloop() method is called."""
- import signal
- # Register a default SIGINT signal handler for Ctrl+C
- signal.signal(signal.SIGINT, self.sigint_handler)
-
def precmd(self, statement: Statement) -> Statement:
"""Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history.
@@ -2011,34 +2011,6 @@ class Cmd(cmd.Cmd):
# Print out a message stating this is an unknown command
self.poutput('*** Unknown syntax: {}\n'.format(arg))
- @staticmethod
- def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str:
- """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes.
-
- :param prompt: original prompt
- :param start: start code to tell GNU Readline about beginning of invisible characters
- :param end: end code to tell GNU Readline about end of invisible characters
- :return: prompt safe to pass to GNU Readline
- """
- # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline
- if sys.platform == "win32":
- return prompt
-
- escaped = False
- result = ""
-
- for c in prompt:
- if c == "\x1b" and not escaped:
- result += start + c
- escaped = True
- elif c.isalpha() and escaped:
- result += c + end
- escaped = False
- else:
- result += c
-
- return result
-
def pseudo_raw_input(self, prompt: str) -> str:
"""Began life as a copy of cmd's cmdloop; like raw_input but
@@ -2047,23 +2019,37 @@ class Cmd(cmd.Cmd):
to decide whether to print the prompt and the input
"""
- # Deal with the vagaries of readline and ANSI escape codes
- safe_prompt = self._surround_ansi_escapes(prompt)
+ # Temporarily save over self.prompt to reflect what will be on screen
+ orig_prompt = self.prompt
+ self.prompt = prompt
if self.use_rawinput:
try:
if sys.stdin.isatty():
+ # Wrap in try since terminal_lock may not be locked when this function is called from unit tests
+ try:
+ # A prompt is about to be drawn. Allow asynchronous changes to the terminal.
+ self.terminal_lock.release()
+ except RuntimeError:
+ pass
+
+ # Deal with the vagaries of readline and ANSI escape codes
+ safe_prompt = rl_make_safe_prompt(prompt)
line = input(safe_prompt)
else:
line = input()
if self.echo:
- sys.stdout.write('{}{}\n'.format(safe_prompt, line))
+ sys.stdout.write('{}{}\n'.format(self.prompt, line))
except EOFError:
line = 'eof'
+ finally:
+ if sys.stdin.isatty():
+ # The prompt is gone. Do not allow asynchronous changes to the terminal.
+ self.terminal_lock.acquire()
else:
if self.stdin.isatty():
# on a tty, print the prompt first, then read the line
- self.poutput(safe_prompt, end='')
+ self.poutput(self.prompt, end='')
self.stdout.flush()
line = self.stdin.readline()
if len(line) == 0:
@@ -2076,9 +2062,13 @@ class Cmd(cmd.Cmd):
if len(line):
# we read something, output the prompt and the something
if self.echo:
- self.poutput('{}{}'.format(safe_prompt, line))
+ self.poutput('{}{}'.format(self.prompt, line))
else:
line = 'eof'
+
+ # Restore prompt
+ self.prompt = orig_prompt
+
return line.strip()
def _cmdloop(self) -> bool:
@@ -2453,7 +2443,8 @@ class Cmd(cmd.Cmd):
for (idx, (_, text)) in enumerate(fulloptions):
self.poutput(' %2d. %s\n' % (idx + 1, text))
while True:
- response = input(prompt)
+ safe_prompt = rl_make_safe_prompt(prompt)
+ response = input(safe_prompt)
if rl_type != RlType.NONE:
hlen = readline.get_current_history_length()
@@ -3136,6 +3127,125 @@ Script should contain one command per line, just like command would be typed in
runner = unittest.TextTestRunner()
runner.run(testcase)
+ def _clear_input_lines_str(self) -> str: # pragma: no cover
+ """
+ Returns a string that if printed will clear the prompt and input lines in the terminal,
+ leaving the cursor at the beginning of the first input line
+ :return: the string to print
+ """
+ if not (vt100_support and self.use_rawinput):
+ return ''
+
+ import shutil
+ import colorama.ansi as ansi
+ from colorama import Cursor
+
+ visible_prompt = self.visible_prompt
+
+ # Get the size of the terminal
+ terminal_size = shutil.get_terminal_size()
+
+ # Figure out how many lines the prompt and user input take up
+ total_str_size = len(visible_prompt) + len(readline.get_line_buffer())
+ num_input_lines = int(total_str_size / terminal_size.columns) + 1
+
+ # Get the cursor's offset from the beginning of the first input line
+ cursor_input_offset = len(visible_prompt) + rl_get_point()
+
+ # Calculate what input line the cursor is on
+ cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1
+
+ # Create a string that will clear all input lines and print the alert
+ terminal_str = ''
+
+ # Move the cursor down to the last input line
+ if cursor_input_line != num_input_lines:
+ terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line)
+
+ # Clear each input line from the bottom up so that the cursor ends up on the original first input line
+ terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1)
+ terminal_str += ansi.clear_line()
+
+ # Move the cursor to the beginning of the first input line
+ terminal_str += '\r'
+
+ return terminal_str
+
+ def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover
+ """
+ Used to display an important message to the user while they are at the prompt in between commands.
+ To the user it appears as if an alert message is printed above the prompt and their current input
+ text and cursor location is left alone.
+
+ IMPORTANT: Do not call this unless you have acquired self.terminal_lock
+ first, which ensures a prompt is onscreen
+
+ :param alert_msg: the message to display to the user
+ :param new_prompt: if you also want to change the prompt that is displayed, then include it here
+ see async_update_prompt() docstring for guidance on updating a prompt
+ :raises RuntimeError if called while another thread holds terminal_lock
+ """
+ if not (vt100_support and self.use_rawinput):
+ return
+
+ # Sanity check that can't fail if self.terminal_lock was acquired before calling this function
+ if self.terminal_lock.acquire(blocking=False):
+
+ # Generate a string to clear the prompt and input lines and replace with the alert
+ terminal_str = self._clear_input_lines_str()
+ if alert_msg:
+ terminal_str += alert_msg + '\n'
+
+ # Set the new prompt now that _clear_input_lines_str is done using the old prompt
+ if new_prompt is not None:
+ self.prompt = new_prompt
+ rl_set_prompt(self.prompt)
+
+ # Print terminal_str to erase the lines
+ if rl_type == RlType.GNU:
+ sys.stderr.write(terminal_str)
+ elif rl_type == RlType.PYREADLINE:
+ readline.rl.mode.console.write(terminal_str)
+
+ # Redraw the prompt and input lines
+ rl_force_redisplay()
+
+ self.terminal_lock.release()
+
+ else:
+ raise RuntimeError("another thread holds terminal_lock")
+
+ def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover
+ """
+ Updates the prompt while the user is still typing at it. This is good for alerting the user to system
+ changes dynamically in between commands. For instance you could alter the color of the prompt to indicate
+ a system status or increase a counter to report an event. If you do alter the actual text of the prompt,
+ it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will
+ be shifted and the update will not be seamless.
+
+ IMPORTANT: Do not call this unless you have acquired self.terminal_lock
+ first, which ensures a prompt is onscreen
+
+ :param new_prompt: what to change the prompt to
+ """
+ self.async_alert('', new_prompt)
+
+ @staticmethod
+ def set_window_title(title: str) -> None: # pragma: no cover
+ """
+ Sets the terminal window title
+ :param title: the new window title
+ """
+ if not vt100_support:
+ return
+
+ import colorama.ansi as ansi
+ try:
+ sys.stderr.write(ansi.set_title(title))
+ except AttributeError:
+ # Debugging in Pycharm has issues with setting terminal title
+ pass
+
def cmdloop(self, intro: Optional[str]=None) -> None:
"""This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
@@ -3161,6 +3271,14 @@ Script should contain one command per line, just like command would be typed in
if callargs:
self.cmdqueue.extend(callargs)
+ # Register a SIGINT signal handler for Ctrl+C
+ import signal
+ original_sigint_handler = signal.getsignal(signal.SIGINT)
+ signal.signal(signal.SIGINT, self.sigint_handler)
+
+ # Grab terminal lock before the prompt has been drawn by readline
+ self.terminal_lock.acquire()
+
# Always run the preloop first
for func in self._preloop_hooks:
func()
@@ -3186,6 +3304,13 @@ Script should contain one command per line, just like command would be typed in
func()
self.postloop()
+ # Release terminal lock now that postloop code should have stopped any terminal updater threads
+ # This will also zero the lock count in case cmdloop() is called again
+ self.terminal_lock.release()
+
+ # Restore the original signal handler
+ signal.signal(signal.SIGINT, original_sigint_handler)
+
if self.exit_code is not None:
sys.exit(self.exit_code)
@@ -3194,7 +3319,7 @@ Script should contain one command per line, just like command would be typed in
# plugin related functions
#
###
- def _initialize_plugin_system(self):
+ def _initialize_plugin_system(self) -> None:
"""Initialize the plugin system"""
self._preloop_hooks = []
self._postloop_hooks = []
@@ -3204,7 +3329,7 @@ Script should contain one command per line, just like command would be typed in
self._cmdfinalization_hooks = []
@classmethod
- def _validate_callable_param_count(cls, func: Callable, count: int):
+ def _validate_callable_param_count(cls, func: Callable, count: int) -> None:
"""Ensure a function has the given number of parameters."""
signature = inspect.signature(func)
# validate that the callable has the right number of parameters
@@ -3217,7 +3342,7 @@ Script should contain one command per line, just like command would be typed in
))
@classmethod
- def _validate_prepostloop_callable(cls, func: Callable):
+ def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None:
"""Check parameter and return types for preloop and postloop hooks."""
cls._validate_callable_param_count(func, 0)
# make sure there is no return notation
@@ -3227,18 +3352,18 @@ Script should contain one command per line, just like command would be typed in
func.__name__,
))
- def register_preloop_hook(self, func: Callable):
+ def register_preloop_hook(self, func: Callable[[None], None]) -> None:
"""Register a function to be called at the beginning of the command loop."""
self._validate_prepostloop_callable(func)
self._preloop_hooks.append(func)
- def register_postloop_hook(self, func: Callable):
+ def register_postloop_hook(self, func: Callable[[None], None]) -> None:
"""Register a function to be called at the end of the command loop."""
self._validate_prepostloop_callable(func)
self._postloop_hooks.append(func)
@classmethod
- def _validate_postparsing_callable(cls, func: Callable):
+ def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
"""Check parameter and return types for postparsing hooks"""
cls._validate_callable_param_count(func, 1)
signature = inspect.signature(func)
@@ -3252,13 +3377,13 @@ Script should contain one command per line, just like command would be typed in
func.__name__
))
- def register_postparsing_hook(self, func: Callable):
+ def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
"""Register a function to be called after parsing user input but before running the command"""
self._validate_postparsing_callable(func)
self._postparsing_hooks.append(func)
@classmethod
- def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type):
+ def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None:
"""Check parameter and return types for pre and post command hooks."""
signature = inspect.signature(func)
# validate that the callable has the right number of parameters
@@ -3285,18 +3410,19 @@ Script should contain one command per line, just like command would be typed in
data_type,
))
- def register_precmd_hook(self, func: Callable):
+ def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None:
"""Register a hook to be called before the command function."""
self._validate_prepostcmd_hook(func, plugin.PrecommandData)
self._precmd_hooks.append(func)
- def register_postcmd_hook(self, func: Callable):
+ def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None:
"""Register a hook to be called after the command function."""
self._validate_prepostcmd_hook(func, plugin.PostcommandData)
self._postcmd_hooks.append(func)
@classmethod
- def _validate_cmdfinalization_callable(cls, func: Callable):
+ def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData],
+ plugin.CommandFinalizationData]) -> None:
"""Check parameter and return types for command finalization hooks."""
cls._validate_callable_param_count(func, 1)
signature = inspect.signature(func)
@@ -3308,7 +3434,8 @@ Script should contain one command per line, just like command would be typed in
raise TypeError("{} must declare return a return type of "
"'cmd2.plugin.CommandFinalizationData'".format(func.__name__))
- def register_cmdfinalization_hook(self, func: Callable):
+ def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData],
+ plugin.CommandFinalizationData]) -> None:
"""Register a hook to be called after a command is completed, whether it completes successfully or not."""
self._validate_cmdfinalization_callable(func)
self._cmdfinalization_hooks.append(func)