diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 776 |
1 files changed, 451 insertions, 325 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index e1d15131..88fdcd87 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -127,7 +127,10 @@ from .utils import ( if rl_type == RlType.NONE: # pragma: no cover sys.stderr.write(ansi.style_warning(rl_warning)) else: - from .rl_utils import rl_force_redisplay, readline + from .rl_utils import ( + readline, + rl_force_redisplay, + ) # Used by rlcompleter in Python console loaded by py command orig_rl_delims = readline.get_completer_delims() @@ -142,7 +145,10 @@ else: # Get the readline lib so we can make changes to it import ctypes - from .rl_utils import readline_lib + + from .rl_utils import ( + readline_lib, + ) rl_basic_quote_characters = ctypes.c_char_p.in_dll(readline_lib, "rl_basic_quote_characters") orig_rl_basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value @@ -151,7 +157,9 @@ else: ipython_available = True try: # noinspection PyUnresolvedReferences,PyPackageRequirements - from IPython import embed + from IPython import ( + embed, + ) except ImportError: # pragma: no cover ipython_available = False @@ -188,24 +196,37 @@ class Cmd(cmd.Cmd): Line-oriented command interpreters are often useful for test harnesses, internal tools, and rapid prototypes. """ + DEFAULT_EDITOR = utils.find_editor() - INTERNAL_COMMAND_EPILOG = ("Notes:\n" - " This command is for internal use and is not intended to be called from the\n" - " command line.") + INTERNAL_COMMAND_EPILOG = ( + "Notes:\n" " This command is for internal use and is not intended to be called from the\n" " command line." + ) # Sorting keys for strings ALPHABETICAL_SORT_KEY = utils.norm_fold NATURAL_SORT_KEY = utils.natural_keys - def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, *, - persistent_history_file: str = '', persistent_history_length: int = 1000, - startup_script: str = '', silent_startup_script: bool = False, use_ipython: bool = False, - allow_cli_args: bool = True, transcript_files: Optional[List[str]] = None, - allow_redirection: bool = True, multiline_commands: Optional[List[str]] = None, - terminators: Optional[List[str]] = None, shortcuts: Optional[Dict[str, str]] = None, - command_sets: Optional[Iterable[CommandSet]] = None, - auto_load_commands: bool = True) -> None: + def __init__( + self, + completekey: str = 'tab', + stdin=None, + stdout=None, + *, + persistent_history_file: str = '', + persistent_history_length: int = 1000, + startup_script: str = '', + silent_startup_script: bool = False, + use_ipython: bool = False, + allow_cli_args: bool = True, + transcript_files: Optional[List[str]] = None, + allow_redirection: bool = True, + multiline_commands: Optional[List[str]] = None, + terminators: Optional[List[str]] = None, + shortcuts: Optional[Dict[str, str]] = None, + command_sets: Optional[Iterable[CommandSet]] = None, + auto_load_commands: bool = True + ) -> None: """An easy but powerful framework for writing line-oriented command interpreters. Extends Python's cmd package. @@ -320,9 +341,9 @@ class Cmd(cmd.Cmd): # True if running inside a Python script or interactive console, False otherwise self._in_py = False - self.statement_parser = StatementParser(terminators=terminators, - multiline_commands=multiline_commands, - shortcuts=shortcuts) + self.statement_parser = StatementParser( + terminators=terminators, multiline_commands=multiline_commands, shortcuts=shortcuts + ) # Stores results from the last command run to enable usage of results in a Python script or interactive console # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience. @@ -377,8 +398,7 @@ class Cmd(cmd.Cmd): # Check for command line args if allow_cli_args: parser = argparse.ArgumentParser() - parser.add_argument('-t', '--test', action="store_true", - help='Test against transcript(s) in FILE (wildcards OK)') + parser.add_argument('-t', '--test', action="store_true", help='Test against transcript(s) in FILE (wildcards OK)') callopts, callargs = parser.parse_known_args() # If transcript testing was called for, use other arguments as transcript files @@ -501,8 +521,11 @@ class Cmd(cmd.Cmd): :param subclass_match: If True, return all sub-classes of provided type, otherwise only search for exact match :return: Matching CommandSets """ - return [cmdset for cmdset in self._installed_command_sets - if type(cmdset) == commandset_type or (subclass_match and isinstance(cmdset, commandset_type))] + return [ + cmdset + for cmdset in self._installed_command_sets + if type(cmdset) == commandset_type or (subclass_match and isinstance(cmdset, commandset_type)) + ] def find_commandset_for_command(self, command_name: str) -> Optional[CommandSet]: """ @@ -526,9 +549,11 @@ class Cmd(cmd.Cmd): load_commandset_by_type(subclasses) else: init_sig = inspect.signature(cmdset_type.__init__) - if not (cmdset_type in existing_commandset_types - or len(init_sig.parameters) != 1 - or 'self' not in init_sig.parameters): + if not ( + cmdset_type in existing_commandset_types + or len(init_sig.parameters) != 1 + or 'self' not in init_sig.parameters + ): cmdset = cmdset_type() self.register_command_set(cmdset) @@ -548,14 +573,16 @@ class Cmd(cmd.Cmd): methods = inspect.getmembers( cmdset, predicate=lambda meth: isinstance(meth, Callable) - and hasattr(meth, '__name__') and meth.__name__.startswith(COMMAND_FUNC_PREFIX)) + and hasattr(meth, '__name__') + and meth.__name__.startswith(COMMAND_FUNC_PREFIX), + ) default_category = getattr(cmdset, CLASS_ATTR_DEFAULT_HELP_CATEGORY, None) installed_attributes = [] try: for method_name, method in methods: - command = method_name[len(COMMAND_FUNC_PREFIX):] + command = method_name[len(COMMAND_FUNC_PREFIX) :] self._install_command_function(command, method, type(cmdset).__name__) installed_attributes.append(method_name) @@ -588,8 +615,7 @@ class Cmd(cmd.Cmd): if cmdset in self._installed_command_sets: self._installed_command_sets.remove(cmdset) if cmdset in self._cmd_to_command_sets.values(): - self._cmd_to_command_sets = \ - {key: val for key, val in self._cmd_to_command_sets.items() if val is not cmdset} + self._cmd_to_command_sets = {key: val for key, val in self._cmd_to_command_sets.items() if val is not cmdset} cmdset.on_unregistered() raise @@ -644,10 +670,12 @@ class Cmd(cmd.Cmd): methods = inspect.getmembers( cmdset, predicate=lambda meth: isinstance(meth, Callable) - and hasattr(meth, '__name__') and meth.__name__.startswith(COMMAND_FUNC_PREFIX)) + and hasattr(meth, '__name__') + and meth.__name__.startswith(COMMAND_FUNC_PREFIX), + ) for method in methods: - cmd_name = method[0][len(COMMAND_FUNC_PREFIX):] + cmd_name = method[0][len(COMMAND_FUNC_PREFIX) :] # Enable the command before uninstalling it to make sure we remove both # the real functions and the ones used by the DisabledCommand object. @@ -671,10 +699,12 @@ class Cmd(cmd.Cmd): methods = inspect.getmembers( cmdset, predicate=lambda meth: isinstance(meth, Callable) - and hasattr(meth, '__name__') and meth.__name__.startswith(COMMAND_FUNC_PREFIX)) + and hasattr(meth, '__name__') + and meth.__name__.startswith(COMMAND_FUNC_PREFIX), + ) for method in methods: - command_name = method[0][len(COMMAND_FUNC_PREFIX):] + command_name = method[0][len(COMMAND_FUNC_PREFIX) :] # Search for the base command function and verify it has an argparser defined if command_name in self.disabled_commands: @@ -691,9 +721,11 @@ class Cmd(cmd.Cmd): attached_cmdset = getattr(subparser, constants.PARSER_ATTR_COMMANDSET, None) if attached_cmdset is not None and attached_cmdset is not cmdset: raise CommandSetRegistrationError( - 'Cannot uninstall CommandSet when another CommandSet depends on it') + 'Cannot uninstall CommandSet when another CommandSet depends on it' + ) check_parser_uninstallable(subparser) break + if command_parser is not None: check_parser_uninstallable(command_parser) @@ -712,7 +744,7 @@ class Cmd(cmd.Cmd): predicate=lambda meth: isinstance(meth, Callable) and hasattr(meth, constants.SUBCMD_ATTR_NAME) and hasattr(meth, constants.SUBCMD_ATTR_COMMAND) - and hasattr(meth, constants.CMD_ATTR_ARGPARSER) + and hasattr(meth, constants.CMD_ATTR_ARGPARSER), ) # iterate through all matching methods @@ -736,12 +768,14 @@ class Cmd(cmd.Cmd): command_func = self.cmd_func(command_name) if command_func is None: - raise CommandSetRegistrationError('Could not find command "{}" needed by subcommand: {}' - .format(command_name, str(method))) + raise CommandSetRegistrationError( + 'Could not find command "{}" needed by subcommand: {}'.format(command_name, str(method)) + ) command_parser = getattr(command_func, constants.CMD_ATTR_ARGPARSER, None) if command_parser is None: - raise CommandSetRegistrationError('Could not find argparser for command "{}" needed by subcommand: {}' - .format(command_name, str(method))) + raise CommandSetRegistrationError( + 'Could not find argparser for command "{}" needed by subcommand: {}'.format(command_name, str(method)) + ) def find_subcommand(action: argparse.ArgumentParser, subcmd_names: List[str]) -> argparse.ArgumentParser: if not subcmd_names: @@ -815,7 +849,7 @@ class Cmd(cmd.Cmd): predicate=lambda meth: isinstance(meth, Callable) and hasattr(meth, constants.SUBCMD_ATTR_NAME) and hasattr(meth, constants.SUBCMD_ATTR_COMMAND) - and hasattr(meth, constants.CMD_ATTR_ARGPARSER) + and hasattr(meth, constants.CMD_ATTR_ARGPARSER), ) # iterate through all matching methods @@ -832,14 +866,16 @@ class Cmd(cmd.Cmd): if command_func is None: # pragma: no cover # This really shouldn't be possible since _register_subcommands would prevent this from happening # but keeping in case it does for some strange reason - raise CommandSetRegistrationError('Could not find command "{}" needed by subcommand: {}' - .format(command_name, str(method))) + raise CommandSetRegistrationError( + 'Could not find command "{}" needed by subcommand: {}'.format(command_name, str(method)) + ) command_parser = getattr(command_func, constants.CMD_ATTR_ARGPARSER, None) if command_parser is None: # pragma: no cover # This really shouldn't be possible since _register_subcommands would prevent this from happening # but keeping in case it does for some strange reason - raise CommandSetRegistrationError('Could not find argparser for command "{}" needed by subcommand: {}' - .format(command_name, str(method))) + raise CommandSetRegistrationError( + 'Could not find argparser for command "{}" needed by subcommand: {}'.format(command_name, str(method)) + ) for action in command_parser._actions: if isinstance(action, argparse._SubParsersAction): @@ -868,21 +904,26 @@ class Cmd(cmd.Cmd): def build_settables(self): """Create the dictionary of user-settable parameters""" - self.add_settable(Settable('allow_style', str, - 'Allow ANSI text style sequences in output (valid values: ' - '{}, {}, {})'.format(ansi.STYLE_TERMINAL, - ansi.STYLE_ALWAYS, - ansi.STYLE_NEVER), - choices=[ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER])) - - self.add_settable(Settable('always_show_hint', bool, - 'Display tab completion hint even when completion suggestions print')) + self.add_settable( + Settable( + 'allow_style', + str, + 'Allow ANSI text style sequences in output (valid values: ' + '{}, {}, {})'.format(ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER), + choices=[ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER], + ) + ) + + self.add_settable( + Settable('always_show_hint', bool, 'Display tab completion hint even when completion suggestions print') + ) self.add_settable(Settable('debug', bool, "Show full traceback on exception")) self.add_settable(Settable('echo', bool, "Echo command issued into output")) self.add_settable(Settable('editor', str, "Program used by 'edit'")) self.add_settable(Settable('feedback_to_output', bool, "Include nonessentials in '|', '>' results")) - self.add_settable(Settable('max_completion_items', int, - "Maximum number of CompletionItems to display during tab completion")) + self.add_settable( + Settable('max_completion_items', int, "Maximum number of CompletionItems to display during tab completion") + ) self.add_settable(Settable('quiet', bool, "Don't print nonessential feedback")) self.add_settable(Settable('timing', bool, "Report execution times")) @@ -900,8 +941,9 @@ class Cmd(cmd.Cmd): if new_val in [ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER]: ansi.allow_style = new_val else: - raise ValueError("must be {}, {}, or {} (case-insensitive)".format(ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, - ansi.STYLE_NEVER)) + raise ValueError( + "must be {}, {}, or {} (case-insensitive)".format(ansi.STYLE_TERMINAL, ansi.STYLE_ALWAYS, ansi.STYLE_NEVER) + ) def _completion_supported(self) -> bool: """Return whether tab completion is supported""" @@ -976,6 +1018,7 @@ class Cmd(cmd.Cmd): """ if self.debug and sys.exc_info() != (None, None, None): import traceback + traceback.print_exc() if isinstance(msg, Exception): @@ -1103,6 +1146,7 @@ class Cmd(cmd.Cmd): - Two empty lists """ import copy + unclosed_quote = '' quotes_to_try = copy.copy(constants.QUOTES) @@ -1225,9 +1269,16 @@ class Cmd(cmd.Cmd): return matches - def flag_based_complete(self, text: str, line: str, begidx: int, endidx: int, - flag_dict: Dict[str, Union[Iterable, Callable]], *, - all_else: Union[None, Iterable, Callable] = None) -> List[str]: + def flag_based_complete( + self, + text: str, + line: str, + begidx: int, + endidx: int, + flag_dict: Dict[str, Union[Iterable, Callable]], + *, + all_else: Union[None, Iterable, Callable] = None + ) -> List[str]: """Tab completes based on a particular flag preceding the token being completed. :param text: the string prefix we are attempting to match (all matches must begin with it) @@ -1267,9 +1318,16 @@ class Cmd(cmd.Cmd): return completions_matches - def index_based_complete(self, text: str, line: str, begidx: int, endidx: int, - index_dict: Mapping[int, Union[Iterable, Callable]], *, - all_else: Union[None, Iterable, Callable] = None) -> List[str]: + def index_based_complete( + self, + text: str, + line: str, + begidx: int, + endidx: int, + index_dict: Mapping[int, Union[Iterable, Callable]], + *, + all_else: Union[None, Iterable, Callable] = None + ) -> List[str]: """Tab completes based on a fixed position in the input string. :param text: the string prefix we are attempting to match (all matches must begin with it) @@ -1312,8 +1370,9 @@ class Cmd(cmd.Cmd): return matches # noinspection PyUnusedLocal - def path_complete(self, text: str, line: str, begidx: int, endidx: int, *, - path_filter: Optional[Callable[[str], bool]] = None) -> List[str]: + def path_complete( + self, text: str, line: str, begidx: int, endidx: int, *, path_filter: Optional[Callable[[str], bool]] = None + ) -> List[str]: """Performs completion of local file system paths :param text: the string prefix we are attempting to match (all matches must begin with it) @@ -1456,8 +1515,7 @@ class Cmd(cmd.Cmd): return matches - def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int, *, - complete_blank: bool = False) -> List[str]: + def shell_cmd_complete(self, text: str, line: str, begidx: int, endidx: int, *, complete_blank: bool = False) -> List[str]: """Performs completion of executables either in a user's path or a given path :param text: the string prefix we are attempting to match (all matches must begin with it) @@ -1478,8 +1536,9 @@ class Cmd(cmd.Cmd): # Otherwise look for executables in the given path else: - return self.path_complete(text, line, begidx, endidx, - path_filter=lambda path: os.path.isdir(path) or os.access(path, os.X_OK)) + return self.path_complete( + text, line, begidx, endidx, path_filter=lambda path: os.path.isdir(path) or os.access(path, os.X_OK) + ) def _redirect_complete(self, text: str, line: str, begidx: int, endidx: int, compfunc: Callable) -> List[str]: """Called by complete() as the first tab completion function for all commands @@ -1600,8 +1659,9 @@ class Cmd(cmd.Cmd): return metadata - def _display_matches_gnu_readline(self, substitution: str, matches: List[str], - longest_match_length: int) -> None: # pragma: no cover + def _display_matches_gnu_readline( + self, substitution: str, matches: List[str], longest_match_length: int + ) -> None: # pragma: no cover """Prints a match list using GNU readline's rl_display_match_list() This exists to print self.display_matches if it has data. Otherwise matches prints. @@ -1809,6 +1869,7 @@ class Cmd(cmd.Cmd): # before we alter them. That way the suggestions will reflect how we parsed # the token being completed and not how readline did. import copy + self.display_matches = copy.copy(self.completion_matches) # Check if we need to add an opening quote @@ -1825,8 +1886,7 @@ class Cmd(cmd.Cmd): # For delimited matches, we check for a space in what appears before the display # matches (common_prefix) as well as in the display matches themselves. - if ' ' in common_prefix or (display_prefix - and any(' ' in match for match in self.display_matches)): + if ' ' in common_prefix or (display_prefix and any(' ' in match for match in self.display_matches)): add_quote = True # If there is a tab completion and any match has a space, then add an opening quote @@ -1902,7 +1962,7 @@ class Cmd(cmd.Cmd): shortcut_to_restore = shortcut # Adjust text and where it begins - text = text[len(shortcut_to_restore):] + text = text[len(shortcut_to_restore) :] begidx += len(shortcut_to_restore) break else: @@ -1969,13 +2029,19 @@ class Cmd(cmd.Cmd): def get_all_commands(self) -> List[str]: """Return a list of all commands""" - return [name[len(constants.COMMAND_FUNC_PREFIX):] for name in self.get_names() - if name.startswith(constants.COMMAND_FUNC_PREFIX) and callable(getattr(self, name))] + return [ + name[len(constants.COMMAND_FUNC_PREFIX) :] + for name in self.get_names() + if name.startswith(constants.COMMAND_FUNC_PREFIX) and callable(getattr(self, name)) + ] def get_visible_commands(self) -> List[str]: """Return a list of commands that have not been hidden or disabled""" - return [command for command in self.get_all_commands() - if command not in self.hidden_commands and command not in self.disabled_commands] + return [ + command + for command in self.get_all_commands() + if command not in self.hidden_commands and command not in self.disabled_commands + ] def _get_alias_completion_items(self) -> List[CompletionItem]: """Return list of current alias names and values as CompletionItems""" @@ -1998,12 +2064,14 @@ class Cmd(cmd.Cmd): def get_help_topics(self) -> List[str]: """Return a list of help topics""" - all_topics = [name[len(constants.HELP_FUNC_PREFIX):] for name in self.get_names() - if name.startswith(constants.HELP_FUNC_PREFIX) and callable(getattr(self, name))] + all_topics = [ + name[len(constants.HELP_FUNC_PREFIX) :] + for name in self.get_names() + if name.startswith(constants.HELP_FUNC_PREFIX) and callable(getattr(self, name)) + ] # Filter out hidden and disabled commands - return [topic for topic in all_topics - if topic not in self.hidden_commands and topic not in self.disabled_commands] + return [topic for topic in all_topics if topic not in self.hidden_commands and topic not in self.disabled_commands] # noinspection PyUnusedLocal def sigint_handler(self, signum: int, frame) -> None: @@ -2084,8 +2152,9 @@ class Cmd(cmd.Cmd): statement = self.statement_parser.parse_command_only(line) return statement.command, statement.args, statement.command_and_args - def onecmd_plus_hooks(self, line: str, *, add_to_history: bool = True, - raise_keyboard_interrupt: bool = False, py_bridge_call: bool = False) -> bool: + def onecmd_plus_hooks( + self, line: str, *, add_to_history: bool = True, raise_keyboard_interrupt: bool = False, py_bridge_call: bool = False + ) -> bool: """Top-level function called by cmdloop() to handle parsing a line and running the command and all of its hooks. :param line: command line to run @@ -2203,6 +2272,7 @@ class Cmd(cmd.Cmd): # Before the next command runs, fix any terminal problems like those # caused by certain binary characters having been printed to it. import subprocess + proc = subprocess.Popen(['stty', 'sane']) proc.communicate() @@ -2213,8 +2283,9 @@ class Cmd(cmd.Cmd): # modifications to the statement return data.stop - def runcmds_plus_hooks(self, cmds: List[Union[HistoryItem, str]], *, add_to_history: bool = True, - stop_on_keyboard_interrupt: bool = True) -> bool: + def runcmds_plus_hooks( + self, cmds: List[Union[HistoryItem, str]], *, add_to_history: bool = True, stop_on_keyboard_interrupt: bool = True + ) -> bool: """ Used when commands are being run in an automated fashion like text scripts or history replays. The prompt and command line for each command will be printed if echo is True. @@ -2233,8 +2304,9 @@ class Cmd(cmd.Cmd): self.poutput('{}{}'.format(self.prompt, line)) try: - if self.onecmd_plus_hooks(line, add_to_history=add_to_history, - raise_keyboard_interrupt=stop_on_keyboard_interrupt): + if self.onecmd_plus_hooks( + line, add_to_history=add_to_history, raise_keyboard_interrupt=stop_on_keyboard_interrupt + ): return True except KeyboardInterrupt as e: if stop_on_keyboard_interrupt: @@ -2340,16 +2412,18 @@ class Cmd(cmd.Cmd): if orig_line != statement.raw: # Build a Statement that contains the resolved macro line # but the originally typed line for its raw member. - statement = Statement(statement.args, - raw=orig_line, - command=statement.command, - arg_list=statement.arg_list, - multiline_command=statement.multiline_command, - terminator=statement.terminator, - suffix=statement.suffix, - pipe_to=statement.pipe_to, - output=statement.output, - output_to=statement.output_to) + statement = Statement( + statement.args, + raw=orig_line, + command=statement.command, + arg_list=statement.arg_list, + multiline_command=statement.multiline_command, + terminator=statement.terminator, + suffix=statement.suffix, + pipe_to=statement.pipe_to, + output=statement.output, + output_to=statement.output_to, + ) return statement def _resolve_macro(self, statement: Statement) -> Optional[str]: @@ -2366,12 +2440,7 @@ class Cmd(cmd.Cmd): # Make sure enough arguments were passed in if len(statement.arg_list) < macro.minimum_arg_count: - self.perror( - "The macro '{}' expects at least {} argument(s)".format( - statement.command, - macro.minimum_arg_count - ) - ) + self.perror("The macro '{}' expects at least {} argument(s)".format(statement.command, macro.minimum_arg_count)) return None # Resolve the arguments in reverse and read their values from statement.argv since those @@ -2391,7 +2460,7 @@ class Cmd(cmd.Cmd): resolved = parts[0] + replacement + parts[1] # Append extra arguments and use statement.arg_list since these arguments need their quotes preserved - for arg in statement.arg_list[macro.minimum_arg_count:]: + for arg in statement.arg_list[macro.minimum_arg_count :]: resolved += ' ' + arg # Restore any terminator, suffix, redirection, etc. @@ -2408,8 +2477,7 @@ class Cmd(cmd.Cmd): import subprocess # Initialize the redirection saved state - redir_saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, - self._cur_pipe_proc_reader, self._redirecting) + redir_saved_state = utils.RedirectionSavedState(self.stdout, sys.stdout, self._cur_pipe_proc_reader, self._redirecting) # The ProcReader for this command cmd_pipe_proc_reader = None # type: Optional[utils.ProcReader] @@ -2436,12 +2504,14 @@ class Cmd(cmd.Cmd): kwargs['start_new_session'] = True # For any stream that is a StdSim, we will use a pipe so we can capture its output - proc = subprocess.Popen(statement.pipe_to, - stdin=subproc_stdin, - stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout, - stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr, - shell=True, - **kwargs) + proc = subprocess.Popen( + statement.pipe_to, + stdin=subproc_stdin, + stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout, + stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr, + shell=True, + **kwargs + ) # Popen was called with shell=True so the user can chain pipe commands and redirect their output # like: !ls -l | grep user | wc -l > out.txt. But this makes it difficult to know if the pipe process @@ -2456,8 +2526,7 @@ class Cmd(cmd.Cmd): if proc.returncode is not None: subproc_stdin.close() new_stdout.close() - raise RedirectionError( - 'Pipe process exited with code {} before command could run'.format(proc.returncode)) + raise RedirectionError('Pipe process exited with code {} before command could run'.format(proc.returncode)) else: redir_saved_state.redirecting = True cmd_pipe_proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr) @@ -2465,9 +2534,9 @@ class Cmd(cmd.Cmd): elif statement.output: import tempfile + if (not statement.output_to) and (not self._can_clip): - raise RedirectionError( - "Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies") + raise RedirectionError("Cannot redirect to paste buffer; missing 'pyperclip' and/or pyperclip dependencies") # Redirecting to a file elif statement.output_to: @@ -2555,7 +2624,7 @@ class Cmd(cmd.Cmd): # noinspection PyMethodOverriding def onecmd(self, statement: Union[Statement, str], *, add_to_history: bool = True) -> bool: - """ This executes the actual do_* method for a command. + """This executes the actual do_* method for a command. If the command provided doesn't exist, then it executes default() instead. @@ -2571,8 +2640,11 @@ class Cmd(cmd.Cmd): func = self.cmd_func(statement.command) if func: # Check to see if this command should be stored in history - if statement.command not in self.exclude_from_history and \ - statement.command not in self.disabled_commands and add_to_history: + if ( + statement.command not in self.exclude_from_history + and statement.command not in self.disabled_commands + and add_to_history + ): self.history.append(statement) stop = func(statement) @@ -2871,11 +2943,8 @@ class Cmd(cmd.Cmd): ############################################################# # Top-level parser for alias - alias_description = ("Manage aliases\n" - "\n" - "An alias is a command that enables replacement of a word by another string.") - alias_epilog = ("See also:\n" - " macro") + alias_description = "Manage aliases\n" "\n" "An alias is a command that enables replacement of a word by another string." + alias_epilog = "See also:\n" " macro" alias_parser = DEFAULT_ARGUMENT_PARSER(description=alias_description, epilog=alias_epilog) alias_subparsers = alias_parser.add_subparsers(dest='subcommand', metavar='SUBCOMMAND') alias_subparsers.required = True @@ -2891,17 +2960,19 @@ class Cmd(cmd.Cmd): # alias -> create alias_create_description = "Create or overwrite an alias" - alias_create_epilog = ("Notes:\n" - " If you want to use redirection, pipes, or terminators in the value of the\n" - " alias, then quote them.\n" - "\n" - " Since aliases are resolved during parsing, tab completion will function as\n" - " it would for the actual command the alias resolves to.\n" - "\n" - "Examples:\n" - " alias create ls !ls -lF\n" - " alias create show_log !cat \"log file.txt\"\n" - " alias create save_results print_results \">\" out.txt\n") + alias_create_epilog = ( + "Notes:\n" + " If you want to use redirection, pipes, or terminators in the value of the\n" + " alias, then quote them.\n" + "\n" + " Since aliases are resolved during parsing, tab completion will function as\n" + " it would for the actual command the alias resolves to.\n" + "\n" + "Examples:\n" + " alias create ls !ls -lF\n" + " alias create show_log !cat \"log file.txt\"\n" + " alias create save_results print_results \">\" out.txt\n" + ) alias_create_parser = DEFAULT_ARGUMENT_PARSER(description=alias_create_description, epilog=alias_create_epilog) alias_create_parser.add_argument('name', help='name of this alias') @@ -2970,10 +3041,12 @@ class Cmd(cmd.Cmd): # alias -> list alias_list_help = "list aliases" - alias_list_description = ("List specified aliases in a reusable form that can be saved to a startup\n" - "script to preserve aliases across sessions\n" - "\n" - "Without arguments, all aliases will be listed.") + alias_list_description = ( + "List specified aliases in a reusable form that can be saved to a startup\n" + "script to preserve aliases across sessions\n" + "\n" + "Without arguments, all aliases will be listed." + ) alias_list_parser = DEFAULT_ARGUMENT_PARSER(description=alias_list_description) alias_list_parser.add_argument('names', nargs=argparse.ZERO_OR_MORE, help='alias(es) to list', @@ -3016,11 +3089,8 @@ class Cmd(cmd.Cmd): ############################################################# # Top-level parser for macro - macro_description = ("Manage macros\n" - "\n" - "A macro is similar to an alias, but it can contain argument placeholders.") - macro_epilog = ("See also:\n" - " alias") + macro_description = "Manage macros\n" "\n" "A macro is similar to an alias, but it can contain argument placeholders." + macro_epilog = "See also:\n" " alias" macro_parser = DEFAULT_ARGUMENT_PARSER(description=macro_description, epilog=macro_epilog) macro_subparsers = macro_parser.add_subparsers(dest='subcommand', metavar='SUBCOMMAND') macro_subparsers.required = True @@ -3037,40 +3107,42 @@ class Cmd(cmd.Cmd): macro_create_help = "create or overwrite a macro" macro_create_description = "Create or overwrite a macro" - macro_create_epilog = ("A macro is similar to an alias, but it can contain argument placeholders.\n" - "Arguments are expressed when creating a macro using {#} notation where {1}\n" - "means the first argument.\n" - "\n" - "The following creates a macro called my_macro that expects two arguments:\n" - "\n" - " macro create my_macro make_dinner --meat {1} --veggie {2}\n" - "\n" - "When the macro is called, the provided arguments are resolved and the\n" - "assembled command is run. For example:\n" - "\n" - " my_macro beef broccoli ---> make_dinner --meat beef --veggie broccoli\n" - "\n" - "Notes:\n" - " To use the literal string {1} in your command, escape it this way: {{1}}.\n" - "\n" - " Extra arguments passed to a macro are appended to resolved command.\n" - "\n" - " An argument number can be repeated in a macro. In the following example the\n" - " first argument will populate both {1} instances.\n" - "\n" - " macro create ft file_taxes -p {1} -q {2} -r {1}\n" - "\n" - " To quote an argument in the resolved command, quote it during creation.\n" - "\n" - " macro create backup !cp \"{1}\" \"{1}.orig\"\n" - "\n" - " If you want to use redirection, pipes, or terminators in the value of the\n" - " macro, then quote them.\n" - "\n" - " macro create show_results print_results -type {1} \"|\" less\n" - "\n" - " Because macros do not resolve until after hitting Enter, tab completion\n" - " will only complete paths while typing a macro.") + macro_create_epilog = ( + "A macro is similar to an alias, but it can contain argument placeholders.\n" + "Arguments are expressed when creating a macro using {#} notation where {1}\n" + "means the first argument.\n" + "\n" + "The following creates a macro called my_macro that expects two arguments:\n" + "\n" + " macro create my_macro make_dinner --meat {1} --veggie {2}\n" + "\n" + "When the macro is called, the provided arguments are resolved and the\n" + "assembled command is run. For example:\n" + "\n" + " my_macro beef broccoli ---> make_dinner --meat beef --veggie broccoli\n" + "\n" + "Notes:\n" + " To use the literal string {1} in your command, escape it this way: {{1}}.\n" + "\n" + " Extra arguments passed to a macro are appended to resolved command.\n" + "\n" + " An argument number can be repeated in a macro. In the following example the\n" + " first argument will populate both {1} instances.\n" + "\n" + " macro create ft file_taxes -p {1} -q {2} -r {1}\n" + "\n" + " To quote an argument in the resolved command, quote it during creation.\n" + "\n" + " macro create backup !cp \"{1}\" \"{1}.orig\"\n" + "\n" + " If you want to use redirection, pipes, or terminators in the value of the\n" + " macro, then quote them.\n" + "\n" + " macro create show_results print_results -type {1} \"|\" less\n" + "\n" + " Because macros do not resolve until after hitting Enter, tab completion\n" + " will only complete paths while typing a macro." + ) macro_create_parser = DEFAULT_ARGUMENT_PARSER(description=macro_create_description, epilog=macro_create_epilog) macro_create_parser.add_argument('name', help='name of this macro') @@ -3117,7 +3189,7 @@ class Cmd(cmd.Cmd): cur_match = normal_matches.__next__() # Get the number string between the braces - cur_num_str = (re.findall(MacroArg.digit_pattern, cur_match.group())[0]) + cur_num_str = re.findall(MacroArg.digit_pattern, cur_match.group())[0] cur_num = int(cur_num_str) if cur_num < 1: self.perror("Argument numbers must be greater than 0") @@ -3134,9 +3206,7 @@ class Cmd(cmd.Cmd): # Make sure the argument numbers are continuous if len(arg_nums) != max_arg_num: - self.perror( - "Not all numbers between 1 and {} are present " - "in the argument placeholders".format(max_arg_num)) + self.perror("Not all numbers between 1 and {} are present " "in the argument placeholders".format(max_arg_num)) return # Find all escaped arguments @@ -3185,10 +3255,12 @@ class Cmd(cmd.Cmd): # macro -> list macro_list_help = "list macros" - macro_list_description = ("List specified macros in a reusable form that can be saved to a startup script\n" - "to preserve macros across sessions\n" - "\n" - "Without arguments, all macros will be listed.") + macro_list_description = ( + "List specified macros in a reusable form that can be saved to a startup script\n" + "to preserve macros across sessions\n" + "\n" + "Without arguments, all macros will be listed." + ) macro_list_parser = DEFAULT_ARGUMENT_PARSER(description=macro_list_description) macro_list_parser.add_argument('names', nargs=argparse.ZERO_OR_MORE, help='macro(s) to list', @@ -3235,8 +3307,9 @@ class Cmd(cmd.Cmd): strs_to_match = list(topics | visible_commands) return self.basic_complete(text, line, begidx, endidx, strs_to_match) - def complete_help_subcommands(self, text: str, line: str, begidx: int, endidx: int, - arg_tokens: Dict[str, List[str]]) -> List[str]: + def complete_help_subcommands( + self, text: str, line: str, begidx: int, endidx: int, arg_tokens: Dict[str, List[str]] + ) -> List[str]: """Completes the subcommands argument of help""" # Make sure we have a command whose subcommands we will complete @@ -3281,7 +3354,10 @@ class Cmd(cmd.Cmd): # If the command function uses argparse, then use argparse's help if func is not None and argparser is not None: - from .argparse_completer import ArgparseCompleter + from .argparse_completer import ( + ArgparseCompleter, + ) + completer = ArgparseCompleter(argparser, self) # Set end to blank so the help output matches how it looks when "command -h" is used @@ -3421,9 +3497,7 @@ class Cmd(cmd.Cmd): break for doc_line in doc_block: - self.stdout.write('{: <{col_width}}{doc}\n'.format(command, - col_width=widest, - doc=doc_line)) + self.stdout.write('{: <{col_width}}{doc}\n'.format(command, col_width=widest, doc=doc_line)) command = '' self.stdout.write("\n") @@ -3453,18 +3527,17 @@ class Cmd(cmd.Cmd): # Return True to stop the command loop return True - def select(self, opts: Union[str, List[str], List[Tuple[Any, Optional[str]]]], - prompt: str = 'Your choice? ') -> str: + def select(self, opts: Union[str, List[str], List[Tuple[Any, Optional[str]]]], prompt: str = 'Your choice? ') -> str: """Presents a numbered menu to the user. Modeled after - the bash shell's SELECT. Returns the item chosen. + the bash shell's SELECT. Returns the item chosen. - Argument ``opts`` can be: + Argument ``opts`` can be: - | a single string -> will be split into one-word options - | a list of strings -> will be offered as options - | a list of tuples -> interpreted as (value, text), so - that the return value can differ from - the text advertised to the user """ + | a single string -> will be split into one-word options + | a list of strings -> will be offered as options + | a list of tuples -> interpreted as (value, text), so + that the return value can differ from + the text advertised to the user""" local_opts = opts if isinstance(opts, str): @@ -3500,11 +3573,11 @@ class Cmd(cmd.Cmd): raise IndexError return fulloptions[choice - 1][0] except (ValueError, IndexError): - self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:".format( - response, len(fulloptions))) + self.poutput("{!r} isn't a valid choice. Pick a number between 1 and {}:".format(response, len(fulloptions))) - def complete_set_value(self, text: str, line: str, begidx: int, endidx: int, - arg_tokens: Dict[str, List[str]]) -> List[str]: + def complete_set_value( + self, text: str, line: str, begidx: int, endidx: int, arg_tokens: Dict[str, List[str]] + ) -> List[str]: """Completes the value argument of set""" param = arg_tokens['param'][0] try: @@ -3532,9 +3605,11 @@ class Cmd(cmd.Cmd): # When tab completing value, we recreate the set command parser with a value argument specific to # the settable being edited. To make this easier, define a parent parser with all the common elements. - set_description = ("Set a settable parameter or show current settings of parameters\n" - "Call without arguments for a list of all settable parameters with their values.\n" - "Call with just param to view that parameter's value.") + set_description = ( + "Set a settable parameter or show current settings of parameters\n" + "Call without arguments for a list of all settable parameters with their values.\n" + "Call with just param to view that parameter's value." + ) set_parser_parent = DEFAULT_ARGUMENT_PARSER(description=set_description, add_help=False) set_parser_parent.add_argument('-v', '--verbose', action='store_true', help='include description of parameters when viewing') @@ -3599,8 +3674,7 @@ class Cmd(cmd.Cmd): for param in sorted(results, key=self.default_sort_key): result_str = results[param] if args.verbose: - self.poutput('{} # {}'.format(utils.align_left(result_str, width=max_len), - self.settables[param].description)) + self.poutput('{} # {}'.format(utils.align_left(result_str, width=max_len), self.settables[param].description)) else: self.poutput(result_str) @@ -3626,10 +3700,12 @@ class Cmd(cmd.Cmd): # still receive the SIGINT since it is in the same process group as us. with self.sigint_protection: # For any stream that is a StdSim, we will use a pipe so we can capture its output - proc = subprocess.Popen(expanded_command, - stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout, - stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr, - shell=True) + proc = subprocess.Popen( + expanded_command, + stdout=subprocess.PIPE if isinstance(self.stdout, utils.StdSim) else self.stdout, + stderr=subprocess.PIPE if isinstance(sys.stderr, utils.StdSim) else sys.stderr, + shell=True, + ) proc_reader = utils.ProcReader(proc, self.stdout, sys.stderr) proc_reader.wait() @@ -3685,8 +3761,7 @@ class Cmd(cmd.Cmd): # Set up tab completion for the Python console # rlcompleter relies on the default settings of the Python readline module if rl_type == RlType.GNU: - cmd2_env.readline_settings.basic_quotes = ctypes.cast(rl_basic_quote_characters, - ctypes.c_void_p).value + cmd2_env.readline_settings.basic_quotes = ctypes.cast(rl_basic_quote_characters, ctypes.c_void_p).value rl_basic_quote_characters.value = orig_rl_basic_quotes if 'gnureadline' in sys.modules: @@ -3764,14 +3839,16 @@ class Cmd(cmd.Cmd): else: sys.modules['readline'] = cmd2_env.readline_module - py_description = ("Invoke Python command or shell\n" - "\n" - "Note that, when invoking a command directly from the command line, this shell\n" - "has limited ability to parse Python statements into tokens. In particular,\n" - "there may be problems with whitespace and quotes depending on their placement.\n" - "\n" - "If you see strange parsing behavior, it's best to just open the Python shell\n" - "by providing no arguments to py and run more complex statements there.") + py_description = ( + "Invoke Python command or shell\n" + "\n" + "Note that, when invoking a command directly from the command line, this shell\n" + "has limited ability to parse Python statements into tokens. In particular,\n" + "there may be problems with whitespace and quotes depending on their placement.\n" + "\n" + "If you see strange parsing behavior, it's best to just open the Python shell\n" + "by providing no arguments to py and run more complex statements there." + ) py_parser = DEFAULT_ARGUMENT_PARSER(description=py_description) py_parser.add_argument('command', nargs=argparse.OPTIONAL, help="command to run") @@ -3794,7 +3871,10 @@ class Cmd(cmd.Cmd): """Function callable from the interactive Python console to exit that environment""" raise EmbeddedConsoleExit - from .py_bridge import PyBridge + from .py_bridge import ( + PyBridge, + ) + py_bridge = PyBridge(self) saved_sys_path = None @@ -3865,9 +3945,10 @@ class Cmd(cmd.Cmd): # Otherwise we will open an interactive Python shell else: cprt = 'Type "help", "copyright", "credits" or "license" for more information.' - instructions = ('End with `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()`.\n' - 'Non-Python commands can be issued with: {}("your command")' - .format(self.py_bridge_name)) + instructions = ( + 'End with `Ctrl-D` (Unix) / `Ctrl-Z` (Windows), `quit()`, `exit()`.\n' + 'Non-Python commands can be issued with: {}("your command")'.format(self.py_bridge_name) + ) saved_cmd2_env = None @@ -3877,8 +3958,7 @@ class Cmd(cmd.Cmd): with self.sigint_protection: saved_cmd2_env = self._set_up_py_shell_env(interp) - interp.interact(banner="Python {} on {}\n{}\n\n{}\n". - format(sys.version, sys.platform, cprt, instructions)) + interp.interact(banner="Python {} on {}\n{}\n\n{}\n".format(sys.version, sys.platform, cprt, instructions)) except BaseException: # We don't care about any exception that happened in the interactive console pass @@ -3947,7 +4027,9 @@ class Cmd(cmd.Cmd): :return: True if running of commands should stop """ - from .py_bridge import PyBridge + from .py_bridge import ( + PyBridge, + ) # noinspection PyUnusedLocal def load_ipy(cmd2_app: Cmd, py_bridge: PyBridge): @@ -3969,9 +4051,13 @@ class Cmd(cmd.Cmd): del py_bridge # Start ipy shell - embed(banner1=('Entering an embedded IPython shell. Type quit or <Ctrl>-d to exit.\n' - 'Run Python code from external files with: run filename.py\n'), - exit_msg='Leaving IPython, back to {}'.format(sys.argv[0])) + embed( + banner1=( + 'Entering an embedded IPython shell. Type quit or <Ctrl>-d to exit.\n' + 'Run Python code from external files with: run filename.py\n' + ), + exit_msg='Leaving IPython, back to {}'.format(sys.argv[0]), + ) if self.in_pyscript(): self.perror("Recursively entering interactive Python shells is not allowed") @@ -4001,24 +4087,32 @@ class Cmd(cmd.Cmd): history_action_group.add_argument('-c', '--clear', action='store_true', help='clear all history') history_format_group = history_parser.add_argument_group(title='formatting') - history_format_group.add_argument('-s', '--script', action='store_true', - help='output commands in script format, i.e. without command\n' - 'numbers') - history_format_group.add_argument('-x', '--expanded', action='store_true', - help='output fully parsed commands with any aliases and\n' - 'macros expanded, instead of typed commands') - history_format_group.add_argument('-v', '--verbose', action='store_true', - help='display history and include expanded commands if they\n' - 'differ from the typed command') - history_format_group.add_argument('-a', '--all', action='store_true', - help='display all commands, including ones persisted from\n' - 'previous sessions') - - history_arg_help = ("empty all history items\n" - "a one history item by number\n" - "a..b, a:b, a:, ..b items by indices (inclusive)\n" - "string items containing string\n" - "/regex/ items matching regular expression") + history_format_group.add_argument( + '-s', '--script', action='store_true', help='output commands in script format, i.e. without command\n' 'numbers' + ) + history_format_group.add_argument( + '-x', + '--expanded', + action='store_true', + help='output fully parsed commands with any aliases and\n' 'macros expanded, instead of typed commands', + ) + history_format_group.add_argument( + '-v', + '--verbose', + action='store_true', + help='display history and include expanded commands if they\n' 'differ from the typed command', + ) + history_format_group.add_argument( + '-a', '--all', action='store_true', help='display all commands, including ones persisted from\n' 'previous sessions' + ) + + history_arg_help = ( + "empty all history items\n" + "a one history item by number\n" + "a..b, a:b, a:, ..b items by indices (inclusive)\n" + "string items containing string\n" + "/regex/ items matching regular expression" + ) history_parser.add_argument('arg', nargs=argparse.OPTIONAL, help=history_arg_help) @with_argparser(history_parser) @@ -4031,15 +4125,13 @@ class Cmd(cmd.Cmd): # -v must be used alone with no other options if args.verbose: - if args.clear or args.edit or args.output_file or args.run or args.transcript \ - or args.expanded or args.script: + if args.clear or args.edit or args.output_file or args.run or args.transcript or args.expanded or args.script: self.poutput("-v can not be used with any other options") self.poutput(self.history_parser.format_usage()) return # -s and -x can only be used if none of these options are present: [-c -r -e -o -t] - if (args.script or args.expanded) \ - and (args.clear or args.edit or args.output_file or args.run or args.transcript): + if (args.script or args.expanded) and (args.clear or args.edit or args.output_file or args.run or args.transcript): self.poutput("-s and -x can not be used with -c, -r, -e, -o, or -t") self.poutput(self.history_parser.format_usage()) return @@ -4072,6 +4164,7 @@ class Cmd(cmd.Cmd): return self.runcmds_plus_hooks(history) elif args.edit: import tempfile + fd, fname = tempfile.mkstemp(suffix='.txt', text=True) with os.fdopen(fd, 'w') as fobj: for command in history: @@ -4173,8 +4266,16 @@ class Cmd(cmd.Cmd): try: with open(hist_file, 'rb') as fobj: history = pickle.load(fobj) - except (AttributeError, EOFError, FileNotFoundError, ImportError, IndexError, KeyError, ValueError, - pickle.UnpicklingError): + except ( + AttributeError, + EOFError, + FileNotFoundError, + ImportError, + IndexError, + KeyError, + ValueError, + pickle.UnpicklingError, + ): # If any of these errors occur when attempting to unpickle, just use an empty history pass except OSError as ex: @@ -4202,6 +4303,7 @@ class Cmd(cmd.Cmd): # if the history file is in plain text format from 0.9.12 or lower # this will fail, and the history in the plain text file will be lost import atexit + atexit.register(self._persist_history) def _persist_history(self): @@ -4303,11 +4405,13 @@ class Cmd(cmd.Cmd): msg = '{} {} saved to transcript file {!r}' self.pfeedback(msg.format(commands_run, plural, transcript_file)) - edit_description = ("Run a text editor and optionally open a file with it\n" - "\n" - "The editor used is determined by a settable parameter. To set it:\n" - "\n" - " set editor (program-name)") + edit_description = ( + "Run a text editor and optionally open a file with it\n" + "\n" + "The editor used is determined by a settable parameter. To set it:\n" + "\n" + " set editor (program-name)" + ) edit_parser = DEFAULT_ARGUMENT_PARSER(description=edit_description) edit_parser.add_argument('file_path', nargs=argparse.OPTIONAL, @@ -4343,13 +4447,15 @@ class Cmd(cmd.Cmd): else: return None - run_script_description = ("Run commands in script file that is encoded as either ASCII or UTF-8 text\n" - "\n" - "Script should contain one command per line, just like the command would be\n" - "typed in the console.\n" - "\n" - "If the -t/--transcript flag is used, this command instead records\n" - "the output of the script commands to a transcript for testing purposes.\n") + run_script_description = ( + "Run commands in script file that is encoded as either ASCII or UTF-8 text\n" + "\n" + "Script should contain one command per line, just like the command would be\n" + "typed in the console.\n" + "\n" + "If the -t/--transcript flag is used, this command instead records\n" + "the output of the script commands to a transcript for testing purposes.\n" + ) run_script_parser = DEFAULT_ARGUMENT_PARSER(description=run_script_description) run_script_parser.add_argument('-t', '--transcript', metavar='TRANSCRIPT_FILE', @@ -4420,13 +4526,14 @@ class Cmd(cmd.Cmd): relative_run_script_description += ( "\n\n" "If this is called from within an already-running script, the filename will be\n" - "interpreted relative to the already-running script's directory.") + "interpreted relative to the already-running script's directory." + ) - relative_run_script_epilog = ("Notes:\n" - " This command is intended to only be used within text file scripts.") + relative_run_script_epilog = "Notes:\n" " This command is intended to only be used within text file scripts." - relative_run_script_parser = DEFAULT_ARGUMENT_PARSER(description=relative_run_script_description, - epilog=relative_run_script_epilog) + relative_run_script_parser = DEFAULT_ARGUMENT_PARSER( + description=relative_run_script_description, epilog=relative_run_script_epilog + ) relative_run_script_parser.add_argument('file_path', help='a file path pointing to a script') @with_argparser(relative_run_script_parser) @@ -4453,8 +4560,12 @@ class Cmd(cmd.Cmd): """ import time import unittest + import cmd2 - from .transcript import Cmd2TestCase + + from .transcript import ( + Cmd2TestCase, + ) class TestMyAppCase(Cmd2TestCase): cmdapp = self @@ -4470,8 +4581,7 @@ class Cmd(cmd.Cmd): num_transcripts = len(transcripts_expanded) plural = '' if len(transcripts_expanded) == 1 else 's' self.poutput(ansi.style(utils.align_center(' cmd2 transcript test ', fill_char='='), bold=True)) - self.poutput('platform {} -- Python {}, cmd2-{}, readline-{}'.format(sys.platform, verinfo, cmd2.__version__, - rl_type)) + self.poutput('platform {} -- Python {}, cmd2-{}, readline-{}'.format(sys.platform, verinfo, cmd2.__version__, rl_type)) self.poutput('cwd: {}'.format(os.getcwd())) self.poutput('cmd2 app: {}'.format(sys.argv[0])) self.poutput(ansi.style('collected {} transcript{}'.format(num_transcripts, plural), bold=True)) @@ -4545,9 +4655,13 @@ class Cmd(cmd.Cmd): import shutil current_prompt = self.continuation_prompt if self._at_continuation_prompt else self.prompt - terminal_str = ansi.async_alert_str(terminal_columns=shutil.get_terminal_size().columns, - prompt=current_prompt, line=readline.get_line_buffer(), - cursor_offset=rl_get_point(), alert_msg=alert_msg) + terminal_str = ansi.async_alert_str( + terminal_columns=shutil.get_terminal_size().columns, + prompt=current_prompt, + line=readline.get_line_buffer(), + cursor_offset=rl_get_point(), + alert_msg=alert_msg, + ) if rl_type == RlType.GNU: sys.stderr.write(terminal_str) sys.stderr.flush() @@ -4680,13 +4794,16 @@ class Cmd(cmd.Cmd): completer_func_name = constants.COMPLETER_FUNC_PREFIX + command # Add the disabled command record - self.disabled_commands[command] = DisabledCommand(command_function=command_function, - help_function=getattr(self, help_func_name, None), - completer_function=getattr(self, completer_func_name, None)) + self.disabled_commands[command] = DisabledCommand( + command_function=command_function, + help_function=getattr(self, help_func_name, None), + completer_function=getattr(self, completer_func_name, None), + ) # Overwrite the command and help functions to print the message - new_func = functools.partial(self._report_disabled_command_usage, - message_to_print=message_to_print.replace(constants.COMMAND_NAME, command)) + new_func = functools.partial( + self._report_disabled_command_usage, message_to_print=message_to_print.replace(constants.COMMAND_NAME, command) + ) setattr(self, self._cmd_func_name(command), new_func) setattr(self, help_func_name, new_func) @@ -4738,6 +4855,7 @@ class Cmd(cmd.Cmd): # Register a SIGINT signal handler for Ctrl+C import signal + original_sigint_handler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, self.sigint_handler) @@ -4799,11 +4917,13 @@ class Cmd(cmd.Cmd): # validate that the callable has the right number of parameters nparam = len(signature.parameters) if nparam != count: - raise TypeError('{} has {} positional arguments, expected {}'.format( - func.__name__, - nparam, - count, - )) + raise TypeError( + '{} has {} positional arguments, expected {}'.format( + func.__name__, + nparam, + count, + ) + ) @classmethod def _validate_prepostloop_callable(cls, func: Callable[[None], None]) -> None: @@ -4812,9 +4932,11 @@ class Cmd(cmd.Cmd): # make sure there is no return notation signature = inspect.signature(func) if signature.return_annotation is not None: - raise TypeError("{} must declare return a return type of 'None'".format( - func.__name__, - )) + raise TypeError( + "{} must declare return a return type of 'None'".format( + func.__name__, + ) + ) def register_preloop_hook(self, func: Callable[[None], None]) -> None: """Register a function to be called at the beginning of the command loop.""" @@ -4833,13 +4955,11 @@ class Cmd(cmd.Cmd): signature = inspect.signature(func) _, param = list(signature.parameters.items())[0] if param.annotation != plugin.PostparsingData: - raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format( - func.__name__ - )) + raise TypeError( + "{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format(func.__name__) + ) if signature.return_annotation != plugin.PostparsingData: - raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format( - func.__name__ - )) + raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format(func.__name__)) def register_postparsing_hook(self, func: Callable[[plugin.PostparsingData], plugin.PostparsingData]) -> None: """Register a function to be called after parsing user input but before running the command""" @@ -4856,23 +4976,29 @@ class Cmd(cmd.Cmd): paramname = list(signature.parameters.keys())[0] param = signature.parameters[paramname] if param.annotation != data_type: - raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format( - func.__name__, - param.annotation, - data_type, - )) + raise TypeError( + 'argument 1 of {} has incompatible type {}, expected {}'.format( + func.__name__, + param.annotation, + data_type, + ) + ) # validate the return value has the right annotation if signature.return_annotation == signature.empty: - raise TypeError('{} does not have a declared return type, expected {}'.format( - func.__name__, - data_type, - )) + raise TypeError( + '{} does not have a declared return type, expected {}'.format( + func.__name__, + data_type, + ) + ) if signature.return_annotation != data_type: - raise TypeError('{} has incompatible return type {}, expected {}'.format( - func.__name__, - signature.return_annotation, - data_type, - )) + raise TypeError( + '{} has incompatible return type {}, expected {}'.format( + func.__name__, + signature.return_annotation, + data_type, + ) + ) def register_precmd_hook(self, func: Callable[[plugin.PrecommandData], plugin.PrecommandData]) -> None: """Register a hook to be called before the command function.""" @@ -4885,28 +5011,28 @@ class Cmd(cmd.Cmd): self._postcmd_hooks.append(func) @classmethod - def _validate_cmdfinalization_callable(cls, func: Callable[[plugin.CommandFinalizationData], - plugin.CommandFinalizationData]) -> None: + def _validate_cmdfinalization_callable( + cls, func: Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData] + ) -> None: """Check parameter and return types for command finalization hooks.""" cls._validate_callable_param_count(func, 1) signature = inspect.signature(func) _, param = list(signature.parameters.items())[0] if param.annotation != plugin.CommandFinalizationData: - raise TypeError("{} must have one parameter declared with type {}".format(func.__name__, - plugin.CommandFinalizationData)) + raise TypeError( + "{} must have one parameter declared with type {}".format(func.__name__, plugin.CommandFinalizationData) + ) if signature.return_annotation != plugin.CommandFinalizationData: - raise TypeError("{} must declare return a return type of {}".format(func.__name__, - plugin.CommandFinalizationData)) + raise TypeError("{} must declare return a return type of {}".format(func.__name__, plugin.CommandFinalizationData)) - def register_cmdfinalization_hook(self, func: Callable[[plugin.CommandFinalizationData], - plugin.CommandFinalizationData]) -> None: + def register_cmdfinalization_hook( + self, func: Callable[[plugin.CommandFinalizationData], plugin.CommandFinalizationData] + ) -> None: """Register a hook to be called after a command is completed, whether it completes successfully or not.""" self._validate_cmdfinalization_callable(func) self._cmdfinalization_hooks.append(func) - def _resolve_func_self(self, - cmd_support_func: Callable, - cmd_self: Union[CommandSet, 'Cmd']) -> object: + def _resolve_func_self(self, cmd_support_func: Callable, cmd_self: Union[CommandSet, 'Cmd']) -> object: """ Attempt to resolve a candidate instance to pass as 'self' for an unbound class method that was used when defining command's argparse object. Since we restrict registration to only a single CommandSet |