summaryrefslogtreecommitdiff
path: root/cmd2
diff options
context:
space:
mode:
authorTodd Leonhardt <todd.leonhardt@gmail.com>2019-06-15 15:00:59 -0400
committerTodd Leonhardt <todd.leonhardt@gmail.com>2019-06-15 15:00:59 -0400
commit70bf9e1a12b89bb913c11fb07893ab4b9cab2576 (patch)
tree27bb62898d635bcaa8e6e8182d52f5105210b3f6 /cmd2
parentc12ba0ff11b3a8fd083c641cb9149aff6494bbf9 (diff)
downloadcmd2-git-70bf9e1a12b89bb913c11fb07893ab4b9cab2576.tar.gz
Began work to minimize public API
Diffstat (limited to 'cmd2')
-rw-r--r--cmd2/cmd2.py310
-rw-r--r--cmd2/constants.py2
-rw-r--r--cmd2/parsing.py4
-rw-r--r--cmd2/pyscript_bridge.py2
-rw-r--r--cmd2/utils.py31
5 files changed, 170 insertions, 179 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 46b098c5..062e6582 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -171,7 +171,7 @@ def with_argument_list(*args: List[Callable], preserve_quotes: bool = False) ->
def arg_decorator(func: Callable):
@functools.wraps(func)
def cmd_wrapper(cmd2_instance, statement: Union[Statement, str]):
- _, parsed_arglist = cmd2_instance.statement_parser.get_command_arg_list(command_name,
+ _, parsed_arglist = cmd2_instance._statement_parser.get_command_arg_list(command_name,
statement,
preserve_quotes)
@@ -210,7 +210,7 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser, *,
def arg_decorator(func: Callable):
@functools.wraps(func)
def cmd_wrapper(cmd2_instance, statement: Union[Statement, str]):
- statement, parsed_arglist = cmd2_instance.statement_parser.get_command_arg_list(command_name,
+ statement, parsed_arglist = cmd2_instance._statement_parser.get_command_arg_list(command_name,
statement,
preserve_quotes)
@@ -268,7 +268,7 @@ def with_argparser(argparser: argparse.ArgumentParser, *,
def arg_decorator(func: Callable):
@functools.wraps(func)
def cmd_wrapper(cmd2_instance, statement: Union[Statement, str]):
- statement, parsed_arglist = cmd2_instance.statement_parser.get_command_arg_list(command_name,
+ statement, parsed_arglist = cmd2_instance._statement_parser.get_command_arg_list(command_name,
statement,
preserve_quotes)
@@ -327,7 +327,6 @@ class Cmd(cmd.Cmd):
Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes.
"""
- DEFAULT_SHORTCUTS = {'?': 'help', '!': 'shell', '@': 'run_script', '@@': '_relative_run_script'}
DEFAULT_EDITOR = utils.find_editor()
def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, *,
@@ -402,51 +401,46 @@ class Cmd(cmd.Cmd):
# Commands to exclude from the history command
# initialize history
- self.persistent_history_length = persistent_history_length
+ self._persistent_history_length = persistent_history_length
self._initialize_history(persistent_history_file)
self.exclude_from_history = '''history edit eof'''.split()
# Command aliases and macros
self.macros = dict()
- self.initial_stdout = sys.stdout
- self.pystate = {}
- self.py_history = []
+ self._pystate = {}
+ self._py_history = []
self.pyscript_name = 'app'
if shortcuts is None:
- shortcuts = self.DEFAULT_SHORTCUTS
+ shortcuts = constants.DEFAULT_SHORTCUTS
shortcuts = sorted(shortcuts.items(), reverse=True)
- self.statement_parser = StatementParser(allow_redirection=allow_redirection,
- terminators=terminators,
- multiline_commands=multiline_commands,
- shortcuts=shortcuts)
+ self._statement_parser = StatementParser(allow_redirection=allow_redirection,
+ terminators=terminators,
+ multiline_commands=multiline_commands,
+ shortcuts=shortcuts)
# True if running inside a Python script or interactive console, False otherwise
self._in_py = False
- # Stores results from the last command run to enable usage of results in a Python script or interactive console
- # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience.
- self._last_result = None
-
# Used by run_script command to store current script dir as a LIFO queue to support _relative_run_script command
self._script_dir = []
# Context manager used to protect critical sections in the main thread from stopping due to a KeyboardInterrupt
- self.sigint_protection = utils.ContextFlag()
+ self._sigint_protection = utils.ContextFlag()
# If the current command created a process to pipe to, then this will be a ProcReader object.
# Otherwise it will be None. Its used to know when a pipe process can be killed and/or waited upon.
- self.cur_pipe_proc_reader = None
+ self._cur_pipe_proc_reader = None
# Used by complete() for readline tab completion
- self.completion_matches = []
+ self._completion_matches = []
- # Used to keep track of whether we are redirecting or piping output
- self.redirecting = False
+ # Used to keep track of whether we are _redirecting or piping output
+ self._redirecting = False
# Used to keep track of whether a continuation prompt is being displayed
- self.at_continuation_prompt = False
+ self._at_continuation_prompt = False
# The error that prints when no help information can be found
self.help_error = "No help on {}"
@@ -535,7 +529,7 @@ class Cmd(cmd.Cmd):
self.pager_chop = 'less -SRXF'
# This boolean flag determines whether or not the cmd2 application can interact with the clipboard
- self.can_clip = can_clip
+ self._can_clip = can_clip
# This determines the value returned by cmdloop() when exiting the application
self.exit_code = 0
@@ -543,7 +537,7 @@ class Cmd(cmd.Cmd):
# This lock should be acquired before doing any asynchronous changes to the terminal to
# ensure the updates to the terminal don't interfere with the input being typed or output
# being printed by a command.
- self.terminal_lock = threading.RLock()
+ self._terminal_lock = threading.RLock()
# Commands that have been disabled from use. This is to support commands that are only available
# during specific states of the application. This dictionary's keys are the command names and its
@@ -566,24 +560,19 @@ class Cmd(cmd.Cmd):
@property
def aliases(self) -> Dict[str, str]:
"""Read-only property to access the aliases stored in the StatementParser."""
- return self.statement_parser.aliases
-
- @property
- def shortcuts(self) -> Tuple[Tuple[str, str]]:
- """Read-only property to access the shortcuts stored in the StatementParser."""
- return self.statement_parser.shortcuts
+ return self._statement_parser.aliases
@property
def allow_redirection(self) -> bool:
"""Getter for the allow_redirection property that determines whether or not redirection of stdout is allowed."""
- return self.statement_parser.allow_redirection
+ return self._statement_parser.allow_redirection
@allow_redirection.setter
def allow_redirection(self, value: bool) -> None:
"""Setter for the allow_redirection property that determines whether or not redirection of stdout is allowed."""
- self.statement_parser.allow_redirection = value
+ self._statement_parser.allow_redirection = value
- def decolorized_write(self, fileobj: IO, msg: str) -> None:
+ def _decolorized_write(self, fileobj: IO, msg: str) -> None:
"""Write a string to a fileobject, stripping ANSI escape sequences if necessary
Honor the current colors setting, which requires us to check whether the
@@ -612,7 +601,7 @@ class Cmd(cmd.Cmd):
msg_str += end
if color:
msg_str = color + msg_str + Fore.RESET
- self.decolorized_write(self.stdout, msg_str)
+ self._decolorized_write(self.stdout, msg_str)
except BrokenPipeError:
# This occurs if a command's output is being piped to another
# process and that process closes before the command is
@@ -640,12 +629,12 @@ class Cmd(cmd.Cmd):
else:
err_msg = "{}\n".format(err)
err_msg = err_color + err_msg + Fore.RESET
- self.decolorized_write(sys.stderr, err_msg)
+ self._decolorized_write(sys.stderr, err_msg)
if traceback_war and not self.debug:
war = "To enable full traceback, run the following command: 'set debug true'\n"
war = war_color + war + Fore.RESET
- self.decolorized_write(sys.stderr, war)
+ self._decolorized_write(sys.stderr, war)
def pfeedback(self, msg: str) -> None:
"""For printing nonessential feedback. Can be silenced with `quiet`.
@@ -654,7 +643,7 @@ class Cmd(cmd.Cmd):
if self.feedback_to_output:
self.poutput(msg)
else:
- self.decolorized_write(sys.stderr, "{}\n".format(msg))
+ self._decolorized_write(sys.stderr, "{}\n".format(msg))
def ppaged(self, msg: str, end: str = '\n', chop: bool = False) -> None:
"""Print output using a pager if it would go off screen and stdout isn't currently being redirected.
@@ -687,9 +676,9 @@ class Cmd(cmd.Cmd):
if sys.platform.startswith('win') or os.environ.get('TERM') is not None:
functional_terminal = True
- # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python)
+ # Don't attempt to use a pager that can block if _redirecting or running a script (either text or Python)
# Also only attempt to use a pager if actually running in a real fully functional terminal
- if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir:
+ if functional_terminal and not self._redirecting and not self._in_py and not self._script_dir:
if self.colors.lower() == constants.COLORS_NEVER.lower():
msg_str = utils.strip_ansi(msg_str)
@@ -699,11 +688,11 @@ class Cmd(cmd.Cmd):
# Prevent KeyboardInterrupts while in the pager. The pager application will
# still receive the SIGINT since it is in the same process group as us.
- with self.sigint_protection:
+ with self._sigint_protection:
pipe_proc = subprocess.Popen(pager, shell=True, stdin=subprocess.PIPE)
pipe_proc.communicate(msg_str.encode('utf-8', 'replace'))
else:
- self.decolorized_write(self.stdout, msg_str)
+ self._decolorized_write(self.stdout, msg_str)
except BrokenPipeError:
# This occurs if a command's output is being piped to another process and that process closes before the
# command is finished. If you would like your application to print a warning message, then set the
@@ -713,7 +702,7 @@ class Cmd(cmd.Cmd):
# ----- Methods related to tab completion -----
- def reset_completion_defaults(self) -> None:
+ def _reset_completion_defaults(self) -> None:
"""
Resets tab completion settings
Needs to be called each time readline runs tab completion
@@ -731,7 +720,7 @@ class Cmd(cmd.Cmd):
# noinspection PyUnresolvedReferences
readline.rl.mode._display_completions = self._display_matches_pyreadline
- def tokens_for_completion(self, line: str, begidx: int, endidx: int) -> Tuple[List[str], List[str]]:
+ def _tokens_for_completion(self, line: str, begidx: int, endidx: int) -> Tuple[List[str], List[str]]:
"""
Used by tab completion functions to get all tokens through the one being completed
:param line: the current input line with leading whitespace removed
@@ -945,7 +934,7 @@ class Cmd(cmd.Cmd):
:return: a list of possible tab completions
"""
# Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ tokens, _ = self._tokens_for_completion(line, begidx, endidx)
if not tokens:
return []
@@ -987,7 +976,7 @@ class Cmd(cmd.Cmd):
:return: a list of possible tab completions
"""
# Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ tokens, _ = self._tokens_for_completion(line, begidx, endidx)
if not tokens:
return []
@@ -1157,35 +1146,6 @@ class Cmd(cmd.Cmd):
return matches
- @staticmethod
- def get_exes_in_path(starts_with: str) -> List[str]:
- """Returns names of executables in a user's path
-
- :param starts_with: what the exes should start with. leave blank for all exes in path.
- :return: a list of matching exe names
- """
- # Purposely don't match any executable containing wildcards
- wildcards = ['*', '?']
- for wildcard in wildcards:
- if wildcard in starts_with:
- return []
-
- # Get a list of every directory in the PATH environment variable and ignore symbolic links
- paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)]
-
- # Use a set to store exe names since there can be duplicates
- exes_set = set()
-
- # Find every executable file in the user's path that matches the pattern
- for path in paths:
- full_path = os.path.join(path, starts_with)
- matches = utils.files_from_glob_pattern(full_path + '*', access=os.X_OK)
-
- for match in matches:
- exes_set.add(os.path.basename(match))
-
- return list(exes_set)
-
def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int,
complete_blank: bool = False) -> List[str]:
"""Performs completion of executables either in a user's path or a given path
@@ -1204,7 +1164,7 @@ class Cmd(cmd.Cmd):
# If there are no path characters in the search text, then do shell command completion in the user's path
if not text.startswith('~') and os.path.sep not in text:
- return self.get_exes_in_path(text)
+ return utils.get_exes_in_path(text)
# Otherwise look for executables in the given path
else:
@@ -1228,7 +1188,7 @@ class Cmd(cmd.Cmd):
# Get all tokens through the one being completed. We want the raw tokens
# so we can tell if redirection strings are quoted and ignore them.
- _, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
+ _, raw_tokens = self._tokens_for_completion(line, begidx, endidx)
if not raw_tokens:
return []
@@ -1382,7 +1342,7 @@ class Cmd(cmd.Cmd):
import functools
if state == 0 and rl_type != RlType.NONE:
unclosed_quote = ''
- self.reset_completion_defaults()
+ self._reset_completion_defaults()
# lstrip the original line
orig_line = readline.get_line_buffer()
@@ -1399,7 +1359,7 @@ class Cmd(cmd.Cmd):
# from text and update the indexes. This only applies if we are at the the beginning of the line.
shortcut_to_restore = ''
if begidx == 0:
- for (shortcut, _) in self.shortcuts:
+ for (shortcut, _) in self._statement_parser.shortcuts:
if text.startswith(shortcut):
# Save the shortcut to restore later
shortcut_to_restore = shortcut
@@ -1413,7 +1373,7 @@ class Cmd(cmd.Cmd):
if begidx > 0:
# Parse the command line
- statement = self.statement_parser.parse_command_only(line)
+ statement = self._statement_parser.parse_command_only(line)
command = statement.command
expanded_line = statement.command_and_args
@@ -1433,12 +1393,12 @@ class Cmd(cmd.Cmd):
line = expanded_line
# Get all tokens through the one being completed
- tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx)
+ tokens, raw_tokens = self._tokens_for_completion(line, begidx, endidx)
# Check if we either had a parsing error or are trying to complete the command token
# The latter can happen if " or ' was entered as the command
if len(tokens) <= 1:
- self.completion_matches = []
+ self._completion_matches = []
return None
# Text we need to remove from completions later
@@ -1489,27 +1449,27 @@ class Cmd(cmd.Cmd):
# A valid command was not entered
else:
# Check if this command should be run as a shell command
- if self.default_to_shell and command in self.get_exes_in_path(command):
+ if self.default_to_shell and command in utils.get_exes_in_path(command):
compfunc = self.path_complete
else:
compfunc = self.completedefault
# Attempt tab completion for redirection first, and if that isn't occurring,
# call the completer function for the current command
- self.completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc)
+ self._completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc)
- if self.completion_matches:
+ if self._completion_matches:
# Eliminate duplicates
- self.completion_matches = utils.remove_duplicates(self.completion_matches)
+ self._completion_matches = utils.remove_duplicates(self._completion_matches)
self.display_matches = utils.remove_duplicates(self.display_matches)
if not self.display_matches:
- # Since self.display_matches is empty, set it to self.completion_matches
+ # Since self.display_matches is empty, set it to self._completion_matches
# before we alter them. That way the suggestions will reflect how we parsed
# the token being completed and not how readline did.
import copy
- self.display_matches = copy.copy(self.completion_matches)
+ self.display_matches = copy.copy(self._completion_matches)
# Check if we need to add an opening quote
if not unclosed_quote:
@@ -1517,7 +1477,7 @@ class Cmd(cmd.Cmd):
add_quote = False
# This is the tab completion text that will appear on the command line.
- common_prefix = os.path.commonprefix(self.completion_matches)
+ common_prefix = os.path.commonprefix(self._completion_matches)
if self.matches_delimited:
# Check if any portion of the display matches appears in the tab completion
@@ -1530,36 +1490,36 @@ class Cmd(cmd.Cmd):
add_quote = True
# If there is a tab completion and any match has a space, then add an opening quote
- elif common_prefix and any(' ' in match for match in self.completion_matches):
+ elif common_prefix and any(' ' in match for match in self._completion_matches):
add_quote = True
if add_quote:
# Figure out what kind of quote to add and save it as the unclosed_quote
- if any('"' in match for match in self.completion_matches):
+ if any('"' in match for match in self._completion_matches):
unclosed_quote = "'"
else:
unclosed_quote = '"'
- self.completion_matches = [unclosed_quote + match for match in self.completion_matches]
+ self._completion_matches = [unclosed_quote + match for match in self._completion_matches]
# Check if we need to remove text from the beginning of tab completions
elif text_to_remove:
- self.completion_matches = \
- [match.replace(text_to_remove, '', 1) for match in self.completion_matches]
+ self._completion_matches = \
+ [match.replace(text_to_remove, '', 1) for match in self._completion_matches]
# Check if we need to restore a shortcut in the tab completions
# so it doesn't get erased from the command line
if shortcut_to_restore:
- self.completion_matches = \
- [shortcut_to_restore + match for match in self.completion_matches]
+ self._completion_matches = \
+ [shortcut_to_restore + match for match in self._completion_matches]
else:
# Complete token against anything a user can run
- self.completion_matches = self.basic_complete(text, line, begidx, endidx,
- self.get_commands_aliases_and_macros_for_completion())
+ self._completion_matches = self.basic_complete(text, line, begidx, endidx,
+ self.get_commands_aliases_and_macros_for_completion())
# Handle single result
- if len(self.completion_matches) == 1:
+ if len(self._completion_matches) == 1:
str_to_append = ''
# Add a closing quote if needed and allowed
@@ -1570,16 +1530,16 @@ class Cmd(cmd.Cmd):
if self.allow_appended_space and endidx == len(line):
str_to_append += ' '
- self.completion_matches[0] += str_to_append
+ self._completion_matches[0] += str_to_append
# Sort matches if they haven't already been sorted
if not self.matches_sorted:
- self.completion_matches.sort(key=self.matches_sort_key)
+ self._completion_matches.sort(key=self.matches_sort_key)
self.display_matches.sort(key=self.matches_sort_key)
self.matches_sorted = True
try:
- return self.completion_matches[state]
+ return self._completion_matches[state]
except IndexError:
return None
@@ -1606,7 +1566,7 @@ class Cmd(cmd.Cmd):
"""Default completion function for argparse commands."""
completer = AutoCompleter(argparser, self)
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ tokens, _ = self._tokens_for_completion(line, begidx, endidx)
if not tokens:
return []
@@ -1666,12 +1626,12 @@ class Cmd(cmd.Cmd):
:param signum: signal number
:param frame
"""
- if self.cur_pipe_proc_reader is not None:
+ if self._cur_pipe_proc_reader is not None:
# Pass the SIGINT to the current pipe process
- self.cur_pipe_proc_reader.send_sigint()
+ self._cur_pipe_proc_reader.send_sigint()
# Check if we are allowed to re-raise the KeyboardInterrupt
- if not self.sigint_protection:
+ if not self._sigint_protection:
raise KeyboardInterrupt("Got a keyboard interrupt")
def precmd(self, statement: Statement) -> Statement:
@@ -1692,7 +1652,7 @@ class Cmd(cmd.Cmd):
:param line: line read by readline
:return: tuple containing (command, args, line)
"""
- statement = self.statement_parser.parse_command_only(line)
+ statement = self._statement_parser.parse_command_only(line)
return statement.command, statement.args, statement.command_and_args
def onecmd_plus_hooks(self, line: str, pyscript_bridge_call: bool = False) -> bool:
@@ -1732,27 +1692,27 @@ class Cmd(cmd.Cmd):
# we need to run the finalization hooks
raise EmptyStatement
- # Keep track of whether or not we were already redirecting before this command
- already_redirecting = self.redirecting
+ # Keep track of whether or not we were already _redirecting before this command
+ already_redirecting = self._redirecting
# This will be a utils.RedirectionSavedState object for the command
saved_state = None
try:
# Get sigint protection while we set up redirection
- with self.sigint_protection:
+ with self._sigint_protection:
if pyscript_bridge_call:
# Start saving command's stdout at this point
self.stdout.pause_storage = False
redir_error, saved_state = self._redirect_output(statement)
- self.cur_pipe_proc_reader = saved_state.pipe_proc_reader
+ self._cur_pipe_proc_reader = saved_state.pipe_proc_reader
# Do not continue if an error occurred while trying to redirect
if not redir_error:
- # See if we need to update self.redirecting
+ # See if we need to update self._redirecting
if not already_redirecting:
- self.redirecting = saved_state.redirecting
+ self._redirecting = saved_state.redirecting
timestart = datetime.datetime.now()
@@ -1783,12 +1743,12 @@ class Cmd(cmd.Cmd):
self.pfeedback('Elapsed: {}'.format(datetime.datetime.now() - timestart))
finally:
# Get sigint protection while we restore stuff
- with self.sigint_protection:
+ with self._sigint_protection:
if saved_state is not None:
self._restore_output(statement, saved_state)
if not already_redirecting:
- self.redirecting = False
+ self._redirecting = False
if pyscript_bridge_call:
# Stop saving command's stdout before command finalization hooks run
@@ -1805,7 +1765,7 @@ class Cmd(cmd.Cmd):
def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool:
"""Run the command finalization hooks"""
- with self.sigint_protection:
+ with self._sigint_protection:
if not sys.platform.startswith('win') and self.stdout.isatty():
# Before the next command runs, fix any terminal problems like those
# caused by certain binary characters having been printed to it.
@@ -1856,7 +1816,7 @@ class Cmd(cmd.Cmd):
"""
while True:
try:
- statement = self.statement_parser.parse(line)
+ statement = self._statement_parser.parse(line)
if statement.multiline_command and statement.terminator:
# we have a completed multiline command, we are done
break
@@ -1867,7 +1827,7 @@ class Cmd(cmd.Cmd):
except ValueError:
# we have unclosed quotation marks, lets parse only the command
# and see if it's a multiline
- statement = self.statement_parser.parse_command_only(line)
+ statement = self._statement_parser.parse_command_only(line)
if not statement.multiline_command:
# not a multiline command, so raise the exception
raise
@@ -1876,7 +1836,7 @@ class Cmd(cmd.Cmd):
# - a multiline command with no terminator
# - a multiline command with unclosed quotation marks
try:
- self.at_continuation_prompt = True
+ self._at_continuation_prompt = True
newline = self.pseudo_raw_input(self.continuation_prompt)
if newline == 'eof':
# they entered either a blank line, or we hit an EOF
@@ -1891,10 +1851,10 @@ class Cmd(cmd.Cmd):
raise ex
else:
self.poutput('^C')
- statement = self.statement_parser.parse('')
+ statement = self._statement_parser.parse('')
break
finally:
- self.at_continuation_prompt = False
+ self._at_continuation_prompt = False
if not statement.command:
raise EmptyStatement()
@@ -2001,7 +1961,7 @@ class Cmd(cmd.Cmd):
redir_error = False
# Initialize the saved state
- saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self.cur_pipe_proc_reader)
+ saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader)
if not self.allow_redirection:
return redir_error, saved_state
@@ -2055,7 +2015,7 @@ class Cmd(cmd.Cmd):
elif statement.output:
import tempfile
- if (not statement.output_to) and (not self.can_clip):
+ if (not statement.output_to) and (not self._can_clip):
self.perror("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable",
traceback_war=False)
redir_error = True
@@ -2109,11 +2069,11 @@ class Cmd(cmd.Cmd):
sys.stdout = saved_state.saved_sys_stdout
# Check if we need to wait for the process being piped to
- if self.cur_pipe_proc_reader is not None:
- self.cur_pipe_proc_reader.wait()
+ if self._cur_pipe_proc_reader is not None:
+ self._cur_pipe_proc_reader.wait()
- # Restore cur_pipe_proc_reader. This always is done, regardless of whether this command redirected.
- self.cur_pipe_proc_reader = saved_state.saved_pipe_proc_reader
+ # Restore _cur_pipe_proc_reader. This always is done, regardless of whether this command redirected.
+ self._cur_pipe_proc_reader = saved_state.saved_pipe_proc_reader
def cmd_func(self, command: str) -> Optional[Callable]:
"""
@@ -2175,7 +2135,7 @@ class Cmd(cmd.Cmd):
return self.do_shell(statement.command_and_args)
else:
err_msg = self.default_error.format(statement.command)
- self.decolorized_write(sys.stderr, "{}\n".format(err_msg))
+ self._decolorized_write(sys.stderr, "{}\n".format(err_msg))
def pseudo_raw_input(self, prompt: str) -> str:
"""Began life as a copy of cmd's cmdloop; like raw_input but
@@ -2187,10 +2147,10 @@ class Cmd(cmd.Cmd):
if self.use_rawinput:
try:
if sys.stdin.isatty():
- # Wrap in try since terminal_lock may not be locked when this function is called from unit tests
+ # Wrap in try since _terminal_lock may not be locked when this function is called from unit tests
try:
# A prompt is about to be drawn. Allow asynchronous changes to the terminal.
- self.terminal_lock.release()
+ self._terminal_lock.release()
except RuntimeError:
pass
@@ -2206,7 +2166,7 @@ class Cmd(cmd.Cmd):
finally:
if sys.stdin.isatty():
# The prompt is gone. Do not allow asynchronous changes to the terminal.
- self.terminal_lock.acquire()
+ self._terminal_lock.acquire()
else:
if self.stdin.isatty():
# on a tty, print the prompt first, then read the line
@@ -2302,7 +2262,7 @@ class Cmd(cmd.Cmd):
"""Create or overwrite an alias"""
# Validate the alias name
- valid, errmsg = self.statement_parser.is_valid_command(args.name)
+ valid, errmsg = self._statement_parser.is_valid_command(args.name)
if not valid:
self.perror("Invalid alias name: {}".format(errmsg), traceback_war=False)
return
@@ -2313,7 +2273,7 @@ class Cmd(cmd.Cmd):
# Unquote redirection and terminator tokens
tokens_to_unquote = constants.REDIRECTION_TOKENS
- tokens_to_unquote.extend(self.statement_parser.terminators)
+ tokens_to_unquote.extend(self._statement_parser.terminators)
utils.unquote_specific_tokens(args.command_args, tokens_to_unquote)
# Build the alias value string
@@ -2433,7 +2393,7 @@ class Cmd(cmd.Cmd):
"""Create or overwrite a macro"""
# Validate the macro name
- valid, errmsg = self.statement_parser.is_valid_command(args.name)
+ valid, errmsg = self._statement_parser.is_valid_command(args.name)
if not valid:
self.perror("Invalid macro name: {}".format(errmsg), traceback_war=False)
return
@@ -2448,7 +2408,7 @@ class Cmd(cmd.Cmd):
# Unquote redirection and terminator tokens
tokens_to_unquote = constants.REDIRECTION_TOKENS
- tokens_to_unquote.extend(self.statement_parser.terminators)
+ tokens_to_unquote.extend(self._statement_parser.terminators)
utils.unquote_specific_tokens(args.command_args, tokens_to_unquote)
# Build the macro value string
@@ -2643,7 +2603,7 @@ class Cmd(cmd.Cmd):
"""Completes the subcommand argument of help"""
# Get all tokens through the one being completed
- tokens, _ = self.tokens_for_completion(line, begidx, endidx)
+ tokens, _ = self._tokens_for_completion(line, begidx, endidx)
if not tokens:
return []
@@ -2707,7 +2667,7 @@ class Cmd(cmd.Cmd):
# If there is no help information then print an error
elif help_func is None and (func is None or not func.__doc__):
err_msg = self.help_error.format(args.command)
- self.decolorized_write(sys.stderr, "{}\n".format(err_msg))
+ self._decolorized_write(sys.stderr, "{}\n".format(err_msg))
# Otherwise delegate to cmd base class do_help()
else:
@@ -2841,7 +2801,7 @@ class Cmd(cmd.Cmd):
@with_argparser(ACArgumentParser())
def do_shortcuts(self, _: argparse.Namespace) -> None:
"""List available shortcuts"""
- result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts))
+ result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self._statement_parser.shortcuts))
self.poutput("Shortcuts for other commands:\n{}\n".format(result))
@with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG))
@@ -2910,7 +2870,7 @@ class Cmd(cmd.Cmd):
read_only_settings = """
Commands may be terminated with: {}
Output redirection and pipes allowed: {}"""
- return read_only_settings.format(str(self.statement_parser.terminators), self.allow_redirection)
+ return read_only_settings.format(str(self._statement_parser.terminators), self.allow_redirection)
def show(self, args: argparse.Namespace, parameter: str = '') -> None:
"""Shows current settings of parameters.
@@ -3010,7 +2970,7 @@ class Cmd(cmd.Cmd):
# Prevent KeyboardInterrupts while in the shell process. The shell process will
# still receive the SIGINT since it is in the same process group as us.
- with self.sigint_protection:
+ with self._sigint_protection:
# For any stream that is a StdSim, we will use a pipe so we can capture its output
proc = subprocess.Popen(expanded_command,
stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout,
@@ -3094,17 +3054,17 @@ class Cmd(cmd.Cmd):
raise EmbeddedConsoleExit
# Set up Python environment
- self.pystate[self.pyscript_name] = bridge
- self.pystate['run'] = py_run
- self.pystate['quit'] = py_quit
- self.pystate['exit'] = py_quit
+ self._pystate[self.pyscript_name] = bridge
+ self._pystate['run'] = py_run
+ self._pystate['quit'] = py_quit
+ self._pystate['exit'] = py_quit
if self.locals_in_py:
- self.pystate['self'] = self
- elif 'self' in self.pystate:
- del self.pystate['self']
+ self._pystate['self'] = self
+ elif 'self' in self._pystate:
+ del self._pystate['self']
- localvars = self.pystate
+ localvars = self._pystate
from code import InteractiveConsole
interp = InteractiveConsole(locals=localvars)
interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())')
@@ -3139,7 +3099,7 @@ class Cmd(cmd.Cmd):
readline.clear_history()
# Restore py's history
- for item in self.py_history:
+ for item in self._py_history:
readline.add_history(item)
if self.use_rawinput and self.completekey:
@@ -3206,10 +3166,10 @@ class Cmd(cmd.Cmd):
# Set up readline for cmd2
if rl_type != RlType.NONE:
# Save py's history
- self.py_history.clear()
+ self._py_history.clear()
for i in range(1, readline.get_current_history_length() + 1):
# noinspection PyArgumentList
- self.py_history.append(readline.get_history_item(i))
+ self._py_history.append(readline.get_history_item(i))
readline.clear_history()
@@ -3508,7 +3468,7 @@ class Cmd(cmd.Cmd):
if not self.persistent_history_file:
return
- self.history.truncate(self.persistent_history_length)
+ self.history.truncate(self._persistent_history_length)
try:
with open(self.persistent_history_file, 'wb') as fobj:
pickle.dump(self.history, fobj)
@@ -3530,7 +3490,7 @@ class Cmd(cmd.Cmd):
commands_run = 0
try:
- with self.sigint_protection:
+ with self._sigint_protection:
# Disable echo while we manually redirect stdout to a StringIO buffer
saved_echo = self.echo
saved_stdout = self.stdout
@@ -3575,7 +3535,7 @@ class Cmd(cmd.Cmd):
if stop:
break
finally:
- with self.sigint_protection:
+ with self._sigint_protection:
# Restore altered attributes to their original state
self.echo = saved_echo
self.stdout = saved_stdout
@@ -3695,7 +3655,7 @@ class Cmd(cmd.Cmd):
return self.runcmds_plus_hooks(script_commands)
finally:
- with self.sigint_protection:
+ with self._sigint_protection:
# Check if a script dir was added before an exception occurred
if orig_script_dir_count != len(self._script_dir):
self._script_dir.pop()
@@ -3768,7 +3728,7 @@ class Cmd(cmd.Cmd):
runner = unittest.TextTestRunner(stream=stream)
test_results = runner.run(testcase)
if test_results.wasSuccessful():
- self.decolorized_write(sys.stderr, stream.read())
+ self._decolorized_write(sys.stderr, stream.read())
self.poutput('Tests passed', color=Fore.LIGHTGREEN_EX)
else:
# Strip off the initial traceback which isn't particularly useful for end users
@@ -3789,14 +3749,14 @@ class Cmd(cmd.Cmd):
To the user it appears as if an alert message is printed above the prompt and their current input
text and cursor location is left alone.
- IMPORTANT: This function will not print an alert unless it can acquire self.terminal_lock to ensure
+ IMPORTANT: This function will not print an alert unless it can acquire self._terminal_lock to ensure
a prompt is onscreen. Therefore it is best to acquire the lock before calling this function
to guarantee the alert prints.
:param alert_msg: the message to display to the user
:param new_prompt: if you also want to change the prompt that is displayed, then include it here
see async_update_prompt() docstring for guidance on updating a prompt
- :raises RuntimeError if called while another thread holds terminal_lock
+ :raises RuntimeError if called while another thread holds _terminal_lock
"""
if not (vt100_support and self.use_rawinput):
return
@@ -3805,11 +3765,11 @@ class Cmd(cmd.Cmd):
import colorama.ansi as ansi
from colorama import Cursor
- # Sanity check that can't fail if self.terminal_lock was acquired before calling this function
- if self.terminal_lock.acquire(blocking=False):
+ # Sanity check that can't fail if self._terminal_lock was acquired before calling this function
+ if self._terminal_lock.acquire(blocking=False):
# Figure out what prompt is displaying
- current_prompt = self.continuation_prompt if self.at_continuation_prompt else self.prompt
+ current_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt
# Only update terminal if there are changes
update_terminal = False
@@ -3823,7 +3783,7 @@ class Cmd(cmd.Cmd):
self.prompt = new_prompt
# If we aren't at a continuation prompt, then it's OK to update it
- if not self.at_continuation_prompt:
+ if not self._at_continuation_prompt:
rl_set_prompt(self.prompt)
update_terminal = True
@@ -3881,10 +3841,10 @@ class Cmd(cmd.Cmd):
# Redraw the prompt and input lines
rl_force_redisplay()
- self.terminal_lock.release()
+ self._terminal_lock.release()
else:
- raise RuntimeError("another thread holds terminal_lock")
+ raise RuntimeError("another thread holds _terminal_lock")
def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover
"""
@@ -3894,7 +3854,7 @@ class Cmd(cmd.Cmd):
it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will
be shifted and the update will not be seamless.
- IMPORTANT: This function will not update the prompt unless it can acquire self.terminal_lock to ensure
+ IMPORTANT: This function will not update the prompt unless it can acquire self._terminal_lock to ensure
a prompt is onscreen. Therefore it is best to acquire the lock before calling this function
to guarantee the prompt changes.
@@ -3903,7 +3863,7 @@ class Cmd(cmd.Cmd):
and display immediately after the multiline line command completes.
:param new_prompt: what to change the prompt to
- :raises RuntimeError if called while another thread holds terminal_lock
+ :raises RuntimeError if called while another thread holds _terminal_lock
"""
self.async_alert('', new_prompt)
@@ -3911,18 +3871,18 @@ class Cmd(cmd.Cmd):
"""
Set the terminal window title
- IMPORTANT: This function will not set the title unless it can acquire self.terminal_lock to avoid
+ IMPORTANT: This function will not set the title unless it can acquire self._terminal_lock to avoid
writing to stderr while a command is running. Therefore it is best to acquire the lock
before calling this function to guarantee the title changes.
:param title: the new window title
- :raises RuntimeError if called while another thread holds terminal_lock
+ :raises RuntimeError if called while another thread holds _terminal_lock
"""
if not vt100_support:
return
- # Sanity check that can't fail if self.terminal_lock was acquired before calling this function
- if self.terminal_lock.acquire(blocking=False):
+ # Sanity check that can't fail if self._terminal_lock was acquired before calling this function
+ if self._terminal_lock.acquire(blocking=False):
try:
import colorama.ansi as ansi
sys.stderr.write(ansi.set_title(title))
@@ -3930,10 +3890,10 @@ class Cmd(cmd.Cmd):
# Debugging in Pycharm has issues with setting terminal title
pass
finally:
- self.terminal_lock.release()
+ self._terminal_lock.release()
else:
- raise RuntimeError("another thread holds terminal_lock")
+ raise RuntimeError("another thread holds _terminal_lock")
def enable_command(self, command: str) -> None:
"""
@@ -4027,7 +3987,7 @@ class Cmd(cmd.Cmd):
:param message_to_print: the message reporting that the command is disabled
:param kwargs: not used
"""
- self.decolorized_write(sys.stderr, "{}\n".format(message_to_print))
+ self._decolorized_write(sys.stderr, "{}\n".format(message_to_print))
def cmdloop(self, intro: Optional[str] = None) -> int:
"""This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2.
@@ -4051,7 +4011,7 @@ class Cmd(cmd.Cmd):
signal.signal(signal.SIGINT, self.sigint_handler)
# Grab terminal lock before the prompt has been drawn by readline
- self.terminal_lock.acquire()
+ self._terminal_lock.acquire()
# Always run the preloop first
for func in self._preloop_hooks:
@@ -4080,7 +4040,7 @@ class Cmd(cmd.Cmd):
# Release terminal lock now that postloop code should have stopped any terminal updater threads
# This will also zero the lock count in case cmdloop() is called again
- self.terminal_lock.release()
+ self._terminal_lock.release()
# Restore the original signal handler
signal.signal(signal.SIGINT, original_sigint_handler)
diff --git a/cmd2/constants.py b/cmd2/constants.py
index dede0381..06d6c6c4 100644
--- a/cmd2/constants.py
+++ b/cmd2/constants.py
@@ -24,3 +24,5 @@ LINE_FEED = '\n'
COLORS_NEVER = 'Never'
COLORS_TERMINAL = 'Terminal'
COLORS_ALWAYS = 'Always'
+
+DEFAULT_SHORTCUTS = {'?': 'help', '!': 'shell', '@': 'run_script', '@@': '_relative_run_script'}
diff --git a/cmd2/parsing.py b/cmd2/parsing.py
index 8febd270..a89f6e5e 100644
--- a/cmd2/parsing.py
+++ b/cmd2/parsing.py
@@ -324,7 +324,7 @@ class StatementParser:
This string is suitable for inclusion in an error message of your
choice:
- valid, errmsg = statement_parser.is_valid_command('>')
+ valid, errmsg = _statement_parser.is_valid_command('>')
if not valid:
errmsg = "Alias {}".format(errmsg)
"""
@@ -494,7 +494,7 @@ class StatementParser:
output = constants.REDIRECTION_APPEND
output_index = append_index
- # Check if we are redirecting to a file
+ # Check if we are _redirecting to a file
if len(tokens) > output_index + 1:
unquoted_path = utils.strip_quotes(tokens[output_index + 1])
if unquoted_path:
diff --git a/cmd2/pyscript_bridge.py b/cmd2/pyscript_bridge.py
index 01d283ea..0638f1fb 100644
--- a/cmd2/pyscript_bridge.py
+++ b/cmd2/pyscript_bridge.py
@@ -97,7 +97,7 @@ class PyscriptBridge(object):
with redirect_stderr(copy_stderr):
stop = self._cmd2_app.onecmd_plus_hooks(command, pyscript_bridge_call=True)
finally:
- with self._cmd2_app.sigint_protection:
+ with self._cmd2_app._sigint_protection:
self._cmd2_app.stdout = copy_cmd_stdout.inner_stream
self.stop = stop or self.stop
diff --git a/cmd2/utils.py b/cmd2/utils.py
index 3500ba7a..43eb3d9d 100644
--- a/cmd2/utils.py
+++ b/cmd2/utils.py
@@ -348,6 +348,35 @@ def files_from_glob_patterns(patterns: List[str], access=os.F_OK) -> List[str]:
return files
+def get_exes_in_path(starts_with: str) -> List[str]:
+ """Returns names of executables in a user's path
+
+ :param starts_with: what the exes should start with. leave blank for all exes in path.
+ :return: a list of matching exe names
+ """
+ # Purposely don't match any executable containing wildcards
+ wildcards = ['*', '?']
+ for wildcard in wildcards:
+ if wildcard in starts_with:
+ return []
+
+ # Get a list of every directory in the PATH environment variable and ignore symbolic links
+ paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)]
+
+ # Use a set to store exe names since there can be duplicates
+ exes_set = set()
+
+ # Find every executable file in the user's path that matches the pattern
+ for path in paths:
+ full_path = os.path.join(path, starts_with)
+ matches = files_from_glob_pattern(full_path + '*', access=os.X_OK)
+
+ for match in matches:
+ exes_set.add(os.path.basename(match))
+
+ return list(exes_set)
+
+
class StdSim(object):
"""
Class to simulate behavior of sys.stdout or sys.stderr.
@@ -586,7 +615,7 @@ class RedirectionSavedState(object):
self.saved_sys_stdout = sys_stdout
self.saved_pipe_proc_reader = pipe_proc_reader
- # Tells if the command is redirecting
+ # Tells if the command is _redirecting
self.redirecting = False
# If the command created a process to pipe to, then then is its reader