summaryrefslogtreecommitdiff
path: root/cmd2/cmd2.py
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r--cmd2/cmd2.py1562
1 files changed, 971 insertions, 591 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index fb929078..dec0a04d 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -32,25 +32,26 @@ Git repository on GitHub at https://github.com/python-cmd2/cmd2
import argparse
import cmd
import collections
+import colorama
from colorama import Fore
import glob
import inspect
import os
-import platform
import re
import shlex
import sys
-from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union
+import threading
+from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union, IO
from . import constants
from . import utils
from . import plugin
-from .argparse_completer import AutoCompleter, ACArgumentParser
+from .argparse_completer import AutoCompleter, ACArgumentParser, ACTION_ARG_CHOICES
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
-from .parsing import StatementParser, Statement
+from .parsing import StatementParser, Statement, Macro, MacroArg
# Set up readline
-from .rl_utils import rl_type, RlType
+from .rl_utils import rl_type, RlType, rl_get_point, rl_set_prompt, vt100_support, rl_make_safe_prompt
if rl_type == RlType.NONE: # pragma: no cover
rl_warning = "Readline features including tab completion have been disabled since no \n" \
"supported version of readline was found. To resolve this, install \n" \
@@ -104,9 +105,9 @@ except ImportError:
# Python 3.4 require contextlib2 for temporarily redirecting stderr and stdout
if sys.version_info < (3, 5):
- from contextlib2 import redirect_stdout, redirect_stderr
+ from contextlib2 import redirect_stdout
else:
- from contextlib import redirect_stdout, redirect_stderr
+ from contextlib import redirect_stdout
# Detect whether IPython is installed to determine if the built-in "ipy" command should be included
ipython_available = True
@@ -121,6 +122,13 @@ except ImportError: # pragma: no cover
HELP_CATEGORY = 'help_category'
HELP_SUMMARY = 'help_summary'
+INTERNAL_COMMAND_EPILOG = ("Notes:\n"
+ " This command is for internal use and is not intended to be called from the\n"
+ " command line.")
+
+# All command functions start with this
+COMMAND_PREFIX = 'do_'
+
def categorize(func: Union[Callable, Iterable], category: str) -> None:
"""Categorize a function.
@@ -137,19 +145,21 @@ def categorize(func: Union[Callable, Iterable], category: str) -> None:
setattr(func, HELP_CATEGORY, category)
-def parse_quoted_string(cmdline: str) -> List[str]:
- """Parse a quoted string into a list of arguments."""
- if isinstance(cmdline, list):
+def parse_quoted_string(string: str, preserve_quotes: bool) -> List[str]:
+ """
+ Parse a quoted string into a list of arguments
+ :param string: the string being parsed
+ :param preserve_quotes: if True, then quotes will not be stripped
+ """
+ if isinstance(string, list):
# arguments are already a list, return the list we were passed
- lexed_arglist = cmdline
+ lexed_arglist = string
else:
# Use shlex to split the command line into a list of arguments based on shell rules
- lexed_arglist = shlex.split(cmdline, posix=False)
- # strip off outer quotes for convenience
- temp_arglist = []
- for arg in lexed_arglist:
- temp_arglist.append(utils.strip_quotes(arg))
- lexed_arglist = temp_arglist
+ lexed_arglist = shlex.split(string, posix=False)
+
+ if not preserve_quotes:
+ lexed_arglist = [utils.strip_quotes(arg) for arg in lexed_arglist]
return lexed_arglist
@@ -161,7 +171,7 @@ def with_category(category: str) -> Callable:
return cat_decorator
-def with_argument_list(func: Callable) -> Callable:
+def with_argument_list(func: Callable, preserve_quotes: bool=False) -> Callable:
"""A decorator to alter the arguments passed to a do_* cmd2
method. Default passes a string of whatever the user typed.
With this decorator, the decorated method will receive a list
@@ -170,18 +180,19 @@ def with_argument_list(func: Callable) -> Callable:
@functools.wraps(func)
def cmd_wrapper(self, cmdline):
- lexed_arglist = parse_quoted_string(cmdline)
+ lexed_arglist = parse_quoted_string(cmdline, preserve_quotes)
return func(self, lexed_arglist)
cmd_wrapper.__doc__ = func.__doc__
return cmd_wrapper
-def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Callable:
+def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser, preserve_quotes: bool=False) -> Callable:
"""A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments with the given
instance of argparse.ArgumentParser, but also returning unknown args as a list.
- :param argparser: argparse.ArgumentParser - given instance of ArgumentParser
+ :param argparser: given instance of ArgumentParser
+ :param preserve_quotes: if True, then the arguments passed to arparse be maintain their quotes
:return: function that gets passed parsed args and a list of unknown args
"""
import functools
@@ -190,7 +201,7 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla
def arg_decorator(func: Callable):
@functools.wraps(func)
def cmd_wrapper(instance, cmdline):
- lexed_arglist = parse_quoted_string(cmdline)
+ lexed_arglist = parse_quoted_string(cmdline, preserve_quotes)
try:
args, unknown = argparser.parse_known_args(lexed_arglist)
except SystemExit:
@@ -219,11 +230,12 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser) -> Calla
return arg_decorator
-def with_argparser(argparser: argparse.ArgumentParser) -> Callable:
+def with_argparser(argparser: argparse.ArgumentParser, preserve_quotes: bool=False) -> Callable:
"""A decorator to alter a cmd2 method to populate its ``args`` argument by parsing arguments
with the given instance of argparse.ArgumentParser.
- :param argparser: argparse.ArgumentParser - given instance of ArgumentParser
+ :param argparser: given instance of ArgumentParser
+ :param preserve_quotes: if True, then the arguments passed to arparse be maintain their quotes
:return: function that gets passed parsed args
"""
import functools
@@ -232,7 +244,7 @@ def with_argparser(argparser: argparse.ArgumentParser) -> Callable:
def arg_decorator(func: Callable):
@functools.wraps(func)
def cmd_wrapper(instance, cmdline):
- lexed_arglist = parse_quoted_string(cmdline)
+ lexed_arglist = parse_quoted_string(cmdline, preserve_quotes)
try:
args = argparser.parse_args(lexed_arglist)
except SystemExit:
@@ -306,7 +318,6 @@ class Cmd(cmd.Cmd):
# Attributes used to configure the StatementParser, best not to change these at runtime
multiline_commands = []
shortcuts = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'}
- aliases = dict()
terminators = [';']
# Attributes which are NOT dynamically settable at runtime
@@ -317,7 +328,7 @@ class Cmd(cmd.Cmd):
reserved_words = []
# Attributes which ARE dynamically settable at runtime
- colors = (platform.system() != 'Windows')
+ colors = constants.COLORS_TERMINAL
continuation_prompt = '> '
debug = False
echo = False
@@ -337,7 +348,7 @@ class Cmd(cmd.Cmd):
# To make an attribute settable with the "do_set" command, add it to this ...
# This starts out as a dictionary but gets converted to an OrderedDict sorted alphabetically by key
- settable = {'colors': 'Colorized output (*nix only)',
+ settable = {'colors': 'Allow colorized output (valid values: Terminal, Always, Never)',
'continuation_prompt': 'On 2nd+ line of input',
'debug': 'Show full error stack on error',
'echo': 'Echo command issued into output',
@@ -369,6 +380,9 @@ class Cmd(cmd.Cmd):
except AttributeError:
pass
+ # Override whether ansi codes should be stripped from the output since cmd2 has its own logic for doing this
+ colorama.init(strip=False)
+
# initialize plugin system
# needs to be done before we call __init__(0)
self._initialize_plugin_system()
@@ -376,12 +390,20 @@ class Cmd(cmd.Cmd):
# Call super class constructor
super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
+ # Get rid of cmd's complete_help() functions so AutoCompleter will complete our help command
+ if getattr(cmd.Cmd, 'complete_help', None) is not None:
+ delattr(cmd.Cmd, 'complete_help')
+
# Commands to exclude from the help menu and tab completion
self.hidden_commands = ['eof', 'eos', '_relative_load']
# Commands to exclude from the history command
self.exclude_from_history = '''history edit eof eos'''.split()
+ # Command aliases and macros
+ self.aliases = dict()
+ self.macros = dict()
+
self._finalize_app_parameters()
self.initial_stdout = sys.stdout
@@ -389,7 +411,7 @@ class Cmd(cmd.Cmd):
self.pystate = {}
self.py_history = []
self.pyscript_name = 'app'
- self.keywords = self.reserved_words + [fname[3:] for fname in dir(self) if fname.startswith('do_')]
+ self.keywords = self.reserved_words + self.get_all_commands()
self.statement_parser = StatementParser(
allow_redirection=self.allow_redirection,
terminators=self.terminators,
@@ -417,13 +439,13 @@ class Cmd(cmd.Cmd):
self._STOP_AND_EXIT = True # cmd convention
self._colorcodes = {'bold': {True: '\x1b[1m', False: '\x1b[22m'},
- 'cyan': {True: '\x1b[36m', False: '\x1b[39m'},
- 'blue': {True: '\x1b[34m', False: '\x1b[39m'},
- 'red': {True: '\x1b[31m', False: '\x1b[39m'},
- 'magenta': {True: '\x1b[35m', False: '\x1b[39m'},
- 'green': {True: '\x1b[32m', False: '\x1b[39m'},
- 'underline': {True: '\x1b[4m', False: '\x1b[24m'},
- 'yellow': {True: '\x1b[33m', False: '\x1b[39m'}}
+ 'cyan': {True: Fore.CYAN, False: Fore.RESET},
+ 'blue': {True: Fore.BLUE, False: Fore.RESET},
+ 'red': {True: Fore.RED, False: Fore.RESET},
+ 'magenta': {True: Fore.MAGENTA, False: Fore.RESET},
+ 'green': {True: Fore.GREEN, False: Fore.RESET},
+ 'underline': {True: '\x1b[4m', False: Fore.RESET},
+ 'yellow': {True: Fore.YELLOW, False: Fore.RESET}}
# Used load command to store the current script dir as a LIFO queue to support _relative_load command
self._script_dir = []
@@ -441,6 +463,7 @@ class Cmd(cmd.Cmd):
self.broken_pipe_warning = ''
# Check if history should persist
+ self.persistent_history_file = ''
if persistent_history_file and rl_type != RlType.NONE:
persistent_history_file = os.path.expanduser(persistent_history_file)
read_err = False
@@ -526,6 +549,11 @@ class Cmd(cmd.Cmd):
# This determines if a non-zero exit code should be used when exiting the application
self.exit_code = None
+ # This lock should be acquired before doing any asynchronous changes to the terminal to
+ # ensure the updates to the terminal don't interfere with the input being typed. It can be
+ # acquired any time there is a readline prompt on screen.
+ self.terminal_lock = threading.RLock()
+
# ----- Methods related to presenting output to the user -----
@property
@@ -547,34 +575,53 @@ class Cmd(cmd.Cmd):
# Make sure settable parameters are sorted alphabetically by key
self.settable = collections.OrderedDict(sorted(self.settable.items(), key=lambda t: t[0]))
- def poutput(self, msg: str, end: str='\n') -> None:
- """Convenient shortcut for self.stdout.write(); by default adds newline to end if not already present.
+ def decolorized_write(self, fileobj: IO, msg: str) -> None:
+ """Write a string to a fileobject, stripping ANSI escape sequences if necessary
- Also handles BrokenPipeError exceptions for when a commands's output has been piped to another process and
- that process terminates before the cmd2 command is finished executing.
+ Honor the current colors setting, which requires us to check whether the
+ fileobject is a tty.
+ """
+ if self.colors.lower() == constants.COLORS_NEVER.lower() or \
+ (self.colors.lower() == constants.COLORS_TERMINAL.lower() and not fileobj.isatty()):
+ msg = utils.strip_ansi(msg)
+ fileobj.write(msg)
- :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK
- :param end: string appended after the end of the message if not already present, default a newline
+ def poutput(self, msg: Any, end: str='\n', color: str='') -> None:
+ """Smarter self.stdout.write(); color aware and adds newline of not present.
+
+ Also handles BrokenPipeError exceptions for when a commands's output has
+ been piped to another process and that process terminates before the
+ cmd2 command is finished executing.
+
+ :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK)
+ :param end: (optional) string appended after the end of the message if not already present, default a newline
+ :param color: (optional) color escape to output this message with
"""
if msg is not None and msg != '':
try:
msg_str = '{}'.format(msg)
- self.stdout.write(msg_str)
if not msg_str.endswith(end):
- self.stdout.write(end)
+ msg_str += end
+ if color:
+ msg_str = color + msg_str + Fore.RESET
+ self.decolorized_write(self.stdout, msg_str)
except BrokenPipeError:
- # This occurs if a command's output is being piped to another process and that process closes before the
- # command is finished. If you would like your application to print a warning message, then set the
- # broken_pipe_warning attribute to the message you want printed.
+ # This occurs if a command's output is being piped to another
+ # process and that process closes before the command is
+ # finished. If you would like your application to print a
+ # warning message, then set the broken_pipe_warning attribute
+ # to the message you want printed.
if self.broken_pipe_warning:
sys.stderr.write(self.broken_pipe_warning)
- def perror(self, err: Union[str, Exception], traceback_war: bool=True) -> None:
+ def perror(self, err: Union[str, Exception], traceback_war: bool=True, err_color: str=Fore.LIGHTRED_EX,
+ war_color: str=Fore.LIGHTYELLOW_EX) -> None:
""" Print error message to sys.stderr and if debug is true, print an exception Traceback if one exists.
:param err: an Exception or error message to print out
:param traceback_war: (optional) if True, print a message to let user know they can enable debug
- :return:
+ :param err_color: (optional) color escape to output error with
+ :param war_color: (optional) color escape to output warning with
"""
if self.debug:
import traceback
@@ -582,14 +629,15 @@ class Cmd(cmd.Cmd):
if isinstance(err, Exception):
err_msg = "EXCEPTION of type '{}' occurred with message: '{}'\n".format(type(err).__name__, err)
- sys.stderr.write(self.colorize(err_msg, 'red'))
else:
- err_msg = self.colorize("ERROR: {}\n".format(err), 'red')
- sys.stderr.write(err_msg)
+ err_msg = "ERROR: {}\n".format(err)
+ err_msg = err_color + err_msg + Fore.RESET
+ self.decolorized_write(sys.stderr, err_msg)
if traceback_war:
war = "To enable full traceback, run the following command: 'set debug true'\n"
- sys.stderr.write(self.colorize(war, 'yellow'))
+ war = war_color + war + Fore.RESET
+ self.decolorized_write(sys.stderr, war)
def pfeedback(self, msg: str) -> None:
"""For printing nonessential feedback. Can be silenced with `quiet`.
@@ -598,7 +646,7 @@ class Cmd(cmd.Cmd):
if self.feedback_to_output:
self.poutput(msg)
else:
- sys.stderr.write("{}\n".format(msg))
+ self.decolorized_write(sys.stderr, "{}\n".format(msg))
def ppaged(self, msg: str, end: str='\n', chop: bool=False) -> None:
"""Print output using a pager if it would go off screen and stdout isn't currently being redirected.
@@ -606,7 +654,7 @@ class Cmd(cmd.Cmd):
Never uses a pager inside of a script (Python or text) or when output is being redirected or piped or when
stdout or stdin are not a fully functional terminal.
- :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK
+ :param msg: message to print to current stdout (anything convertible to a str with '{}'.format() is OK)
:param end: string appended after the end of the message if not already present, default a newline
:param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
- truncated text is still accessible by scrolling with the right & left arrow keys
@@ -634,6 +682,9 @@ class Cmd(cmd.Cmd):
# Don't attempt to use a pager that can block if redirecting or running a script (either text or Python)
# Also only attempt to use a pager if actually running in a real fully functional terminal
if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir:
+ if self.colors.lower() == constants.COLORS_NEVER.lower():
+ msg_str = utils.strip_ansi(msg_str)
+
pager = self.pager
if chop:
pager = self.pager_chop
@@ -658,7 +709,7 @@ class Cmd(cmd.Cmd):
except BrokenPipeError:
# This occurs if a command's output is being piped to another process and that process closes before the
# command is finished. If you would like your application to print a warning message, then set the
- # broken_pipe_warning attribute to the message you want printed.
+ # broken_pipe_warning attribute to the message you want printed.`
if self.broken_pipe_warning:
sys.stderr.write(self.broken_pipe_warning)
@@ -669,7 +720,7 @@ class Cmd(cmd.Cmd):
is running on Windows, will return ``val`` unchanged.
``color`` should be one of the supported strings (or styles):
red/blue/green/cyan/magenta, bold, underline"""
- if self.colors and (self.stdout == self.initial_stdout):
+ if self.colors.lower() != constants.COLORS_NEVER.lower() and (self.stdout == self.initial_stdout):
return self._colorcodes[color][True] + val + self._colorcodes[color][False]
return val
@@ -897,14 +948,13 @@ class Cmd(cmd.Cmd):
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
- :param flag_dict: dict - dictionary whose structure is the following:
- keys - flags (ex: -c, --create) that result in tab completion for the next
- argument in the command line
- values - there are two types of values
- 1. iterable list of strings to match against (dictionaries, lists, etc.)
- 2. function that performs tab completion (ex: path_complete)
- :param all_else: Collection or function - an optional parameter for tab completing any token that isn't preceded
- by a flag in flag_dict
+ :param flag_dict: dictionary whose structure is the following:
+ keys - flags (ex: -c, --create) that result in tab completion for the next
+ argument in the command line
+ values - there are two types of values
+ 1. iterable list of strings to match against (dictionaries, lists, etc.)
+ 2. function that performs tab completion (ex: path_complete)
+ :param all_else: an optional parameter for tab completing any token that isn't preceded by a flag in flag_dict
:return: a list of possible tab completions
"""
# Get all tokens through the one being completed
@@ -940,14 +990,13 @@ class Cmd(cmd.Cmd):
:param line: the current input line with leading whitespace removed
:param begidx: the beginning index of the prefix text
:param endidx: the ending index of the prefix text
- :param index_dict: dict - dictionary whose structure is the following:
- keys - 0-based token indexes into command line that determine which tokens
- perform tab completion
- values - there are two types of values
- 1. iterable list of strings to match against (dictionaries, lists, etc.)
- 2. function that performs tab completion (ex: path_complete)
- :param all_else: Collection or function - an optional parameter for tab completing any token that isn't at an
- index in index_dict
+ :param index_dict: dictionary whose structure is the following:
+ keys - 0-based token indexes into command line that determine which tokens
+ perform tab completion
+ values - there are two types of values
+ 1. iterable list of strings to match against (dictionaries, lists, etc.)
+ 2. function that performs tab completion (ex: path_complete)
+ :param all_else: an optional parameter for tab completing any token that isn't at an index in index_dict
:return: a list of possible tab completions
"""
# Get all tokens through the one being completed
@@ -1093,8 +1142,8 @@ class Cmd(cmd.Cmd):
self.allow_appended_space = False
self.allow_closing_quote = False
- # Sort the matches before any trailing slashes are added
- matches = utils.alphabetical_sort(matches)
+ # Sort the matches alphabetically before any trailing slashes are added
+ matches.sort(key=utils.norm_fold)
self.matches_sorted = True
# Build display_matches and add a slash to directories
@@ -1433,18 +1482,21 @@ class Cmd(cmd.Cmd):
# Check if a valid command was entered
if command in self.get_all_commands():
# Get the completer function for this command
- try:
- compfunc = getattr(self, 'complete_' + command)
- except AttributeError:
+ compfunc = getattr(self, 'complete_' + command, None)
+
+ if compfunc is None:
# There's no completer function, next see if the command uses argparser
- try:
- cmd_func = getattr(self, 'do_' + command)
- argparser = getattr(cmd_func, 'argparser')
- # Command uses argparser, switch to the default argparse completer
- compfunc = functools.partial(self._autocomplete_default, argparser=argparser)
- except AttributeError:
+ func = self.cmd_func(command)
+ if func and hasattr(func, 'argparser'):
+ compfunc = functools.partial(self._autocomplete_default,
+ argparser=getattr(func, 'argparser'))
+ else:
compfunc = self.completedefault
+ # Check if a macro was entered
+ elif command in self.macros:
+ compfunc = self.path_complete
+
# A valid command was not entered
else:
# Check if this command should be run as a shell command
@@ -1512,11 +1564,9 @@ class Cmd(cmd.Cmd):
[shortcut_to_restore + match for match in self.completion_matches]
else:
- # Complete token against aliases and command names
- alias_names = set(self.aliases.keys())
- visible_commands = set(self.get_visible_commands())
- strs_to_match = list(alias_names | visible_commands)
- self.completion_matches = self.basic_complete(text, line, begidx, endidx, strs_to_match)
+ # Complete token against anything a user can run
+ self.completion_matches = self.basic_complete(text, line, begidx, endidx,
+ self.get_commands_aliases_and_macros_for_completion())
# Handle single result
if len(self.completion_matches) == 1:
@@ -1534,8 +1584,8 @@ class Cmd(cmd.Cmd):
# Sort matches alphabetically if they haven't already been sorted
if not self.matches_sorted:
- self.completion_matches = utils.alphabetical_sort(self.completion_matches)
- self.display_matches = utils.alphabetical_sort(self.display_matches)
+ self.completion_matches.sort(key=utils.norm_fold)
+ self.display_matches.sort(key=utils.norm_fold)
self.matches_sorted = True
try:
@@ -1556,8 +1606,8 @@ class Cmd(cmd.Cmd):
def get_all_commands(self) -> List[str]:
"""Returns a list of all commands."""
- return [name[3:] for name in self.get_names()
- if name.startswith('do_') and isinstance(getattr(self, name), Callable)]
+ return [name[len(COMMAND_PREFIX):] for name in self.get_names()
+ if name.startswith(COMMAND_PREFIX) and callable(getattr(self, name))]
def get_visible_commands(self) -> List[str]:
"""Returns a list of commands that have not been hidden."""
@@ -1570,53 +1620,25 @@ class Cmd(cmd.Cmd):
return commands
- def get_help_topics(self) -> List[str]:
- """ Returns a list of help topics """
- return [name[5:] for name in self.get_names()
- if name.startswith('help_') and isinstance(getattr(self, name), Callable)]
-
- def complete_help(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """
- Override of parent class method to handle tab completing subcommands and not showing hidden commands
- Returns a list of possible tab completions
- """
-
- # The command is the token at index 1 in the command line
- cmd_index = 1
-
- # The subcommand is the token at index 2 in the command line
- subcmd_index = 2
-
- # Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
- if not tokens:
- return []
-
- matches = []
+ def get_alias_names(self) -> List[str]:
+ """Return a list of alias names."""
+ return list(self.aliases)
- # Get the index of the token being completed
- index = len(tokens) - 1
-
- # Check if we are completing a command or help topic
- if index == cmd_index:
+ def get_macro_names(self) -> List[str]:
+ """Return a list of macro names."""
+ return list(self.macros)
- # Complete token against topics and visible commands
- topics = set(self.get_help_topics())
- visible_commands = set(self.get_visible_commands())
- strs_to_match = list(topics | visible_commands)
- matches = self.basic_complete(text, line, begidx, endidx, strs_to_match)
-
- # check if the command uses argparser
- elif index >= subcmd_index:
- try:
- cmd_func = getattr(self, 'do_' + tokens[cmd_index])
- parser = getattr(cmd_func, 'argparser')
- completer = AutoCompleter(parser)
- matches = completer.complete_command_help(tokens[1:], text, line, begidx, endidx)
- except AttributeError:
- pass
+ def get_commands_aliases_and_macros_for_completion(self) -> List[str]:
+ """Return a list of visible commands, aliases, and macros for tab completion"""
+ visible_commands = set(self.get_visible_commands())
+ alias_names = set(self.get_alias_names())
+ macro_names = set(self.get_macro_names())
+ return list(visible_commands | alias_names | macro_names)
- return matches
+ def get_help_topics(self) -> List[str]:
+ """ Returns a list of help topics """
+ return [name[5:] for name in self.get_names()
+ if name.startswith('help_') and callable(getattr(self, name))]
# noinspection PyUnusedLocal
def sigint_handler(self, signum: int, frame) -> None:
@@ -1637,12 +1659,6 @@ class Cmd(cmd.Cmd):
# Re-raise a KeyboardInterrupt so other parts of the code can catch it
raise KeyboardInterrupt("Got a keyboard interrupt")
- def preloop(self) -> None:
- """Hook method executed once when the cmdloop() method is called."""
- import signal
- # Register a default SIGINT signal handler for Ctrl+C
- signal.signal(signal.SIGINT, self.sigint_handler)
-
def precmd(self, statement: Statement) -> Statement:
"""Hook method executed just before the command is processed by ``onecmd()`` and after adding it to the history.
@@ -1651,58 +1667,6 @@ class Cmd(cmd.Cmd):
"""
return statement
- # ----- Methods which are cmd2-specific lifecycle hooks which are not present in cmd -----
-
- # noinspection PyMethodMayBeStatic
- def preparse(self, raw: str) -> str:
- """Hook method executed before user input is parsed.
-
- WARNING: If it's a multiline command, `preparse()` may not get all the
- user input. _complete_statement() really does two things: a) parse the
- user input, and b) accept more input in case it's a multiline command
- the passed string doesn't have a terminator. `preparse()` is currently
- called before we know whether it's a multiline command, and before we
- know whether the user input includes a termination character.
-
- If you want a reliable pre parsing hook method, register a postparsing
- hook, modify the user input, and then reparse it.
-
- :param raw: raw command line input :return: potentially modified raw command line input
- :return: a potentially modified version of the raw input string
- """
- return raw
-
- # noinspection PyMethodMayBeStatic
- def postparsing_precmd(self, statement: Statement) -> Tuple[bool, Statement]:
- """This runs after parsing the command-line, but before anything else; even before adding cmd to history.
-
- NOTE: This runs before precmd() and prior to any potential output redirection or piping.
-
- If you wish to fatally fail this command and exit the application entirely, set stop = True.
-
- If you wish to just fail this command you can do so by raising an exception:
-
- - raise EmptyStatement - will silently fail and do nothing
- - raise <AnyOtherException> - will fail and print an error message
-
- :param statement: - the parsed command-line statement as a Statement object
- :return: (bool, statement) - (stop, statement) containing a potentially modified version of the statement object
- """
- stop = False
- return stop, statement
-
- # noinspection PyMethodMayBeStatic
- def postparsing_postcmd(self, stop: bool) -> bool:
- """This runs after everything else, including after postcmd().
-
- It even runs when an empty line is entered. Thus, if you need to do something like update the prompt due
- to notifications from a background thread, then this is the method you want to override to do it.
-
- :param stop: bool - True implies the entire application should exit.
- :return: bool - True implies the entire application should exit.
- """
- return stop
-
def parseline(self, line: str) -> Tuple[str, str, str]:
"""Parse the line into a command name and a string containing the arguments.
@@ -1742,9 +1706,6 @@ class Cmd(cmd.Cmd):
data = func(data)
if data.stop:
break
- # postparsing_precmd is deprecated
- if not data.stop:
- (data.stop, data.statement) = self.postparsing_precmd(data.statement)
# unpack the data object
statement = data.statement
stop = data.stop
@@ -1809,9 +1770,7 @@ class Cmd(cmd.Cmd):
data = func(data)
# retrieve the final value of stop, ignoring any
# modifications to the statement
- stop = data.stop
- # postparsing_postcmd is deprecated
- return self.postparsing_postcmd(stop)
+ return data.stop
except Exception as ex:
self.perror(ex)
@@ -1865,9 +1824,6 @@ class Cmd(cmd.Cmd):
pipe runs out. We can't refactor it because we need to retain
backwards compatibility with the standard library version of cmd.
"""
- # preparse() is deprecated, use self.register_postparsing_hook() instead
- line = self.preparse(line)
-
while True:
try:
statement = self.statement_parser.parse(line)
@@ -2014,49 +1970,91 @@ class Cmd(cmd.Cmd):
self.redirecting = False
- def _func_named(self, arg: str) -> str:
- """Gets the method name associated with a given command.
+ def cmd_func(self, command: str) -> Optional[Callable]:
+ """
+ Get the function for a command
+ :param command: the name of the command
+ """
+ func_name = self.cmd_func_name(command)
+ if func_name:
+ return getattr(self, func_name)
+
+ def cmd_func_name(self, command: str) -> str:
+ """Get the method name associated with a given command.
- :param arg: command to look up method name which implements it
+ :param command: command to look up method name which implements it
:return: method name which implements the given command
"""
- result = None
- target = 'do_' + arg
- if target in dir(self):
- result = target
- return result
+ target = COMMAND_PREFIX + command
+ return target if callable(getattr(self, target, None)) else ''
- def onecmd(self, statement: Union[Statement, str]) -> Optional[bool]:
+ def onecmd(self, statement: Union[Statement, str]) -> bool:
""" This executes the actual do_* method for a command.
If the command provided doesn't exist, then it executes _default() instead.
- :param statement: Command - intended to be a Statement instance parsed command from the input stream,
- alternative acceptance of a str is present only for backward compatibility with cmd
+ :param statement: intended to be a Statement instance parsed command from the input stream, alternative
+ acceptance of a str is present only for backward compatibility with cmd
:return: a flag indicating whether the interpretation of commands should stop
"""
# For backwards compatibility with cmd, allow a str to be passed in
if not isinstance(statement, Statement):
statement = self._complete_statement(statement)
- funcname = self._func_named(statement.command)
- if not funcname:
- self.default(statement)
- return
+ # Check if this is a macro
+ if statement.command in self.macros:
+ stop = self._run_macro(statement)
+ else:
+ func = self.cmd_func(statement.command)
+ if func:
+ stop = func(statement)
- # Since we have a valid command store it in the history
- if statement.command not in self.exclude_from_history:
- self.history.append(statement.raw)
+ # Since we have a valid command store it in the history
+ if statement.command not in self.exclude_from_history:
+ self.history.append(statement.raw)
- try:
- func = getattr(self, funcname)
- except AttributeError:
- self.default(statement)
- return
+ else:
+ self.default(statement)
+ stop = False
- stop = func(statement)
return stop
+ def _run_macro(self, statement: Statement) -> bool:
+ """
+ Resolve a macro and run the resulting string
+
+ :param statement: the parsed statement from the command line
+ :return: a flag indicating whether the interpretation of commands should stop
+ """
+ if statement.command not in self.macros.keys():
+ raise KeyError('{} is not a macro'.format(statement.command))
+
+ macro = self.macros[statement.command]
+
+ # For macros, every argument must be provided and there can be no extra arguments.
+ if len(statement.arg_list) != macro.required_arg_count:
+ self.perror("The macro '{}' expects {} argument(s)".format(statement.command, macro.required_arg_count),
+ traceback_war=False)
+ return False
+
+ # Resolve the arguments in reverse
+ resolved = macro.value
+ reverse_arg_list = sorted(macro.arg_list, key=lambda ma: ma.start_index, reverse=True)
+
+ for arg in reverse_arg_list:
+ if arg.is_escaped:
+ to_replace = '{{' + arg.number_str + '}}'
+ replacement = '{' + arg.number_str + '}'
+ else:
+ to_replace = '{' + arg.number_str + '}'
+ replacement = statement.argv[int(arg.number_str)]
+
+ parts = resolved.rsplit(to_replace, maxsplit=1)
+ resolved = parts[0] + replacement + parts[1]
+
+ # Run the resolved command
+ return self.onecmd_plus_hooks(resolved)
+
def default(self, statement: Statement) -> None:
"""Executed when the command given isn't a recognized command implemented by a do_* method.
@@ -2072,34 +2070,6 @@ class Cmd(cmd.Cmd):
# Print out a message stating this is an unknown command
self.poutput('*** Unknown syntax: {}\n'.format(arg))
- @staticmethod
- def _surround_ansi_escapes(prompt: str, start: str="\x01", end: str="\x02") -> str:
- """Overcome bug in GNU Readline in relation to calculation of prompt length in presence of ANSI escape codes.
-
- :param prompt: original prompt
- :param start: start code to tell GNU Readline about beginning of invisible characters
- :param end: end code to tell GNU Readline about end of invisible characters
- :return: prompt safe to pass to GNU Readline
- """
- # Windows terminals don't use ANSI escape codes and Windows readline isn't based on GNU Readline
- if sys.platform == "win32":
- return prompt
-
- escaped = False
- result = ""
-
- for c in prompt:
- if c == "\x1b" and not escaped:
- result += start + c
- escaped = True
- elif c.isalpha() and escaped:
- result += c + end
- escaped = False
- else:
- result += c
-
- return result
-
def pseudo_raw_input(self, prompt: str) -> str:
"""Began life as a copy of cmd's cmdloop; like raw_input but
@@ -2108,23 +2078,37 @@ class Cmd(cmd.Cmd):
to decide whether to print the prompt and the input
"""
- # Deal with the vagaries of readline and ANSI escape codes
- safe_prompt = self._surround_ansi_escapes(prompt)
+ # Temporarily save over self.prompt to reflect what will be on screen
+ orig_prompt = self.prompt
+ self.prompt = prompt
if self.use_rawinput:
try:
if sys.stdin.isatty():
+ # Wrap in try since terminal_lock may not be locked when this function is called from unit tests
+ try:
+ # A prompt is about to be drawn. Allow asynchronous changes to the terminal.
+ self.terminal_lock.release()
+ except RuntimeError:
+ pass
+
+ # Deal with the vagaries of readline and ANSI escape codes
+ safe_prompt = rl_make_safe_prompt(prompt)
line = input(safe_prompt)
else:
line = input()
if self.echo:
- sys.stdout.write('{}{}\n'.format(safe_prompt, line))
+ sys.stdout.write('{}{}\n'.format(self.prompt, line))
except EOFError:
line = 'eof'
+ finally:
+ if sys.stdin.isatty():
+ # The prompt is gone. Do not allow asynchronous changes to the terminal.
+ self.terminal_lock.acquire()
else:
if self.stdin.isatty():
# on a tty, print the prompt first, then read the line
- self.poutput(safe_prompt, end='')
+ self.poutput(self.prompt, end='')
self.stdout.flush()
line = self.stdin.readline()
if len(line) == 0:
@@ -2137,9 +2121,13 @@ class Cmd(cmd.Cmd):
if len(line):
# we read something, output the prompt and the something
if self.echo:
- self.poutput('{}{}'.format(safe_prompt, line))
+ self.poutput('{}{}'.format(self.prompt, line))
else:
line = 'eof'
+
+ # Restore prompt
+ self.prompt = orig_prompt
+
return line.strip()
def _cmdloop(self) -> bool:
@@ -2219,147 +2207,423 @@ class Cmd(cmd.Cmd):
return stop
- def do_alias(self, statement: Statement) -> None:
- """Define or display aliases
+ # ----- Alias subcommand functions -----
-Usage: Usage: alias [name] | [<name> <value>]
- Where:
- name - name of the alias being looked up, added, or replaced
- value - what the alias will be resolved to (if adding or replacing)
- this can contain spaces and does not need to be quoted
+ def alias_create(self, args: argparse.Namespace):
+ """ Creates or overwrites an alias """
- Without arguments, 'alias' prints a list of all aliases in a reusable form which
- can be outputted to a startup_script to preserve aliases across sessions.
+ # Validate the alias name
+ args.name = utils.strip_quotes(args.name)
+ valid, errmsg = self.statement_parser.is_valid_command(args.name)
+ if not valid:
+ errmsg = "Invalid alias name: {}".format(errmsg)
+ self.perror(errmsg, traceback_war=False)
+ return
- With one argument, 'alias' shows the value of the specified alias.
- Example: alias ls (Prints the value of the alias called 'ls' if it exists)
+ if args.name in self.macros:
+ errmsg = "Alias cannot have the same name as a macro"
+ self.perror(errmsg, traceback_war=False)
+ return
- With two or more arguments, 'alias' creates or replaces an alias.
+ utils.unquote_redirection_tokens(args.command_args)
- Example: alias ls !ls -lF
+ # Build the alias value string
+ value = args.command
+ if args.command_args:
+ value += ' ' + ' '.join(args.command_args)
- If you want to use redirection or pipes in the alias, then quote them to prevent
- the alias command itself from being redirected
+ # Set the alias
+ result = "overwritten" if args.name in self.aliases else "created"
+ self.aliases[args.name] = value
+ self.poutput("Alias '{}' {}".format(args.name, result))
- Examples:
- alias save_results print_results ">" out.txt
- alias save_results print_results '>' out.txt
-"""
- # Get alias arguments as a list with quotes preserved
- alias_arg_list = statement.arg_list
+ def alias_delete(self, args: argparse.Namespace):
+ """ Deletes aliases """
+ if args.all:
+ self.aliases.clear()
+ self.poutput("All aliases deleted")
+ elif not args.name:
+ self.do_help('alias delete')
+ else:
+ # Get rid of duplicates and strip quotes since the argparse decorator for do_alias() preserves them
+ aliases_to_delete = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)]
+
+ for cur_name in aliases_to_delete:
+ if cur_name in self.aliases:
+ del self.aliases[cur_name]
+ self.poutput("Alias '{}' deleted".format(cur_name))
+ else:
+ self.perror("Alias '{}' does not exist".format(cur_name), traceback_war=False)
+
+ def alias_list(self, args: argparse.Namespace):
+ """ Lists some or all aliases """
+ if args.name:
+ # Get rid of duplicates and strip quotes since the argparse decorator for do_alias() preserves them
+ names_to_view = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)]
+
+ for cur_name in names_to_view:
+ if cur_name in self.aliases:
+ self.poutput("alias create {} {}".format(cur_name, self.aliases[cur_name]))
+ else:
+ self.perror("Alias '{}' not found".format(cur_name), traceback_war=False)
+ else:
+ sorted_aliases = utils.alphabetical_sort(self.aliases)
+ for cur_alias in sorted_aliases:
+ self.poutput("alias create {} {}".format(cur_alias, self.aliases[cur_alias]))
+
+ # Top-level parser for alias
+ alias_description = ("Manage aliases\n"
+ "\n"
+ "An alias is a command that enables replacement of a word by another string.")
+ alias_epilog = ("See also:\n"
+ " macro")
+ alias_parser = ACArgumentParser(description=alias_description, epilog=alias_epilog, prog='alias')
+
+ # Add subcommands to alias
+ alias_subparsers = alias_parser.add_subparsers()
+
+ # alias -> create
+ alias_create_help = "create or overwrite an alias"
+ alias_create_description = "Create or overwrite an alias"
+
+ alias_create_epilog = ("Notes:\n"
+ " If you want to use redirection or pipes in the alias, then quote them to\n"
+ " prevent the 'alias create' command from being redirected.\n"
+ "\n"
+ " Since aliases are resolved during parsing, tab completion will function as it\n"
+ " would for the actual command the alias resolves to.\n"
+ "\n"
+ "Examples:\n"
+ " alias ls !ls -lF\n"
+ " alias create show_log !cat \"log file.txt\"\n"
+ " alias create save_results print_results \">\" out.txt\n")
+
+ alias_create_parser = alias_subparsers.add_parser('create', help=alias_create_help,
+ description=alias_create_description,
+ epilog=alias_create_epilog)
+ alias_create_parser.add_argument('name', help='name of this alias')
+ setattr(alias_create_parser.add_argument('command', help='what the alias resolves to'),
+ ACTION_ARG_CHOICES, get_commands_aliases_and_macros_for_completion)
+ setattr(alias_create_parser.add_argument('command_args', nargs=argparse.REMAINDER,
+ help='arguments to pass to command'),
+ ACTION_ARG_CHOICES, ('path_complete',))
+ alias_create_parser.set_defaults(func=alias_create)
+
+ # alias -> delete
+ alias_delete_help = "delete aliases"
+ alias_delete_description = "Delete specified aliases or all aliases if --all is used"
+ alias_delete_parser = alias_subparsers.add_parser('delete', help=alias_delete_help,
+ description=alias_delete_description)
+ setattr(alias_delete_parser.add_argument('name', nargs='*', help='alias to delete'),
+ ACTION_ARG_CHOICES, get_alias_names)
+ alias_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all aliases")
+ alias_delete_parser.set_defaults(func=alias_delete)
+
+ # alias -> list
+ alias_list_help = "list aliases"
+ alias_list_description = ("List specified aliases in a reusable form that can be saved to a startup script\n"
+ "to preserve aliases across sessions\n"
+ "\n"
+ "Without arguments, all aliases will be listed.")
+
+ alias_list_parser = alias_subparsers.add_parser('list', help=alias_list_help,
+ description=alias_list_description)
+ setattr(alias_list_parser.add_argument('name', nargs="*", help='alias to list'),
+ ACTION_ARG_CHOICES, get_alias_names)
+ alias_list_parser.set_defaults(func=alias_list)
+
+ # Preserve quotes since we are passing strings to other commands
+ @with_argparser(alias_parser, preserve_quotes=True)
+ def do_alias(self, args: argparse.Namespace):
+ """Manage aliases"""
+ func = getattr(args, 'func', None)
+ if func is not None:
+ # Call whatever subcommand function was selected
+ func(self, args)
+ else:
+ # No subcommand was provided, so call help
+ self.do_help('alias')
+
+ # ----- Macro subcommand functions -----
+
+ def macro_create(self, args: argparse.Namespace):
+ """ Creates or overwrites a macro """
- # If no args were given, then print a list of current aliases
- if not alias_arg_list:
- for cur_alias in self.aliases:
- self.poutput("alias {} {}".format(cur_alias, self.aliases[cur_alias]))
+ # Validate the macro name
+ args.name = utils.strip_quotes(args.name)
+ valid, errmsg = self.statement_parser.is_valid_command(args.name)
+ if not valid:
+ errmsg = "Invalid macro name: {}".format(errmsg)
+ self.perror(errmsg, traceback_war=False)
return
- # Get the alias name
- name = alias_arg_list[0]
+ if args.name in self.get_all_commands():
+ errmsg = "Macro cannot have the same name as a command"
+ self.perror(errmsg, traceback_war=False)
+ return
- # The user is looking up an alias
- if len(alias_arg_list) == 1:
- if name in self.aliases:
- self.poutput("alias {} {}".format(name, self.aliases[name]))
- else:
- self.perror("Alias {!r} not found".format(name), traceback_war=False)
+ if args.name in self.aliases:
+ errmsg = "Macro cannot have the same name as an alias"
+ self.perror(errmsg, traceback_war=False)
+ return
- # The user is creating an alias
- else:
- # Unquote redirection and pipes
- index = 1
- while index < len(alias_arg_list):
- unquoted_arg = utils.strip_quotes(alias_arg_list[index])
- if unquoted_arg in constants.REDIRECTION_TOKENS:
- alias_arg_list[index] = unquoted_arg
- index += 1
-
- # Build the alias value string
- value = ' '.join(alias_arg_list[1:])
-
- # Validate the alias to ensure it doesn't include weird characters
- # like terminators, output redirection, or whitespace
- valid, invalidchars = self.statement_parser.is_valid_command(name)
- if valid:
- # Set the alias
- self.aliases[name] = value
- self.poutput("Alias {!r} created".format(name))
- else:
- errmsg = "Aliases can not contain: {}".format(invalidchars)
- self.perror(errmsg, traceback_war=False)
+ utils.unquote_redirection_tokens(args.command_args)
- def complete_alias(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """ Tab completion for alias """
- alias_names = set(self.aliases.keys())
- visible_commands = set(self.get_visible_commands())
+ # Build the macro value string
+ value = args.command
+ if args.command_args:
+ value += ' ' + ' '.join(args.command_args)
- index_dict = \
- {
- 1: alias_names,
- 2: list(alias_names | visible_commands)
- }
- return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete)
+ # Find all normal arguments
+ arg_list = []
+ normal_matches = re.finditer(MacroArg.macro_normal_arg_pattern, value)
+ max_arg_num = 0
+ arg_nums = set()
- @with_argument_list
- def do_unalias(self, arglist: List[str]) -> None:
- """Unsets aliases
+ while True:
+ try:
+ cur_match = normal_matches.__next__()
-Usage: Usage: unalias [-a] name [name ...]
- Where:
- name - name of the alias being unset
+ # Get the number string between the braces
+ cur_num_str = (re.findall(MacroArg.digit_pattern, cur_match.group())[0])
+ cur_num = int(cur_num_str)
+ if cur_num < 1:
+ self.perror("Argument numbers must be greater than 0", traceback_war=False)
+ return
- Options:
- -a remove all alias definitions
-"""
- if not arglist:
- self.do_help(['unalias'])
+ arg_nums.add(cur_num)
+ if cur_num > max_arg_num:
+ max_arg_num = cur_num
- if '-a' in arglist:
- self.aliases.clear()
- self.poutput("All aliases cleared")
+ arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=False))
+
+ except StopIteration:
+ break
+
+ # Make sure the argument numbers are continuous
+ if len(arg_nums) != max_arg_num:
+ self.perror("Not all numbers between 1 and {} are present "
+ "in the argument placeholders".format(max_arg_num), traceback_war=False)
+ return
+
+ # Find all escaped arguments
+ escaped_matches = re.finditer(MacroArg.macro_escaped_arg_pattern, value)
+
+ while True:
+ try:
+ cur_match = escaped_matches.__next__()
+ # Get the number string between the braces
+ cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0]
+
+ arg_list.append(MacroArg(start_index=cur_match.start(), number_str=cur_num_str, is_escaped=True))
+ except StopIteration:
+ break
+
+ # Set the macro
+ result = "overwritten" if args.name in self.macros else "created"
+ self.macros[args.name] = Macro(name=args.name, value=value, required_arg_count=max_arg_num, arg_list=arg_list)
+ self.poutput("Macro '{}' {}".format(args.name, result))
+
+ def macro_delete(self, args: argparse.Namespace):
+ """ Deletes macros """
+ if args.all:
+ self.macros.clear()
+ self.poutput("All macros deleted")
+ elif not args.name:
+ self.do_help('macro delete')
else:
- # Get rid of duplicates
- arglist = utils.remove_duplicates(arglist)
+ # Get rid of duplicates and strip quotes since the argparse decorator for do_macro() preserves them
+ macros_to_delete = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)]
+
+ for cur_name in macros_to_delete:
+ if cur_name in self.macros:
+ del self.macros[cur_name]
+ self.poutput("Macro '{}' deleted".format(cur_name))
+ else:
+ self.perror("Macro '{}' does not exist".format(cur_name), traceback_war=False)
+
+ def macro_list(self, args: argparse.Namespace):
+ """ Lists some or all macros """
+ if args.name:
+ # Get rid of duplicates and strip quotes since the argparse decorator for do_macro() preserves them
+ names_to_view = [utils.strip_quotes(cur_name) for cur_name in utils.remove_duplicates(args.name)]
- for cur_arg in arglist:
- if cur_arg in self.aliases:
- del self.aliases[cur_arg]
- self.poutput("Alias {!r} cleared".format(cur_arg))
+ for cur_name in names_to_view:
+ if cur_name in self.macros:
+ self.poutput("macro create {} {}".format(cur_name, self.macros[cur_name].value))
else:
- self.perror("Alias {!r} does not exist".format(cur_arg), traceback_war=False)
-
- def complete_unalias(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """ Tab completion for unalias """
- return self.basic_complete(text, line, begidx, endidx, self.aliases)
-
- @with_argument_list
- def do_help(self, arglist: List[str]) -> None:
- """ List available commands with "help" or detailed help with "help cmd" """
- if not arglist or (len(arglist) == 1 and arglist[0] in ('--verbose', '-v')):
- verbose = len(arglist) == 1 and arglist[0] in ('--verbose', '-v')
- self._help_menu(verbose)
+ self.perror("Macro '{}' not found".format(cur_name), traceback_war=False)
+ else:
+ sorted_macros = utils.alphabetical_sort(self.macros)
+ for cur_macro in sorted_macros:
+ self.poutput("macro create {} {}".format(cur_macro, self.macros[cur_macro].value))
+
+ # Top-level parser for macro
+ macro_description = ("Manage macros\n"
+ "\n"
+ "A macro is similar to an alias, but it can take arguments when called.")
+ macro_epilog = ("See also:\n"
+ " alias")
+ macro_parser = ACArgumentParser(description=macro_description, epilog=macro_epilog, prog='macro')
+
+ # Add subcommands to macro
+ macro_subparsers = macro_parser.add_subparsers()
+
+ # macro -> create
+ macro_create_help = "create or overwrite a macro"
+ macro_create_description = "Create or overwrite a macro"
+
+ macro_create_epilog = ("A macro is similar to an alias, but it can take arguments when called.\n"
+ "Arguments are expressed when creating a macro using {#} notation where {1}\n"
+ "means the first argument.\n"
+ "\n"
+ "The following creates a macro called my_macro that expects two arguments:\n"
+ "\n"
+ " macro create my_macro make_dinner -meat {1} -veggie {2}\n"
+ "\n"
+ "When the macro is called, the provided arguments are resolved and the assembled\n"
+ "command is run. For example:\n"
+ "\n"
+ " my_macro beef broccoli ---> make_dinner -meat beef -veggie broccoli\n"
+ "\n"
+ "Notes:\n"
+ " To use the literal string {1} in your command, escape it this way: {{1}}.\n"
+ "\n"
+ " An argument number can be repeated in a macro. In the following example the\n"
+ " first argument will populate both {1} instances.\n"
+ "\n"
+ " macro create ft file_taxes -p {1} -q {2} -r {1}\n"
+ "\n"
+ " To quote an argument in the resolved command, quote it during creation.\n"
+ "\n"
+ " macro create backup !cp \"{1}\" \"{1}.orig\"\n"
+ "\n"
+ " Be careful! Since macros can resolve into commands, aliases, and macros,\n"
+ " it is possible to create a macro that results in infinite recursion.\n"
+ "\n"
+ " If you want to use redirection or pipes in the macro, then quote them as in\n"
+ " this example to prevent the 'macro create' command from being redirected.\n"
+ "\n"
+ " macro create show_results print_results -type {1} \"|\" less\n"
+ "\n"
+ " Because macros do not resolve until after parsing (hitting Enter), tab\n"
+ " completion will only complete paths.")
+
+ macro_create_parser = macro_subparsers.add_parser('create', help=macro_create_help,
+ description=macro_create_description,
+ epilog=macro_create_epilog)
+ macro_create_parser.add_argument('name', help='name of this macro')
+ setattr(macro_create_parser.add_argument('command', help='what the macro resolves to'),
+ ACTION_ARG_CHOICES, get_commands_aliases_and_macros_for_completion)
+ setattr(macro_create_parser.add_argument('command_args', nargs=argparse.REMAINDER,
+ help='arguments to pass to command'),
+ ACTION_ARG_CHOICES, ('path_complete',))
+ macro_create_parser.set_defaults(func=macro_create)
+
+ # macro -> delete
+ macro_delete_help = "delete macros"
+ macro_delete_description = "Delete specified macros or all macros if --all is used"
+ macro_delete_parser = macro_subparsers.add_parser('delete', help=macro_delete_help,
+ description=macro_delete_description)
+ setattr(macro_delete_parser.add_argument('name', nargs='*', help='macro to delete'),
+ ACTION_ARG_CHOICES, get_macro_names)
+ macro_delete_parser.add_argument('-a', '--all', action='store_true', help="delete all macros")
+ macro_delete_parser.set_defaults(func=macro_delete)
+
+ # macro -> list
+ macro_list_help = "list macros"
+ macro_list_description = ("List specified macros in a reusable form that can be saved to a startup script\n"
+ "to preserve macros across sessions\n"
+ "\n"
+ "Without arguments, all macros will be listed.")
+
+ macro_list_parser = macro_subparsers.add_parser('list', help=macro_list_help, description=macro_list_description)
+ setattr(macro_list_parser.add_argument('name', nargs="*", help='macro to list'),
+ ACTION_ARG_CHOICES, get_macro_names)
+ macro_list_parser.set_defaults(func=macro_list)
+
+ # Preserve quotes since we are passing strings to other commands
+ @with_argparser(macro_parser, preserve_quotes=True)
+ def do_macro(self, args: argparse.Namespace):
+ """Manage macros"""
+ func = getattr(args, 'func', None)
+ if func is not None:
+ # Call whatever subcommand function was selected
+ func(self, args)
+ else:
+ # No subcommand was provided, so call help
+ self.do_help('macro')
+
+ def complete_help_command(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
+ """Completes the command argument of help"""
+
+ # Complete token against topics and visible commands
+ topics = set(self.get_help_topics())
+ visible_commands = set(self.get_visible_commands())
+ strs_to_match = list(topics | visible_commands)
+ return self.basic_complete(text, line, begidx, endidx, strs_to_match)
+
+ def complete_help_subcommand(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
+ """Completes the subcommand argument of help"""
+
+ # Get all tokens through the one being completed
+ tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+
+ if not tokens:
+ return []
+
+ # Must have at least 3 args for 'help command subcommand'
+ if len(tokens) < 3:
+ return []
+
+ # Find where the command is by skipping past any flags
+ cmd_index = 1
+ for cur_token in tokens[cmd_index:]:
+ if not cur_token.startswith('-'):
+ break
+ cmd_index += 1
+
+ if cmd_index >= len(tokens):
+ return []
+
+ command = tokens[cmd_index]
+ matches = []
+
+ # Check if this is a command with an argparse function
+ func = self.cmd_func(command)
+ if func and hasattr(func, 'argparser'):
+ completer = AutoCompleter(getattr(func, 'argparser'), cmd2_app=self)
+ matches = completer.complete_command_help(tokens[cmd_index:], text, line, begidx, endidx)
+
+ return matches
+
+ help_parser = ACArgumentParser()
+
+ setattr(help_parser.add_argument('command', help="command to retrieve help for", nargs="?"),
+ ACTION_ARG_CHOICES, ('complete_help_command',))
+ setattr(help_parser.add_argument('subcommand', help="subcommand to retrieve help for",
+ nargs=argparse.REMAINDER),
+ ACTION_ARG_CHOICES, ('complete_help_subcommand',))
+ help_parser.add_argument('-v', '--verbose', action='store_true',
+ help="print a list of all commands with descriptions of each")
+
+ @with_argparser(help_parser)
+ def do_help(self, args: argparse.Namespace) -> None:
+ """List available commands or provide detailed help for a specific command"""
+ if not args.command or args.verbose:
+ self._help_menu(args.verbose)
+
else:
# Getting help for a specific command
- funcname = self._func_named(arglist[0])
- if funcname:
- # Check to see if this function was decorated with an argparse ArgumentParser
- func = getattr(self, funcname)
- if hasattr(func, 'argparser'):
- # Function has an argparser, so get help based on all the arguments in case there are sub-commands
- new_arglist = arglist[1:]
- new_arglist.append('-h')
-
- # Temporarily redirect all argparse output to both sys.stdout and sys.stderr to self.stdout
- with redirect_stdout(self.stdout):
- with redirect_stderr(self.stdout):
- func(new_arglist)
- else:
- # No special behavior needed, delegate to cmd base class do_help()
- cmd.Cmd.do_help(self, funcname[3:])
+ func = self.cmd_func(args.command)
+ if func and hasattr(func, 'argparser'):
+ completer = AutoCompleter(getattr(func, 'argparser'), cmd2_app=self)
+ tokens = [args.command] + args.subcommand
+ self.poutput(completer.format_help(tokens))
else:
- # This could be a help topic
- cmd.Cmd.do_help(self, arglist[0])
+ # No special behavior needed, delegate to cmd base class do_help()
+ super().do_help(args.command)
def _help_menu(self, verbose: bool=False) -> None:
"""Show a list of commands which help can be displayed for.
@@ -2375,11 +2639,12 @@ Usage: Usage: unalias [-a] name [name ...]
cmds_cats = {}
for command in visible_commands:
- if command in help_topics or getattr(self, self._func_named(command)).__doc__:
+ func = self.cmd_func(command)
+ if command in help_topics or func.__doc__:
if command in help_topics:
help_topics.remove(command)
- if hasattr(getattr(self, self._func_named(command)), HELP_CATEGORY):
- category = getattr(getattr(self, self._func_named(command)), HELP_CATEGORY)
+ if hasattr(func, HELP_CATEGORY):
+ category = getattr(func, HELP_CATEGORY)
cmds_cats.setdefault(category, [])
cmds_cats[category].append(command)
else:
@@ -2432,12 +2697,13 @@ Usage: Usage: unalias [-a] name [name ...]
func = getattr(self, 'help_' + command)
except AttributeError:
# Couldn't find a help function
+ func = self.cmd_func(command)
try:
# Now see if help_summary has been set
- doc = getattr(self, self._func_named(command)).help_summary
+ doc = func.help_summary
except AttributeError:
# Last, try to directly access the function's doc-string
- doc = getattr(self, self._func_named(command)).__doc__
+ doc = func.__doc__
else:
# we found the help function
result = io.StringIO()
@@ -2458,13 +2724,17 @@ Usage: Usage: unalias [-a] name [name ...]
doc_block = []
found_first = False
for doc_line in doc.splitlines():
- str(doc_line).strip()
- if len(doc_line.strip()) > 0:
- doc_block.append(doc_line.strip())
- found_first = True
- else:
+ stripped_line = doc_line.strip()
+
+ # Don't include :param type lines
+ if stripped_line.startswith(':'):
if found_first:
break
+ elif stripped_line:
+ doc_block.append(stripped_line)
+ found_first = True
+ elif found_first:
+ break
for doc_line in doc_block:
self.stdout.write('{: <{col_width}}{doc}\n'.format(command,
@@ -2473,18 +2743,21 @@ Usage: Usage: unalias [-a] name [name ...]
command = ''
self.stdout.write("\n")
- def do_shortcuts(self, _: str) -> None:
- """Lists shortcuts available"""
+ @with_argparser(ACArgumentParser())
+ def do_shortcuts(self, _: argparse.Namespace) -> None:
+ """List available shortcuts"""
result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts))
self.poutput("Shortcuts for other commands:\n{}\n".format(result))
- def do_eof(self, _: str) -> bool:
+ @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG))
+ def do_eof(self, _: argparse.Namespace) -> bool:
"""Called when <Ctrl>-D is pressed"""
# End of script should not exit app, but <Ctrl>-D should.
return self._STOP_AND_EXIT
- def do_quit(self, _: str) -> bool:
- """Exits this application"""
+ @with_argparser(ACArgumentParser())
+ def do_quit(self, _: argparse.Namespace) -> bool:
+ """Exit this application"""
self._should_quit = True
return self._STOP_AND_EXIT
@@ -2514,7 +2787,8 @@ Usage: Usage: unalias [-a] name [name ...]
for (idx, (_, text)) in enumerate(fulloptions):
self.poutput(' %2d. %s\n' % (idx + 1, text))
while True:
- response = input(prompt)
+ safe_prompt = rl_make_safe_prompt(prompt)
+ response = input(safe_prompt)
if rl_type != RlType.NONE:
hlen = readline.get_current_history_length()
@@ -2541,22 +2815,21 @@ Usage: Usage: unalias [-a] name [name ...]
Output redirection and pipes allowed: {}"""
return read_only_settings.format(str(self.terminators), self.allow_cli_args, self.allow_redirection)
- def show(self, args: argparse.Namespace, parameter: str) -> None:
+ def show(self, args: argparse.Namespace, parameter: str='') -> None:
"""Shows current settings of parameters.
:param args: argparse parsed arguments from the set command
- :param parameter:
- :return:
+ :param parameter: optional search parameter
"""
- param = ''
- if parameter:
- param = parameter.strip().lower()
+ param = parameter.strip().lower()
result = {}
maxlen = 0
+
for p in self.settable:
if (not param) or p.startswith(param):
- result[p] = '%s: %s' % (p, str(getattr(self, p)))
+ result[p] = '{}: {}'.format(p, str(getattr(self, p)))
maxlen = max(maxlen, len(result[p]))
+
if result:
for p in sorted(result):
if args.long:
@@ -2568,58 +2841,69 @@ Usage: Usage: unalias [-a] name [name ...]
if args.all:
self.poutput('\nRead only settings:{}'.format(self.cmdenvironment()))
else:
- raise LookupError("Parameter '%s' not supported (type 'set' for list of parameters)." % param)
+ raise LookupError("Parameter '{}' not supported (type 'set' for list of parameters).".format(param))
- set_description = "Sets a settable parameter or shows current settings of parameters.\n"
- set_description += "\n"
- set_description += "Accepts abbreviated parameter names so long as there is no ambiguity.\n"
- set_description += "Call without arguments for a list of settable parameters with their values."
+ set_description = ("Set a settable parameter or show current settings of parameters\n"
+ "\n"
+ "Accepts abbreviated parameter names so long as there is no ambiguity.\n"
+ "Call without arguments for a list of settable parameters with their values.")
set_parser = ACArgumentParser(description=set_description)
set_parser.add_argument('-a', '--all', action='store_true', help='display read-only settings as well')
set_parser.add_argument('-l', '--long', action='store_true', help='describe function of parameter')
- set_parser.add_argument('settable', nargs=(0, 2), help='[param_name] [value]')
+ setattr(set_parser.add_argument('param', nargs='?', help='parameter to set or view'),
+ ACTION_ARG_CHOICES, settable)
+ set_parser.add_argument('value', nargs='?', help='the new value for settable')
@with_argparser(set_parser)
def do_set(self, args: argparse.Namespace) -> None:
- """Sets a settable parameter or shows current settings of parameters"""
- try:
- param_name, val = args.settable
- val = val.strip()
- param_name = param_name.strip().lower()
- if param_name not in self.settable:
- hits = [p for p in self.settable if p.startswith(param_name)]
- if len(hits) == 1:
- param_name = hits[0]
- else:
- return self.show(args, param_name)
- current_val = getattr(self, param_name)
- if (val[0] == val[-1]) and val[0] in ("'", '"'):
- val = val[1:-1]
+ """Set a settable parameter or show current settings of parameters"""
+
+ # Check if param was passed in
+ if not args.param:
+ return self.show(args)
+ param = args.param.strip().lower()
+
+ # Check if value was passed in
+ if not args.value:
+ return self.show(args, param)
+ value = args.value
+
+ # Check if param points to just one settable
+ if param not in self.settable:
+ hits = [p for p in self.settable if p.startswith(param)]
+ if len(hits) == 1:
+ param = hits[0]
else:
- val = utils.cast(current_val, val)
- setattr(self, param_name, val)
- self.poutput('%s - was: %s\nnow: %s\n' % (param_name, current_val, val))
- if current_val != val:
- try:
- onchange_hook = getattr(self, '_onchange_%s' % param_name)
- onchange_hook(old=current_val, new=val)
- except AttributeError:
- pass
- except (ValueError, AttributeError):
- param = ''
- if args.settable:
- param = args.settable[0]
- self.show(args, param)
-
- def do_shell(self, statement: Statement) -> None:
- """Execute a command as if at the OS prompt
-
- Usage: shell <command> [arguments]"""
+ return self.show(args, param)
+
+ # Update the settable's value
+ current_value = getattr(self, param)
+ value = utils.cast(current_value, value)
+ setattr(self, param, value)
+
+ self.poutput('{} - was: {}\nnow: {}\n'.format(param, current_value, value))
+
+ # See if we need to call a change hook for this settable
+ if current_value != value:
+ onchange_hook = getattr(self, '_onchange_{}'.format(param), None)
+ if onchange_hook is not None:
+ onchange_hook(old=current_value, new=value)
+
+ shell_parser = ACArgumentParser()
+ setattr(shell_parser.add_argument('command', help='the command to run'),
+ ACTION_ARG_CHOICES, ('shell_cmd_complete',))
+ setattr(shell_parser.add_argument('command_args', nargs=argparse.REMAINDER,
+ help='arguments to pass to command'),
+ ACTION_ARG_CHOICES, ('path_complete',))
+
+ @with_argparser(shell_parser, preserve_quotes=True)
+ def do_shell(self, args: argparse.Namespace) -> None:
+ """Execute a command as if at the OS prompt"""
import subprocess
- # Get list of arguments to shell with quotes preserved
- tokens = statement.arg_list
+ # Create a list of arguments to shell
+ tokens = [args.command] + args.command_args
# Support expanding ~ in quoted paths
for index, _ in enumerate(tokens):
@@ -2640,18 +2924,6 @@ Usage: Usage: unalias [-a] name [name ...]
proc = subprocess.Popen(expanded_command, stdout=self.stdout, shell=True)
proc.communicate()
- def complete_shell(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """Handles tab completion of executable commands and local file system paths for the shell command
-
- :param text: the string prefix we are attempting to match (all returned matches must begin with it)
- :param line: the current input line with leading whitespace removed
- :param begidx: the beginning index of the prefix text
- :param endidx: the ending index of the prefix text
- :return: a list of possible tab completions
- """
- index_dict = {1: self.shell_cmd_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict, self.path_complete)
-
@staticmethod
def _reset_py_display() -> None:
"""
@@ -2676,37 +2948,35 @@ Usage: Usage: unalias [-a] name [name ...]
sys.displayhook = sys.__displayhook__
sys.excepthook = sys.__excepthook__
- def do_py(self, arg: str) -> bool:
- """
- Invoke python command, shell, or script
+ py_parser = ACArgumentParser()
+ py_parser.add_argument('command', help="command to run", nargs='?')
+ py_parser.add_argument('remainder', help="remainder of command", nargs=argparse.REMAINDER)
- py <command>: Executes a Python command.
- py: Enters interactive Python mode.
- End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
- Non-python commands can be issued with ``pyscript_name("your command")``.
- Run python code from external script files with ``run("script.py")``
- """
- from .pyscript_bridge import PyscriptBridge
+ @with_argparser(py_parser)
+ def do_py(self, args: argparse.Namespace) -> bool:
+ """Invoke Python command or shell"""
+ from .pyscript_bridge import PyscriptBridge, CommandResult
if self._in_py:
- self.perror("Recursively entering interactive Python consoles is not allowed.", traceback_war=False)
+ err = "Recursively entering interactive Python consoles is not allowed."
+ self.perror(err, traceback_war=False)
+ self._last_result = CommandResult('', err)
return False
self._in_py = True
# noinspection PyBroadException
try:
- arg = arg.strip()
-
# Support the run command even if called prior to invoking an interactive interpreter
- def run(filename):
+ def run(filename: str):
"""Run a Python script file in the interactive console.
- :param filename: str - filename of *.py script file to run
+ :param filename: filename of *.py script file to run
"""
+ expanded_filename = os.path.expanduser(filename)
try:
- with open(filename) as f:
+ with open(expanded_filename) as f:
interp.runcode(f.read())
except OSError as ex:
- error_msg = "Error opening script file '{}': {}".format(filename, ex)
+ error_msg = "Error opening script file '{}': {}".format(expanded_filename, ex)
self.perror(error_msg, traceback_war=False)
bridge = PyscriptBridge(self)
@@ -2721,8 +2991,12 @@ Usage: Usage: unalias [-a] name [name ...]
interp = InteractiveConsole(locals=localvars)
interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())')
- if arg:
- interp.runcode(arg)
+ if args.command:
+ full_command = utils.quote_string_if_needed(args.command)
+ for cur_token in args.remainder:
+ full_command += ' ' + utils.quote_string_if_needed(cur_token)
+
+ interp.runcode(full_command)
# If there are no args, then we will open an interactive Python console
else:
@@ -2787,11 +3061,14 @@ Usage: Usage: unalias [-a] name [name ...]
sys.stdin = self.stdin
cprt = 'Type "help", "copyright", "credits" or "license" for more information.'
- docstr = self.do_py.__doc__.replace('pyscript_name', self.pyscript_name)
+ instructions = ('End with `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()`.\n'
+ 'Non-Python commands can be issued with: {}("your command")\n'
+ 'Run Python code from external script files with: run("script.py")'
+ .format(self.pyscript_name))
try:
- interp.interact(banner="Python {} on {}\n{}\n({})\n{}".
- format(sys.version, sys.platform, cprt, self.__class__.__name__, docstr))
+ interp.interact(banner="Python {} on {}\n{}\n\n{}\n".
+ format(sys.version, sys.platform, cprt, instructions))
except EmbeddedConsoleExit:
pass
@@ -2832,30 +3109,22 @@ Usage: Usage: unalias [-a] name [name ...]
self._in_py = False
return self._should_quit
- @with_argument_list
- def do_pyscript(self, arglist: List[str]) -> None:
- """\nRuns a python script file inside the console
-
- Usage: pyscript <script_path> [script_arguments]
+ pyscript_parser = ACArgumentParser()
+ setattr(pyscript_parser.add_argument('script_path', help='path to the script file'),
+ ACTION_ARG_CHOICES, ('path_complete',))
+ pyscript_parser.add_argument('script_arguments', nargs=argparse.REMAINDER,
+ help='arguments to pass to script')
-Console commands can be executed inside this script with cmd("your command")
-However, you cannot run nested "py" or "pyscript" commands from within this script
-Paths or arguments that contain spaces must be enclosed in quotes
-"""
- if not arglist:
- self.perror("pyscript command requires at least 1 argument ...", traceback_war=False)
- self.do_help(['pyscript'])
- return
-
- # Get the absolute path of the script
- script_path = os.path.expanduser(arglist[0])
+ @with_argparser(pyscript_parser)
+ def do_pyscript(self, args: argparse.Namespace) -> None:
+ """Run a Python script file inside the console"""
+ script_path = os.path.expanduser(args.script_path)
# Save current command line arguments
orig_args = sys.argv
# Overwrite sys.argv to allow the script to take command line arguments
- sys.argv = [script_path]
- sys.argv.extend(arglist[1:])
+ sys.argv = [script_path] + args.script_arguments
# Run the script - use repr formatting to escape things which need to be escaped to prevent issues on Windows
self.do_py("run({!r})".format(script_path))
@@ -2863,33 +3132,24 @@ Paths or arguments that contain spaces must be enclosed in quotes
# Restore command line arguments to original state
sys.argv = orig_args
- def complete_pyscript(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """Enable tab-completion for pyscript command."""
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
-
# Only include the do_ipy() method if IPython is available on the system
- if ipython_available:
- # noinspection PyMethodMayBeStatic,PyUnusedLocal
- def do_ipy(self, arg: str) -> None:
- """Enters an interactive IPython shell.
-
- Run python code from external files with ``run filename.py``
- End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``.
- """
+ if ipython_available: # pragma: no cover
+ @with_argparser(ACArgumentParser())
+ def do_ipy(self, _: argparse.Namespace) -> None:
+ """Enter an interactive IPython shell"""
from .pyscript_bridge import PyscriptBridge
bridge = PyscriptBridge(self)
+ banner = ('Entering an embedded IPython shell. Type quit or <Ctrl>-d to exit.\n'
+ 'Run Python code from external files with: run filename.py\n')
+ exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0])
+
if self.locals_in_py:
def load_ipy(self, app):
- banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...'
- exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0])
embed(banner1=banner, exit_msg=exit_msg)
load_ipy(self, bridge)
else:
def load_ipy(app):
- banner = 'Entering an embedded IPython shell type quit() or <Ctrl>-d to exit ...'
- exit_msg = 'Leaving IPython, back to {}'.format(sys.argv[0])
embed(banner1=banner, exit_msg=exit_msg)
load_ipy(bridge)
@@ -2898,10 +3158,10 @@ Paths or arguments that contain spaces must be enclosed in quotes
history_parser_group.add_argument('-r', '--run', action='store_true', help='run selected history items')
history_parser_group.add_argument('-e', '--edit', action='store_true',
help='edit and then run selected history items')
- history_parser_group.add_argument('-s', '--script', action='store_true', help='script format; no separation lines')
+ history_parser_group.add_argument('-s', '--script', action='store_true', help='output commands in script format')
history_parser_group.add_argument('-o', '--output-file', metavar='FILE', help='output commands to a script file')
history_parser_group.add_argument('-t', '--transcript', help='output commands and results to a transcript file')
- history_parser_group.add_argument('-c', '--clear', action="store_true", help='clears all history')
+ history_parser_group.add_argument('-c', '--clear', action="store_true", help='clear all history')
_history_arg_help = """empty all history items
a one history item by number
a..b, a:b, a:, ..b items by indices (inclusive)
@@ -3031,7 +3291,7 @@ a..b, a:b, a:, ..b items by indices (inclusive)
# get the output out of the buffer
output = membuf.read()
# and add the regex-escaped output to the transcript
- transcript += output.replace('/', '\/')
+ transcript += output.replace('/', r'\/')
# Restore stdout to its original state
self.stdout = saved_self_stdout
@@ -3053,29 +3313,28 @@ a..b, a:b, a:, ..b items by indices (inclusive)
msg = '{} {} saved to transcript file {!r}'
self.pfeedback(msg.format(len(history), plural, transcript_file))
- @with_argument_list
- def do_edit(self, arglist: List[str]) -> None:
- """Edit a file in a text editor
+ edit_description = ("Edit a file in a text editor\n"
+ "\n"
+ "The editor used is determined by a settable parameter. To set it:\n"
+ "\n"
+ " set editor (program-name)")
-Usage: edit [file_path]
- Where:
- * file_path - path to a file to open in editor
+ edit_parser = ACArgumentParser(description=edit_description)
+ setattr(edit_parser.add_argument('file_path', help="path to a file to open in editor", nargs="?"),
+ ACTION_ARG_CHOICES, ('path_complete',))
-The editor used is determined by the ``editor`` settable parameter.
-"set editor (program-name)" to change or set the EDITOR environment variable.
-"""
+ @with_argparser(edit_parser)
+ def do_edit(self, args: argparse.Namespace) -> None:
+ """Edit a file in a text editor"""
if not self.editor:
raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.")
- filename = arglist[0] if arglist else ''
- if filename:
- os.system('"{}" "{}"'.format(self.editor, filename))
- else:
- os.system('"{}"'.format(self.editor))
- def complete_edit(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """Enable tab-completion for edit command."""
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
+ editor = utils.quote_string_if_needed(self.editor)
+ if args.file_path:
+ expanded_path = utils.quote_string_if_needed(os.path.expanduser(args.file_path))
+ os.system('{} {}'.format(editor, expanded_path))
+ else:
+ os.system('{}'.format(editor))
@property
def _current_script_dir(self) -> Optional[str]:
@@ -3085,54 +3344,25 @@ The editor used is determined by the ``editor`` settable parameter.
else:
return None
- @with_argument_list
- def do__relative_load(self, arglist: List[str]) -> None:
- """Runs commands in script file that is encoded as either ASCII or UTF-8 text
-
- Usage: _relative_load <file_path>
-
- optional argument:
- file_path a file path pointing to a script
-
-Script should contain one command per line, just like command would be typed in console.
-
-If this is called from within an already-running script, the filename will be interpreted
-relative to the already-running script's directory.
-
-NOTE: This command is intended to only be used within text file scripts.
- """
- # If arg is None or arg is an empty string this is an error
- if not arglist:
- self.perror('_relative_load command requires a file path:', traceback_war=False)
- return
-
- file_path = arglist[0].strip()
- # NOTE: Relative path is an absolute path, it is just relative to the current script directory
- relative_path = os.path.join(self._current_script_dir or '', file_path)
- self.do_load([relative_path])
-
- def do_eos(self, _: str) -> None:
- """Handles cleanup when a script has finished executing"""
+ @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG))
+ def do_eos(self, _: argparse.Namespace) -> None:
+ """Handle cleanup when a script has finished executing"""
if self._script_dir:
self._script_dir.pop()
- @with_argument_list
- def do_load(self, arglist: List[str]) -> None:
- """Runs commands in script file that is encoded as either ASCII or UTF-8 text
+ load_description = ("Run commands in script file that is encoded as either ASCII or UTF-8 text\n"
+ "\n"
+ "Script should contain one command per line, just like the command would be\n"
+ "typed in the console.")
- Usage: load <file_path>
+ load_parser = ACArgumentParser(description=load_description)
+ setattr(load_parser.add_argument('script_path', help="path to the script file"),
+ ACTION_ARG_CHOICES, ('path_complete',))
- * file_path - a file path pointing to a script
-
-Script should contain one command per line, just like command would be typed in console.
- """
- # If arg is None or arg is an empty string this is an error
- if not arglist:
- self.perror('load command requires a file path', traceback_war=False)
- return
-
- file_path = arglist[0].strip()
- expanded_path = os.path.abspath(os.path.expanduser(file_path))
+ @with_argparser(load_parser)
+ def do_load(self, args: argparse.Namespace) -> None:
+ """Run commands in script file that is encoded as either ASCII or UTF-8 text"""
+ expanded_path = os.path.abspath(os.path.expanduser(args.script_path))
# Make sure the path exists and we can access it
if not os.path.exists(expanded_path):
@@ -3166,10 +3396,24 @@ Script should contain one command per line, just like command would be typed in
self._script_dir.append(os.path.dirname(expanded_path))
- def complete_load(self, text: str, line: str, begidx: int, endidx: int) -> List[str]:
- """Enable tab-completion for load command."""
- index_dict = {1: self.path_complete}
- return self.index_based_complete(text, line, begidx, endidx, index_dict)
+ relative_load_description = load_description
+ relative_load_description += ("\n\n"
+ "If this is called from within an already-running script, the filename will be\n"
+ "interpreted relative to the already-running script's directory.")
+
+ relative_load_epilog = ("Notes:\n"
+ " This command is intended to only be used within text file scripts.")
+
+ relative_load_parser = ACArgumentParser(description=relative_load_description, epilog=relative_load_epilog)
+ relative_load_parser.add_argument('file_path', help='a file path pointing to a script')
+
+ @with_argparser(relative_load_parser)
+ def do__relative_load(self, args: argparse.Namespace) -> None:
+ """"""
+ file_path = args.file_path
+ # NOTE: Relative path is an absolute path, it is just relative to the current script directory
+ relative_path = os.path.join(self._current_script_dir or '', file_path)
+ self.do_load(relative_path)
def run_transcript_tests(self, callargs: List[str]) -> None:
"""Runs transcript tests for provided file(s).
@@ -3191,6 +3435,125 @@ Script should contain one command per line, just like command would be typed in
runner = unittest.TextTestRunner()
runner.run(testcase)
+ def _clear_input_lines_str(self) -> str: # pragma: no cover
+ """
+ Returns a string that if printed will clear the prompt and input lines in the terminal,
+ leaving the cursor at the beginning of the first input line
+ :return: the string to print
+ """
+ if not (vt100_support and self.use_rawinput):
+ return ''
+
+ import shutil
+ import colorama.ansi as ansi
+ from colorama import Cursor
+
+ visible_prompt = self.visible_prompt
+
+ # Get the size of the terminal
+ terminal_size = shutil.get_terminal_size()
+
+ # Figure out how many lines the prompt and user input take up
+ total_str_size = len(visible_prompt) + len(readline.get_line_buffer())
+ num_input_lines = int(total_str_size / terminal_size.columns) + 1
+
+ # Get the cursor's offset from the beginning of the first input line
+ cursor_input_offset = len(visible_prompt) + rl_get_point()
+
+ # Calculate what input line the cursor is on
+ cursor_input_line = int(cursor_input_offset / terminal_size.columns) + 1
+
+ # Create a string that will clear all input lines and print the alert
+ terminal_str = ''
+
+ # Move the cursor down to the last input line
+ if cursor_input_line != num_input_lines:
+ terminal_str += Cursor.DOWN(num_input_lines - cursor_input_line)
+
+ # Clear each input line from the bottom up so that the cursor ends up on the original first input line
+ terminal_str += (ansi.clear_line() + Cursor.UP(1)) * (num_input_lines - 1)
+ terminal_str += ansi.clear_line()
+
+ # Move the cursor to the beginning of the first input line
+ terminal_str += '\r'
+
+ return terminal_str
+
+ def async_alert(self, alert_msg: str, new_prompt: Optional[str] = None) -> None: # pragma: no cover
+ """
+ Used to display an important message to the user while they are at the prompt in between commands.
+ To the user it appears as if an alert message is printed above the prompt and their current input
+ text and cursor location is left alone.
+
+ IMPORTANT: Do not call this unless you have acquired self.terminal_lock
+ first, which ensures a prompt is onscreen
+
+ :param alert_msg: the message to display to the user
+ :param new_prompt: if you also want to change the prompt that is displayed, then include it here
+ see async_update_prompt() docstring for guidance on updating a prompt
+ :raises RuntimeError if called while another thread holds terminal_lock
+ """
+ if not (vt100_support and self.use_rawinput):
+ return
+
+ # Sanity check that can't fail if self.terminal_lock was acquired before calling this function
+ if self.terminal_lock.acquire(blocking=False):
+
+ # Generate a string to clear the prompt and input lines and replace with the alert
+ terminal_str = self._clear_input_lines_str()
+ if alert_msg:
+ terminal_str += alert_msg + '\n'
+
+ # Set the new prompt now that _clear_input_lines_str is done using the old prompt
+ if new_prompt is not None:
+ self.prompt = new_prompt
+ rl_set_prompt(self.prompt)
+
+ # Print terminal_str to erase the lines
+ if rl_type == RlType.GNU:
+ sys.stderr.write(terminal_str)
+ elif rl_type == RlType.PYREADLINE:
+ readline.rl.mode.console.write(terminal_str)
+
+ # Redraw the prompt and input lines
+ rl_force_redisplay()
+
+ self.terminal_lock.release()
+
+ else:
+ raise RuntimeError("another thread holds terminal_lock")
+
+ def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover
+ """
+ Updates the prompt while the user is still typing at it. This is good for alerting the user to system
+ changes dynamically in between commands. For instance you could alter the color of the prompt to indicate
+ a system status or increase a counter to report an event. If you do alter the actual text of the prompt,
+ it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will
+ be shifted and the update will not be seamless.
+
+ IMPORTANT: Do not call this unless you have acquired self.terminal_lock
+ first, which ensures a prompt is onscreen
+
+ :param new_prompt: what to change the prompt to
+ """
+ self.async_alert('', new_prompt)
+
+ @staticmethod
+ def set_window_title(title: str) -> None: # pragma: no cover
+ """
+ Sets the terminal window title
+ :param title: the new window title
+ """
+ if not vt100_support:
+ return
+
+ import colorama.ansi as ansi
+ try:
+ sys.stderr.write(ansi.set_title(title))
+ except AttributeError:
+ # Debugging in Pycharm has issues with setting terminal title
+ pass
+
def cmdloop(self, intro: Optional[str]=None) -> None:
"""This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
@@ -3216,6 +3579,14 @@ Script should contain one command per line, just like command would be typed in
if callargs:
self.cmdqueue.extend(callargs)
+ # Register a SIGINT signal handler for Ctrl+C
+ import signal
+ original_sigint_handler = signal.getsignal(signal.SIGINT)
+ signal.signal(signal.SIGINT, self.sigint_handler)
+
+ # Grab terminal lock before the prompt has been drawn by readline
+ self.terminal_lock.acquire()
+
# Always run the preloop first
for func in self._preloop_hooks:
func()
@@ -3241,6 +3612,13 @@ Script should contain one command per line, just like command would be typed in
func()
self.postloop()
+ # Release terminal lock now that postloop code should have stopped any terminal updater threads
+ # This will also zero the lock count in case cmdloop() is called again
+ self.terminal_lock.release()
+
+ # Restore the original signal handler
+ signal.signal(signal.SIGINT, original_sigint_handler)
+
if self.exit_code is not None:
sys.exit(self.exit_code)
@@ -3249,7 +3627,7 @@ Script should contain one command per line, just like command would be typed in
# plugin related functions
#
###
- def _initialize_plugin_system(self):
+ def _initialize_plugin_system(self) -> None:
"""Initialize the plugin system"""
self._preloop_hooks = []
self._postloop_hooks = []
@@ -3259,7 +3637,7 @@ Script should contain one command per line, just like command would be typed in
self._cmdfinalization_hooks = []
@classmethod
- def _validate_callable_param_count(cls, func: Callable, count: int):
+ def _validate_callable_param_count(cls, func: Callable, count: int) -> None:
"""Ensure a function has the given number of parameters."""
signature = inspect.signature(func)
# validate that the callable has the right number of parameters
@@ -3272,7 +3650,7 @@ Script should contain one command per line, just like command would be typed in
))
@classmethod
- def _validate_prepostloop_callable(cls, func: Callable):
+ def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None:
"""Check parameter and return types for preloop and postloop hooks."""
cls._validate_callable_param_count(func, 0)
# make sure there is no return notation
@@ -3282,18 +3660,18 @@ Script should contain one command per line, just like command would be typed in
func.__name__,
))
- def register_preloop_hook(self, func: Callable):
+ def register_preloop_hook(self, func: Callable[[None], None]) -> None:
"""Register a function to be called at the beginning of the command loop."""
self._validate_prepostloop_callable(func)
self._preloop_hooks.append(func)
- def register_postloop_hook(self, func: Callable):
+ def register_postloop_hook(self, func: Callable[[None], None]) -> None:
"""Register a function to be called at the end of the command loop."""
self._validate_prepostloop_callable(func)
self._postloop_hooks.append(func)
@classmethod
- def _validate_postparsing_callable(cls, func: Callable):
+ def _validate_postparsing_callable(cls, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
"""Check parameter and return types for postparsing hooks"""
cls._validate_callable_param_count(func, 1)
signature = inspect.signature(func)
@@ -3307,13 +3685,13 @@ Script should contain one command per line, just like command would be typed in
func.__name__
))
- def register_postparsing_hook(self, func: Callable):
+ def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None:
"""Register a function to be called after parsing user input but before running the command"""
self._validate_postparsing_callable(func)
self._postparsing_hooks.append(func)
@classmethod
- def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type):
+ def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type) -> None:
"""Check parameter and return types for pre and post command hooks."""
signature = inspect.signature(func)
# validate that the callable has the right number of parameters
@@ -3340,18 +3718,19 @@ Script should contain one command per line, just like command would be typed in
data_type,
))
- def register_precmd_hook(self, func: Callable):
+ def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None:
"""Register a hook to be called before the command function."""
self._validate_prepostcmd_hook(func, plugin.PrecommandData)
self._precmd_hooks.append(func)
- def register_postcmd_hook(self, func: Callable):
+ def register_postcmd_hook(self, func: Callable[[plugin.PostcommandData], plugin.PostcommandData]) -> None:
"""Register a hook to be called after the command function."""
self._validate_prepostcmd_hook(func, plugin.PostcommandData)
self._postcmd_hooks.append(func)
@classmethod
- def _validate_cmdfinalization_callable(cls, func: Callable):
+ def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData],
+ plugin.CommandFinalizationData]) -> None:
"""Check parameter and return types for command finalization hooks."""
cls._validate_callable_param_count(func, 1)
signature = inspect.signature(func)
@@ -3363,7 +3742,8 @@ Script should contain one command per line, just like command would be typed in
raise TypeError("{} must declare return a return type of "
"'cmd2.plugin.CommandFinalizationData'".format(func.__name__))
- def register_cmdfinalization_hook(self, func: Callable):
+ def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData],
+ plugin.CommandFinalizationData]) -> None:
"""Register a hook to be called after a command is completed, whether it completes successfully or not."""
self._validate_cmdfinalization_callable(func)
self._cmdfinalization_hooks.append(func)
@@ -3429,7 +3809,7 @@ class History(list):
def get(self, getme: Optional[Union[int, str]]=None) -> List[HistoryItem]:
"""Get an item or items from the History list using 1-based indexing.
- :param getme: item(s) to get - either an integer index or string to search for
+ :param getme: optional item(s) to get (either an integer index or string to search for)
:return: list of HistoryItems matching the retrieval criteria
"""
if not getme: