diff options
author | Todd Leonhardt <todd.leonhardt@gmail.com> | 2019-06-15 15:00:59 -0400 |
---|---|---|
committer | Todd Leonhardt <todd.leonhardt@gmail.com> | 2019-06-15 15:00:59 -0400 |
commit | 70bf9e1a12b89bb913c11fb07893ab4b9cab2576 (patch) | |
tree | 27bb62898d635bcaa8e6e8182d52f5105210b3f6 /cmd2 | |
parent | c12ba0ff11b3a8fd083c641cb9149aff6494bbf9 (diff) | |
download | cmd2-git-70bf9e1a12b89bb913c11fb07893ab4b9cab2576.tar.gz |
Began work to minimize public API
Diffstat (limited to 'cmd2')
-rw-r--r-- | cmd2/cmd2.py | 310 | ||||
-rw-r--r-- | cmd2/constants.py | 2 | ||||
-rw-r--r-- | cmd2/parsing.py | 4 | ||||
-rw-r--r-- | cmd2/pyscript_bridge.py | 2 | ||||
-rw-r--r-- | cmd2/utils.py | 31 |
5 files changed, 170 insertions, 179 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 46b098c5..062e6582 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -171,7 +171,7 @@ def with_argument_list(*args: List[Callable], preserve_quotes: bool = False) -> def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(cmd2_instance, statement: Union[Statement, str]): - _, parsed_arglist = cmd2_instance.statement_parser.get_command_arg_list(command_name, + _, parsed_arglist = cmd2_instance._statement_parser.get_command_arg_list(command_name, statement, preserve_quotes) @@ -210,7 +210,7 @@ def with_argparser_and_unknown_args(argparser: argparse.ArgumentParser, *, def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(cmd2_instance, statement: Union[Statement, str]): - statement, parsed_arglist = cmd2_instance.statement_parser.get_command_arg_list(command_name, + statement, parsed_arglist = cmd2_instance._statement_parser.get_command_arg_list(command_name, statement, preserve_quotes) @@ -268,7 +268,7 @@ def with_argparser(argparser: argparse.ArgumentParser, *, def arg_decorator(func: Callable): @functools.wraps(func) def cmd_wrapper(cmd2_instance, statement: Union[Statement, str]): - statement, parsed_arglist = cmd2_instance.statement_parser.get_command_arg_list(command_name, + statement, parsed_arglist = cmd2_instance._statement_parser.get_command_arg_list(command_name, statement, preserve_quotes) @@ -327,7 +327,6 @@ class Cmd(cmd.Cmd): Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes. """ - DEFAULT_SHORTCUTS = {'?': 'help', '!': 'shell', '@': 'run_script', '@@': '_relative_run_script'} DEFAULT_EDITOR = utils.find_editor() def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, *, @@ -402,51 +401,46 @@ class Cmd(cmd.Cmd): # Commands to exclude from the history command # initialize history - self.persistent_history_length = persistent_history_length + self._persistent_history_length = persistent_history_length self._initialize_history(persistent_history_file) self.exclude_from_history = '''history edit eof'''.split() # Command aliases and macros self.macros = dict() - self.initial_stdout = sys.stdout - self.pystate = {} - self.py_history = [] + self._pystate = {} + self._py_history = [] self.pyscript_name = 'app' if shortcuts is None: - shortcuts = self.DEFAULT_SHORTCUTS + shortcuts = constants.DEFAULT_SHORTCUTS shortcuts = sorted(shortcuts.items(), reverse=True) - self.statement_parser = StatementParser(allow_redirection=allow_redirection, - terminators=terminators, - multiline_commands=multiline_commands, - shortcuts=shortcuts) + self._statement_parser = StatementParser(allow_redirection=allow_redirection, + terminators=terminators, + multiline_commands=multiline_commands, + shortcuts=shortcuts) # True if running inside a Python script or interactive console, False otherwise self._in_py = False - # Stores results from the last command run to enable usage of results in a Python script or interactive console - # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience. - self._last_result = None - # Used by run_script command to store current script dir as a LIFO queue to support _relative_run_script command self._script_dir = [] # Context manager used to protect critical sections in the main thread from stopping due to a KeyboardInterrupt - self.sigint_protection = utils.ContextFlag() + self._sigint_protection = utils.ContextFlag() # If the current command created a process to pipe to, then this will be a ProcReader object. # Otherwise it will be None. Its used to know when a pipe process can be killed and/or waited upon. - self.cur_pipe_proc_reader = None + self._cur_pipe_proc_reader = None # Used by complete() for readline tab completion - self.completion_matches = [] + self._completion_matches = [] - # Used to keep track of whether we are redirecting or piping output - self.redirecting = False + # Used to keep track of whether we are _redirecting or piping output + self._redirecting = False # Used to keep track of whether a continuation prompt is being displayed - self.at_continuation_prompt = False + self._at_continuation_prompt = False # The error that prints when no help information can be found self.help_error = "No help on {}" @@ -535,7 +529,7 @@ class Cmd(cmd.Cmd): self.pager_chop = 'less -SRXF' # This boolean flag determines whether or not the cmd2 application can interact with the clipboard - self.can_clip = can_clip + self._can_clip = can_clip # This determines the value returned by cmdloop() when exiting the application self.exit_code = 0 @@ -543,7 +537,7 @@ class Cmd(cmd.Cmd): # This lock should be acquired before doing any asynchronous changes to the terminal to # ensure the updates to the terminal don't interfere with the input being typed or output # being printed by a command. - self.terminal_lock = threading.RLock() + self._terminal_lock = threading.RLock() # Commands that have been disabled from use. This is to support commands that are only available # during specific states of the application. This dictionary's keys are the command names and its @@ -566,24 +560,19 @@ class Cmd(cmd.Cmd): @property def aliases(self) -> Dict[str, str]: """Read-only property to access the aliases stored in the StatementParser.""" - return self.statement_parser.aliases - - @property - def shortcuts(self) -> Tuple[Tuple[str, str]]: - """Read-only property to access the shortcuts stored in the StatementParser.""" - return self.statement_parser.shortcuts + return self._statement_parser.aliases @property def allow_redirection(self) -> bool: """Getter for the allow_redirection property that determines whether or not redirection of stdout is allowed.""" - return self.statement_parser.allow_redirection + return self._statement_parser.allow_redirection @allow_redirection.setter def allow_redirection(self, value: bool) -> None: """Setter for the allow_redirection property that determines whether or not redirection of stdout is allowed.""" - self.statement_parser.allow_redirection = value + self._statement_parser.allow_redirection = value - def decolorized_write(self, fileobj: IO, msg: str) -> None: + def _decolorized_write(self, fileobj: IO, msg: str) -> None: """Write a string to a fileobject, stripping ANSI escape sequences if necessary Honor the current colors setting, which requires us to check whether the @@ -612,7 +601,7 @@ class Cmd(cmd.Cmd): msg_str += end if color: msg_str = color + msg_str + Fore.RESET - self.decolorized_write(self.stdout, msg_str) + self._decolorized_write(self.stdout, msg_str) except BrokenPipeError: # This occurs if a command's output is being piped to another # process and that process closes before the command is @@ -640,12 +629,12 @@ class Cmd(cmd.Cmd): else: err_msg = "{}\n".format(err) err_msg = err_color + err_msg + Fore.RESET - self.decolorized_write(sys.stderr, err_msg) + self._decolorized_write(sys.stderr, err_msg) if traceback_war and not self.debug: war = "To enable full traceback, run the following command: 'set debug true'\n" war = war_color + war + Fore.RESET - self.decolorized_write(sys.stderr, war) + self._decolorized_write(sys.stderr, war) def pfeedback(self, msg: str) -> None: """For printing nonessential feedback. Can be silenced with `quiet`. @@ -654,7 +643,7 @@ class Cmd(cmd.Cmd): if self.feedback_to_output: self.poutput(msg) else: - self.decolorized_write(sys.stderr, "{}\n".format(msg)) + self._decolorized_write(sys.stderr, "{}\n".format(msg)) def ppaged(self, msg: str, end: str = '\n', chop: bool = False) -> None: """Print output using a pager if it would go off screen and stdout isn't currently being redirected. @@ -687,9 +676,9 @@ class Cmd(cmd.Cmd): if sys.platform.startswith('win') or os.environ.get('TERM') is not None: functional_terminal = True - # Don't attempt to use a pager that can block if redirecting or running a script (either text or Python) + # Don't attempt to use a pager that can block if _redirecting or running a script (either text or Python) # Also only attempt to use a pager if actually running in a real fully functional terminal - if functional_terminal and not self.redirecting and not self._in_py and not self._script_dir: + if functional_terminal and not self._redirecting and not self._in_py and not self._script_dir: if self.colors.lower() == constants.COLORS_NEVER.lower(): msg_str = utils.strip_ansi(msg_str) @@ -699,11 +688,11 @@ class Cmd(cmd.Cmd): # Prevent KeyboardInterrupts while in the pager. The pager application will # still receive the SIGINT since it is in the same process group as us. - with self.sigint_protection: + with self._sigint_protection: pipe_proc = subprocess.Popen(pager, shell=True, stdin=subprocess.PIPE) pipe_proc.communicate(msg_str.encode('utf-8', 'replace')) else: - self.decolorized_write(self.stdout, msg_str) + self._decolorized_write(self.stdout, msg_str) except BrokenPipeError: # This occurs if a command's output is being piped to another process and that process closes before the # command is finished. If you would like your application to print a warning message, then set the @@ -713,7 +702,7 @@ class Cmd(cmd.Cmd): # ----- Methods related to tab completion ----- - def reset_completion_defaults(self) -> None: + def _reset_completion_defaults(self) -> None: """ Resets tab completion settings Needs to be called each time readline runs tab completion @@ -731,7 +720,7 @@ class Cmd(cmd.Cmd): # noinspection PyUnresolvedReferences readline.rl.mode._display_completions = self._display_matches_pyreadline - def tokens_for_completion(self, line: str, begidx: int, endidx: int) -> Tuple[List[str], List[str]]: + def _tokens_for_completion(self, line: str, begidx: int, endidx: int) -> Tuple[List[str], List[str]]: """ Used by tab completion functions to get all tokens through the one being completed :param line: the current input line with leading whitespace removed @@ -945,7 +934,7 @@ class Cmd(cmd.Cmd): :return: a list of possible tab completions """ # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) + tokens, _ = self._tokens_for_completion(line, begidx, endidx) if not tokens: return [] @@ -987,7 +976,7 @@ class Cmd(cmd.Cmd): :return: a list of possible tab completions """ # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) + tokens, _ = self._tokens_for_completion(line, begidx, endidx) if not tokens: return [] @@ -1157,35 +1146,6 @@ class Cmd(cmd.Cmd): return matches - @staticmethod - def get_exes_in_path(starts_with: str) -> List[str]: - """Returns names of executables in a user's path - - :param starts_with: what the exes should start with. leave blank for all exes in path. - :return: a list of matching exe names - """ - # Purposely don't match any executable containing wildcards - wildcards = ['*', '?'] - for wildcard in wildcards: - if wildcard in starts_with: - return [] - - # Get a list of every directory in the PATH environment variable and ignore symbolic links - paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)] - - # Use a set to store exe names since there can be duplicates - exes_set = set() - - # Find every executable file in the user's path that matches the pattern - for path in paths: - full_path = os.path.join(path, starts_with) - matches = utils.files_from_glob_pattern(full_path + '*', access=os.X_OK) - - for match in matches: - exes_set.add(os.path.basename(match)) - - return list(exes_set) - def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int, complete_blank: bool = False) -> List[str]: """Performs completion of executables either in a user's path or a given path @@ -1204,7 +1164,7 @@ class Cmd(cmd.Cmd): # If there are no path characters in the search text, then do shell command completion in the user's path if not text.startswith('~') and os.path.sep not in text: - return self.get_exes_in_path(text) + return utils.get_exes_in_path(text) # Otherwise look for executables in the given path else: @@ -1228,7 +1188,7 @@ class Cmd(cmd.Cmd): # Get all tokens through the one being completed. We want the raw tokens # so we can tell if redirection strings are quoted and ignore them. - _, raw_tokens = self.tokens_for_completion(line, begidx, endidx) + _, raw_tokens = self._tokens_for_completion(line, begidx, endidx) if not raw_tokens: return [] @@ -1382,7 +1342,7 @@ class Cmd(cmd.Cmd): import functools if state == 0 and rl_type != RlType.NONE: unclosed_quote = '' - self.reset_completion_defaults() + self._reset_completion_defaults() # lstrip the original line orig_line = readline.get_line_buffer() @@ -1399,7 +1359,7 @@ class Cmd(cmd.Cmd): # from text and update the indexes. This only applies if we are at the the beginning of the line. shortcut_to_restore = '' if begidx == 0: - for (shortcut, _) in self.shortcuts: + for (shortcut, _) in self._statement_parser.shortcuts: if text.startswith(shortcut): # Save the shortcut to restore later shortcut_to_restore = shortcut @@ -1413,7 +1373,7 @@ class Cmd(cmd.Cmd): if begidx > 0: # Parse the command line - statement = self.statement_parser.parse_command_only(line) + statement = self._statement_parser.parse_command_only(line) command = statement.command expanded_line = statement.command_and_args @@ -1433,12 +1393,12 @@ class Cmd(cmd.Cmd): line = expanded_line # Get all tokens through the one being completed - tokens, raw_tokens = self.tokens_for_completion(line, begidx, endidx) + tokens, raw_tokens = self._tokens_for_completion(line, begidx, endidx) # Check if we either had a parsing error or are trying to complete the command token # The latter can happen if " or ' was entered as the command if len(tokens) <= 1: - self.completion_matches = [] + self._completion_matches = [] return None # Text we need to remove from completions later @@ -1489,27 +1449,27 @@ class Cmd(cmd.Cmd): # A valid command was not entered else: # Check if this command should be run as a shell command - if self.default_to_shell and command in self.get_exes_in_path(command): + if self.default_to_shell and command in utils.get_exes_in_path(command): compfunc = self.path_complete else: compfunc = self.completedefault # Attempt tab completion for redirection first, and if that isn't occurring, # call the completer function for the current command - self.completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc) + self._completion_matches = self._redirect_complete(text, line, begidx, endidx, compfunc) - if self.completion_matches: + if self._completion_matches: # Eliminate duplicates - self.completion_matches = utils.remove_duplicates(self.completion_matches) + self._completion_matches = utils.remove_duplicates(self._completion_matches) self.display_matches = utils.remove_duplicates(self.display_matches) if not self.display_matches: - # Since self.display_matches is empty, set it to self.completion_matches + # Since self.display_matches is empty, set it to self._completion_matches # before we alter them. That way the suggestions will reflect how we parsed # the token being completed and not how readline did. import copy - self.display_matches = copy.copy(self.completion_matches) + self.display_matches = copy.copy(self._completion_matches) # Check if we need to add an opening quote if not unclosed_quote: @@ -1517,7 +1477,7 @@ class Cmd(cmd.Cmd): add_quote = False # This is the tab completion text that will appear on the command line. - common_prefix = os.path.commonprefix(self.completion_matches) + common_prefix = os.path.commonprefix(self._completion_matches) if self.matches_delimited: # Check if any portion of the display matches appears in the tab completion @@ -1530,36 +1490,36 @@ class Cmd(cmd.Cmd): add_quote = True # If there is a tab completion and any match has a space, then add an opening quote - elif common_prefix and any(' ' in match for match in self.completion_matches): + elif common_prefix and any(' ' in match for match in self._completion_matches): add_quote = True if add_quote: # Figure out what kind of quote to add and save it as the unclosed_quote - if any('"' in match for match in self.completion_matches): + if any('"' in match for match in self._completion_matches): unclosed_quote = "'" else: unclosed_quote = '"' - self.completion_matches = [unclosed_quote + match for match in self.completion_matches] + self._completion_matches = [unclosed_quote + match for match in self._completion_matches] # Check if we need to remove text from the beginning of tab completions elif text_to_remove: - self.completion_matches = \ - [match.replace(text_to_remove, '', 1) for match in self.completion_matches] + self._completion_matches = \ + [match.replace(text_to_remove, '', 1) for match in self._completion_matches] # Check if we need to restore a shortcut in the tab completions # so it doesn't get erased from the command line if shortcut_to_restore: - self.completion_matches = \ - [shortcut_to_restore + match for match in self.completion_matches] + self._completion_matches = \ + [shortcut_to_restore + match for match in self._completion_matches] else: # Complete token against anything a user can run - self.completion_matches = self.basic_complete(text, line, begidx, endidx, - self.get_commands_aliases_and_macros_for_completion()) + self._completion_matches = self.basic_complete(text, line, begidx, endidx, + self.get_commands_aliases_and_macros_for_completion()) # Handle single result - if len(self.completion_matches) == 1: + if len(self._completion_matches) == 1: str_to_append = '' # Add a closing quote if needed and allowed @@ -1570,16 +1530,16 @@ class Cmd(cmd.Cmd): if self.allow_appended_space and endidx == len(line): str_to_append += ' ' - self.completion_matches[0] += str_to_append + self._completion_matches[0] += str_to_append # Sort matches if they haven't already been sorted if not self.matches_sorted: - self.completion_matches.sort(key=self.matches_sort_key) + self._completion_matches.sort(key=self.matches_sort_key) self.display_matches.sort(key=self.matches_sort_key) self.matches_sorted = True try: - return self.completion_matches[state] + return self._completion_matches[state] except IndexError: return None @@ -1606,7 +1566,7 @@ class Cmd(cmd.Cmd): """Default completion function for argparse commands.""" completer = AutoCompleter(argparser, self) - tokens, _ = self.tokens_for_completion(line, begidx, endidx) + tokens, _ = self._tokens_for_completion(line, begidx, endidx) if not tokens: return [] @@ -1666,12 +1626,12 @@ class Cmd(cmd.Cmd): :param signum: signal number :param frame """ - if self.cur_pipe_proc_reader is not None: + if self._cur_pipe_proc_reader is not None: # Pass the SIGINT to the current pipe process - self.cur_pipe_proc_reader.send_sigint() + self._cur_pipe_proc_reader.send_sigint() # Check if we are allowed to re-raise the KeyboardInterrupt - if not self.sigint_protection: + if not self._sigint_protection: raise KeyboardInterrupt("Got a keyboard interrupt") def precmd(self, statement: Statement) -> Statement: @@ -1692,7 +1652,7 @@ class Cmd(cmd.Cmd): :param line: line read by readline :return: tuple containing (command, args, line) """ - statement = self.statement_parser.parse_command_only(line) + statement = self._statement_parser.parse_command_only(line) return statement.command, statement.args, statement.command_and_args def onecmd_plus_hooks(self, line: str, pyscript_bridge_call: bool = False) -> bool: @@ -1732,27 +1692,27 @@ class Cmd(cmd.Cmd): # we need to run the finalization hooks raise EmptyStatement - # Keep track of whether or not we were already redirecting before this command - already_redirecting = self.redirecting + # Keep track of whether or not we were already _redirecting before this command + already_redirecting = self._redirecting # This will be a utils.RedirectionSavedState object for the command saved_state = None try: # Get sigint protection while we set up redirection - with self.sigint_protection: + with self._sigint_protection: if pyscript_bridge_call: # Start saving command's stdout at this point self.stdout.pause_storage = False redir_error, saved_state = self._redirect_output(statement) - self.cur_pipe_proc_reader = saved_state.pipe_proc_reader + self._cur_pipe_proc_reader = saved_state.pipe_proc_reader # Do not continue if an error occurred while trying to redirect if not redir_error: - # See if we need to update self.redirecting + # See if we need to update self._redirecting if not already_redirecting: - self.redirecting = saved_state.redirecting + self._redirecting = saved_state.redirecting timestart = datetime.datetime.now() @@ -1783,12 +1743,12 @@ class Cmd(cmd.Cmd): self.pfeedback('Elapsed: {}'.format(datetime.datetime.now() - timestart)) finally: # Get sigint protection while we restore stuff - with self.sigint_protection: + with self._sigint_protection: if saved_state is not None: self._restore_output(statement, saved_state) if not already_redirecting: - self.redirecting = False + self._redirecting = False if pyscript_bridge_call: # Stop saving command's stdout before command finalization hooks run @@ -1805,7 +1765,7 @@ class Cmd(cmd.Cmd): def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool: """Run the command finalization hooks""" - with self.sigint_protection: + with self._sigint_protection: if not sys.platform.startswith('win') and self.stdout.isatty(): # Before the next command runs, fix any terminal problems like those # caused by certain binary characters having been printed to it. @@ -1856,7 +1816,7 @@ class Cmd(cmd.Cmd): """ while True: try: - statement = self.statement_parser.parse(line) + statement = self._statement_parser.parse(line) if statement.multiline_command and statement.terminator: # we have a completed multiline command, we are done break @@ -1867,7 +1827,7 @@ class Cmd(cmd.Cmd): except ValueError: # we have unclosed quotation marks, lets parse only the command # and see if it's a multiline - statement = self.statement_parser.parse_command_only(line) + statement = self._statement_parser.parse_command_only(line) if not statement.multiline_command: # not a multiline command, so raise the exception raise @@ -1876,7 +1836,7 @@ class Cmd(cmd.Cmd): # - a multiline command with no terminator # - a multiline command with unclosed quotation marks try: - self.at_continuation_prompt = True + self._at_continuation_prompt = True newline = self.pseudo_raw_input(self.continuation_prompt) if newline == 'eof': # they entered either a blank line, or we hit an EOF @@ -1891,10 +1851,10 @@ class Cmd(cmd.Cmd): raise ex else: self.poutput('^C') - statement = self.statement_parser.parse('') + statement = self._statement_parser.parse('') break finally: - self.at_continuation_prompt = False + self._at_continuation_prompt = False if not statement.command: raise EmptyStatement() @@ -2001,7 +1961,7 @@ class Cmd(cmd.Cmd): redir_error = False # Initialize the saved state - saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self.cur_pipe_proc_reader) + saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader) if not self.allow_redirection: return redir_error, saved_state @@ -2055,7 +2015,7 @@ class Cmd(cmd.Cmd): elif statement.output: import tempfile - if (not statement.output_to) and (not self.can_clip): + if (not statement.output_to) and (not self._can_clip): self.perror("Cannot redirect to paste buffer; install 'pyperclip' and re-run to enable", traceback_war=False) redir_error = True @@ -2109,11 +2069,11 @@ class Cmd(cmd.Cmd): sys.stdout = saved_state.saved_sys_stdout # Check if we need to wait for the process being piped to - if self.cur_pipe_proc_reader is not None: - self.cur_pipe_proc_reader.wait() + if self._cur_pipe_proc_reader is not None: + self._cur_pipe_proc_reader.wait() - # Restore cur_pipe_proc_reader. This always is done, regardless of whether this command redirected. - self.cur_pipe_proc_reader = saved_state.saved_pipe_proc_reader + # Restore _cur_pipe_proc_reader. This always is done, regardless of whether this command redirected. + self._cur_pipe_proc_reader = saved_state.saved_pipe_proc_reader def cmd_func(self, command: str) -> Optional[Callable]: """ @@ -2175,7 +2135,7 @@ class Cmd(cmd.Cmd): return self.do_shell(statement.command_and_args) else: err_msg = self.default_error.format(statement.command) - self.decolorized_write(sys.stderr, "{}\n".format(err_msg)) + self._decolorized_write(sys.stderr, "{}\n".format(err_msg)) def pseudo_raw_input(self, prompt: str) -> str: """Began life as a copy of cmd's cmdloop; like raw_input but @@ -2187,10 +2147,10 @@ class Cmd(cmd.Cmd): if self.use_rawinput: try: if sys.stdin.isatty(): - # Wrap in try since terminal_lock may not be locked when this function is called from unit tests + # Wrap in try since _terminal_lock may not be locked when this function is called from unit tests try: # A prompt is about to be drawn. Allow asynchronous changes to the terminal. - self.terminal_lock.release() + self._terminal_lock.release() except RuntimeError: pass @@ -2206,7 +2166,7 @@ class Cmd(cmd.Cmd): finally: if sys.stdin.isatty(): # The prompt is gone. Do not allow asynchronous changes to the terminal. - self.terminal_lock.acquire() + self._terminal_lock.acquire() else: if self.stdin.isatty(): # on a tty, print the prompt first, then read the line @@ -2302,7 +2262,7 @@ class Cmd(cmd.Cmd): """Create or overwrite an alias""" # Validate the alias name - valid, errmsg = self.statement_parser.is_valid_command(args.name) + valid, errmsg = self._statement_parser.is_valid_command(args.name) if not valid: self.perror("Invalid alias name: {}".format(errmsg), traceback_war=False) return @@ -2313,7 +2273,7 @@ class Cmd(cmd.Cmd): # Unquote redirection and terminator tokens tokens_to_unquote = constants.REDIRECTION_TOKENS - tokens_to_unquote.extend(self.statement_parser.terminators) + tokens_to_unquote.extend(self._statement_parser.terminators) utils.unquote_specific_tokens(args.command_args, tokens_to_unquote) # Build the alias value string @@ -2433,7 +2393,7 @@ class Cmd(cmd.Cmd): """Create or overwrite a macro""" # Validate the macro name - valid, errmsg = self.statement_parser.is_valid_command(args.name) + valid, errmsg = self._statement_parser.is_valid_command(args.name) if not valid: self.perror("Invalid macro name: {}".format(errmsg), traceback_war=False) return @@ -2448,7 +2408,7 @@ class Cmd(cmd.Cmd): # Unquote redirection and terminator tokens tokens_to_unquote = constants.REDIRECTION_TOKENS - tokens_to_unquote.extend(self.statement_parser.terminators) + tokens_to_unquote.extend(self._statement_parser.terminators) utils.unquote_specific_tokens(args.command_args, tokens_to_unquote) # Build the macro value string @@ -2643,7 +2603,7 @@ class Cmd(cmd.Cmd): """Completes the subcommand argument of help""" # Get all tokens through the one being completed - tokens, _ = self.tokens_for_completion(line, begidx, endidx) + tokens, _ = self._tokens_for_completion(line, begidx, endidx) if not tokens: return [] @@ -2707,7 +2667,7 @@ class Cmd(cmd.Cmd): # If there is no help information then print an error elif help_func is None and (func is None or not func.__doc__): err_msg = self.help_error.format(args.command) - self.decolorized_write(sys.stderr, "{}\n".format(err_msg)) + self._decolorized_write(sys.stderr, "{}\n".format(err_msg)) # Otherwise delegate to cmd base class do_help() else: @@ -2841,7 +2801,7 @@ class Cmd(cmd.Cmd): @with_argparser(ACArgumentParser()) def do_shortcuts(self, _: argparse.Namespace) -> None: """List available shortcuts""" - result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) + result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self._statement_parser.shortcuts)) self.poutput("Shortcuts for other commands:\n{}\n".format(result)) @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) @@ -2910,7 +2870,7 @@ class Cmd(cmd.Cmd): read_only_settings = """ Commands may be terminated with: {} Output redirection and pipes allowed: {}""" - return read_only_settings.format(str(self.statement_parser.terminators), self.allow_redirection) + return read_only_settings.format(str(self._statement_parser.terminators), self.allow_redirection) def show(self, args: argparse.Namespace, parameter: str = '') -> None: """Shows current settings of parameters. @@ -3010,7 +2970,7 @@ class Cmd(cmd.Cmd): # Prevent KeyboardInterrupts while in the shell process. The shell process will # still receive the SIGINT since it is in the same process group as us. - with self.sigint_protection: + with self._sigint_protection: # For any stream that is a StdSim, we will use a pipe so we can capture its output proc = subprocess.Popen(expanded_command, stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout, @@ -3094,17 +3054,17 @@ class Cmd(cmd.Cmd): raise EmbeddedConsoleExit # Set up Python environment - self.pystate[self.pyscript_name] = bridge - self.pystate['run'] = py_run - self.pystate['quit'] = py_quit - self.pystate['exit'] = py_quit + self._pystate[self.pyscript_name] = bridge + self._pystate['run'] = py_run + self._pystate['quit'] = py_quit + self._pystate['exit'] = py_quit if self.locals_in_py: - self.pystate['self'] = self - elif 'self' in self.pystate: - del self.pystate['self'] + self._pystate['self'] = self + elif 'self' in self._pystate: + del self._pystate['self'] - localvars = self.pystate + localvars = self._pystate from code import InteractiveConsole interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') @@ -3139,7 +3099,7 @@ class Cmd(cmd.Cmd): readline.clear_history() # Restore py's history - for item in self.py_history: + for item in self._py_history: readline.add_history(item) if self.use_rawinput and self.completekey: @@ -3206,10 +3166,10 @@ class Cmd(cmd.Cmd): # Set up readline for cmd2 if rl_type != RlType.NONE: # Save py's history - self.py_history.clear() + self._py_history.clear() for i in range(1, readline.get_current_history_length() + 1): # noinspection PyArgumentList - self.py_history.append(readline.get_history_item(i)) + self._py_history.append(readline.get_history_item(i)) readline.clear_history() @@ -3508,7 +3468,7 @@ class Cmd(cmd.Cmd): if not self.persistent_history_file: return - self.history.truncate(self.persistent_history_length) + self.history.truncate(self._persistent_history_length) try: with open(self.persistent_history_file, 'wb') as fobj: pickle.dump(self.history, fobj) @@ -3530,7 +3490,7 @@ class Cmd(cmd.Cmd): commands_run = 0 try: - with self.sigint_protection: + with self._sigint_protection: # Disable echo while we manually redirect stdout to a StringIO buffer saved_echo = self.echo saved_stdout = self.stdout @@ -3575,7 +3535,7 @@ class Cmd(cmd.Cmd): if stop: break finally: - with self.sigint_protection: + with self._sigint_protection: # Restore altered attributes to their original state self.echo = saved_echo self.stdout = saved_stdout @@ -3695,7 +3655,7 @@ class Cmd(cmd.Cmd): return self.runcmds_plus_hooks(script_commands) finally: - with self.sigint_protection: + with self._sigint_protection: # Check if a script dir was added before an exception occurred if orig_script_dir_count != len(self._script_dir): self._script_dir.pop() @@ -3768,7 +3728,7 @@ class Cmd(cmd.Cmd): runner = unittest.TextTestRunner(stream=stream) test_results = runner.run(testcase) if test_results.wasSuccessful(): - self.decolorized_write(sys.stderr, stream.read()) + self._decolorized_write(sys.stderr, stream.read()) self.poutput('Tests passed', color=Fore.LIGHTGREEN_EX) else: # Strip off the initial traceback which isn't particularly useful for end users @@ -3789,14 +3749,14 @@ class Cmd(cmd.Cmd): To the user it appears as if an alert message is printed above the prompt and their current input text and cursor location is left alone. - IMPORTANT: This function will not print an alert unless it can acquire self.terminal_lock to ensure + IMPORTANT: This function will not print an alert unless it can acquire self._terminal_lock to ensure a prompt is onscreen. Therefore it is best to acquire the lock before calling this function to guarantee the alert prints. :param alert_msg: the message to display to the user :param new_prompt: if you also want to change the prompt that is displayed, then include it here see async_update_prompt() docstring for guidance on updating a prompt - :raises RuntimeError if called while another thread holds terminal_lock + :raises RuntimeError if called while another thread holds _terminal_lock """ if not (vt100_support and self.use_rawinput): return @@ -3805,11 +3765,11 @@ class Cmd(cmd.Cmd): import colorama.ansi as ansi from colorama import Cursor - # Sanity check that can't fail if self.terminal_lock was acquired before calling this function - if self.terminal_lock.acquire(blocking=False): + # Sanity check that can't fail if self._terminal_lock was acquired before calling this function + if self._terminal_lock.acquire(blocking=False): # Figure out what prompt is displaying - current_prompt = self.continuation_prompt if self.at_continuation_prompt else self.prompt + current_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt # Only update terminal if there are changes update_terminal = False @@ -3823,7 +3783,7 @@ class Cmd(cmd.Cmd): self.prompt = new_prompt # If we aren't at a continuation prompt, then it's OK to update it - if not self.at_continuation_prompt: + if not self._at_continuation_prompt: rl_set_prompt(self.prompt) update_terminal = True @@ -3881,10 +3841,10 @@ class Cmd(cmd.Cmd): # Redraw the prompt and input lines rl_force_redisplay() - self.terminal_lock.release() + self._terminal_lock.release() else: - raise RuntimeError("another thread holds terminal_lock") + raise RuntimeError("another thread holds _terminal_lock") def async_update_prompt(self, new_prompt: str) -> None: # pragma: no cover """ @@ -3894,7 +3854,7 @@ class Cmd(cmd.Cmd): it is best to keep the prompt the same width as what's on screen. Otherwise the user's input text will be shifted and the update will not be seamless. - IMPORTANT: This function will not update the prompt unless it can acquire self.terminal_lock to ensure + IMPORTANT: This function will not update the prompt unless it can acquire self._terminal_lock to ensure a prompt is onscreen. Therefore it is best to acquire the lock before calling this function to guarantee the prompt changes. @@ -3903,7 +3863,7 @@ class Cmd(cmd.Cmd): and display immediately after the multiline line command completes. :param new_prompt: what to change the prompt to - :raises RuntimeError if called while another thread holds terminal_lock + :raises RuntimeError if called while another thread holds _terminal_lock """ self.async_alert('', new_prompt) @@ -3911,18 +3871,18 @@ class Cmd(cmd.Cmd): """ Set the terminal window title - IMPORTANT: This function will not set the title unless it can acquire self.terminal_lock to avoid + IMPORTANT: This function will not set the title unless it can acquire self._terminal_lock to avoid writing to stderr while a command is running. Therefore it is best to acquire the lock before calling this function to guarantee the title changes. :param title: the new window title - :raises RuntimeError if called while another thread holds terminal_lock + :raises RuntimeError if called while another thread holds _terminal_lock """ if not vt100_support: return - # Sanity check that can't fail if self.terminal_lock was acquired before calling this function - if self.terminal_lock.acquire(blocking=False): + # Sanity check that can't fail if self._terminal_lock was acquired before calling this function + if self._terminal_lock.acquire(blocking=False): try: import colorama.ansi as ansi sys.stderr.write(ansi.set_title(title)) @@ -3930,10 +3890,10 @@ class Cmd(cmd.Cmd): # Debugging in Pycharm has issues with setting terminal title pass finally: - self.terminal_lock.release() + self._terminal_lock.release() else: - raise RuntimeError("another thread holds terminal_lock") + raise RuntimeError("another thread holds _terminal_lock") def enable_command(self, command: str) -> None: """ @@ -4027,7 +3987,7 @@ class Cmd(cmd.Cmd): :param message_to_print: the message reporting that the command is disabled :param kwargs: not used """ - self.decolorized_write(sys.stderr, "{}\n".format(message_to_print)) + self._decolorized_write(sys.stderr, "{}\n".format(message_to_print)) def cmdloop(self, intro: Optional[str] = None) -> int: """This is an outer wrapper around _cmdloop() which deals with extra features provided by cmd2. @@ -4051,7 +4011,7 @@ class Cmd(cmd.Cmd): signal.signal(signal.SIGINT, self.sigint_handler) # Grab terminal lock before the prompt has been drawn by readline - self.terminal_lock.acquire() + self._terminal_lock.acquire() # Always run the preloop first for func in self._preloop_hooks: @@ -4080,7 +4040,7 @@ class Cmd(cmd.Cmd): # Release terminal lock now that postloop code should have stopped any terminal updater threads # This will also zero the lock count in case cmdloop() is called again - self.terminal_lock.release() + self._terminal_lock.release() # Restore the original signal handler signal.signal(signal.SIGINT, original_sigint_handler) diff --git a/cmd2/constants.py b/cmd2/constants.py index dede0381..06d6c6c4 100644 --- a/cmd2/constants.py +++ b/cmd2/constants.py @@ -24,3 +24,5 @@ LINE_FEED = '\n' COLORS_NEVER = 'Never' COLORS_TERMINAL = 'Terminal' COLORS_ALWAYS = 'Always' + +DEFAULT_SHORTCUTS = {'?': 'help', '!': 'shell', '@': 'run_script', '@@': '_relative_run_script'} diff --git a/cmd2/parsing.py b/cmd2/parsing.py index 8febd270..a89f6e5e 100644 --- a/cmd2/parsing.py +++ b/cmd2/parsing.py @@ -324,7 +324,7 @@ class StatementParser: This string is suitable for inclusion in an error message of your choice: - valid, errmsg = statement_parser.is_valid_command('>') + valid, errmsg = _statement_parser.is_valid_command('>') if not valid: errmsg = "Alias {}".format(errmsg) """ @@ -494,7 +494,7 @@ class StatementParser: output = constants.REDIRECTION_APPEND output_index = append_index - # Check if we are redirecting to a file + # Check if we are _redirecting to a file if len(tokens) > output_index + 1: unquoted_path = utils.strip_quotes(tokens[output_index + 1]) if unquoted_path: diff --git a/cmd2/pyscript_bridge.py b/cmd2/pyscript_bridge.py index 01d283ea..0638f1fb 100644 --- a/cmd2/pyscript_bridge.py +++ b/cmd2/pyscript_bridge.py @@ -97,7 +97,7 @@ class PyscriptBridge(object): with redirect_stderr(copy_stderr): stop = self._cmd2_app.onecmd_plus_hooks(command, pyscript_bridge_call=True) finally: - with self._cmd2_app.sigint_protection: + with self._cmd2_app._sigint_protection: self._cmd2_app.stdout = copy_cmd_stdout.inner_stream self.stop = stop or self.stop diff --git a/cmd2/utils.py b/cmd2/utils.py index 3500ba7a..43eb3d9d 100644 --- a/cmd2/utils.py +++ b/cmd2/utils.py @@ -348,6 +348,35 @@ def files_from_glob_patterns(patterns: List[str], access=os.F_OK) -> List[str]: return files +def get_exes_in_path(starts_with: str) -> List[str]: + """Returns names of executables in a user's path + + :param starts_with: what the exes should start with. leave blank for all exes in path. + :return: a list of matching exe names + """ + # Purposely don't match any executable containing wildcards + wildcards = ['*', '?'] + for wildcard in wildcards: + if wildcard in starts_with: + return [] + + # Get a list of every directory in the PATH environment variable and ignore symbolic links + paths = [p for p in os.getenv('PATH').split(os.path.pathsep) if not os.path.islink(p)] + + # Use a set to store exe names since there can be duplicates + exes_set = set() + + # Find every executable file in the user's path that matches the pattern + for path in paths: + full_path = os.path.join(path, starts_with) + matches = files_from_glob_pattern(full_path + '*', access=os.X_OK) + + for match in matches: + exes_set.add(os.path.basename(match)) + + return list(exes_set) + + class StdSim(object): """ Class to simulate behavior of sys.stdout or sys.stderr. @@ -586,7 +615,7 @@ class RedirectionSavedState(object): self.saved_sys_stdout = sys_stdout self.saved_pipe_proc_reader = pipe_proc_reader - # Tells if the command is redirecting + # Tells if the command is _redirecting self.redirecting = False # If the command created a process to pipe to, then then is its reader |