diff options
author | Kevin Van Brunt <kmvanbrunt@gmail.com> | 2019-06-14 17:36:44 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-06-14 17:36:44 -0400 |
commit | ddd07f9cd6d72baca1232ae98856cf3b3d564706 (patch) | |
tree | 8e03d435730baf1cc16ccc016e594d0b64d8e04a /cmd2/cmd2.py | |
parent | f64f9d559caa08b5649b9bd356af2812acf103bd (diff) | |
parent | 756d8d38502e934ea180c4cfb8dea3efd124a3bf (diff) | |
download | cmd2-git-ddd07f9cd6d72baca1232ae98856cf3b3d564706.tar.gz |
Merge pull request #696 from python-cmd2/script_refactor
Script refactor
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 301 |
1 files changed, 144 insertions, 157 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 387fe6d9..02462d96 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -335,11 +335,12 @@ class Cmd(cmd.Cmd): DEFAULT_SHORTCUTS = {'?': 'help', '!': 'shell', '@': 'load', '@@': '_relative_load'} DEFAULT_EDITOR = utils.find_editor() - def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, persistent_history_file: str = '', - persistent_history_length: int = 1000, startup_script: Optional[str] = None, use_ipython: bool = False, - transcript_files: Optional[List[str]] = None, allow_redirection: bool = True, - multiline_commands: Optional[List[str]] = None, terminators: Optional[List[str]] = None, - shortcuts: Optional[Dict[str, str]] = None) -> None: + def __init__(self, completekey: str = 'tab', stdin=None, stdout=None, *, + persistent_history_file: str = '', persistent_history_length: int = 1000, + startup_script: Optional[str] = None, use_ipython: bool = False, + allow_cli_args: bool = True, transcript_files: Optional[List[str]] = None, + allow_redirection: bool = True, multiline_commands: Optional[List[str]] = None, + terminators: Optional[List[str]] = None, shortcuts: Optional[Dict[str, str]] = None) -> None: """An easy but powerful framework for writing line-oriented command interpreters, extends Python's cmd package. :param completekey: (optional) readline name of a completion key, default to Tab @@ -349,6 +350,9 @@ class Cmd(cmd.Cmd): :param persistent_history_length: (optional) max number of history items to write to the persistent history file :param startup_script: (optional) file path to a a script to load and execute at startup :param use_ipython: (optional) should the "ipy" command be included for an embedded IPython shell + :param allow_cli_args: (optional) if True, then cmd2 will process command line arguments as either + commands to be run or, if -t is specified, transcript files to run. + This should be set to False if your application parses its own arguments. :param transcript_files: (optional) allows running transcript tests when allow_cli_args is False :param allow_redirection: (optional) should output redirection and pipes be allowed :param multiline_commands: (optional) list of commands allowed to accept multi-line input @@ -372,7 +376,6 @@ class Cmd(cmd.Cmd): super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) # Attributes which should NOT be dynamically settable at runtime - self.allow_cli_args = True # Should arguments passed on the command-line be processed as commands? self.default_to_shell = False # Attempt to run unrecognized commands as shell commands self.quit_on_sigint = False # Quit the loop on interrupt instead of just resetting prompt @@ -400,13 +403,13 @@ class Cmd(cmd.Cmd): 'timing': 'Report execution times'} # Commands to exclude from the help menu and tab completion - self.hidden_commands = ['eof', 'eos', '_relative_load'] + self.hidden_commands = ['eof', '_relative_load'] # Commands to exclude from the history command # initialize history self.persistent_history_length = persistent_history_length self._initialize_history(persistent_history_file) - self.exclude_from_history = '''history edit eof eos'''.split() + self.exclude_from_history = '''history edit eof'''.split() # Command aliases and macros self.macros = dict() @@ -423,10 +426,6 @@ class Cmd(cmd.Cmd): terminators=terminators, multiline_commands=multiline_commands, shortcuts=shortcuts) - self._transcript_files = transcript_files - - # Used to enable the ability for a Python script to quit the application - self._should_quit = False # True if running inside a Python script or interactive console, False otherwise self._in_py = False @@ -435,9 +434,6 @@ class Cmd(cmd.Cmd): # Built-in commands don't make use of this. It is purely there for user-defined commands and convenience. self._last_result = None - # Codes used for exit conditions - self._STOP_AND_EXIT = True # cmd convention - # Used load command to store the current script dir as a LIFO queue to support _relative_load command self._script_dir = [] @@ -466,11 +462,33 @@ class Cmd(cmd.Cmd): # If this string is non-empty, then this warning message will print if a broken pipe error occurs while printing self.broken_pipe_warning = '' + # Commands that will run at the beginning of the command loop + self._startup_commands = [] + # If a startup script is provided, then add it in the queue to load if startup_script is not None: startup_script = os.path.abspath(os.path.expanduser(startup_script)) if os.path.exists(startup_script) and os.path.getsize(startup_script) > 0: - self.cmdqueue.append("load '{}'".format(startup_script)) + self._startup_commands.append("load '{}'".format(startup_script)) + + # Transcript files to run instead of interactive command loop + self._transcript_files = None + + # Check for command line args + if allow_cli_args: + parser = argparse.ArgumentParser() + parser.add_argument('-t', '--test', action="store_true", + help='Test against transcript(s) in FILE (wildcards OK)') + callopts, callargs = parser.parse_known_args() + + # If transcript testing was called for, use other arguments as transcript files + if callopts.test: + self._transcript_files = callargs + # If commands were supplied at invocation, then add them to the command queue + elif callargs: + self._startup_commands.extend(callargs) + elif transcript_files: + self._transcript_files = transcript_files # The default key for sorting tab completion matches. This only applies when the matches are not # already marked as sorted by setting self.matches_sorted to True. Its default value performs a @@ -1166,7 +1184,7 @@ class Cmd(cmd.Cmd): # Find every executable file in the user's path that matches the pattern for path in paths: full_path = os.path.join(path, starts_with) - matches = [f for f in glob.glob(full_path + '*') if os.path.isfile(f) and os.access(f, os.X_OK)] + matches = utils.files_from_glob_pattern(full_path + '*', access=os.X_OK) for match in matches: exes_set.add(os.path.basename(match)) @@ -1689,7 +1707,7 @@ class Cmd(cmd.Cmd): :param pyscript_bridge_call: This should only ever be set to True by PyscriptBridge to signify the beginning of an app() call in a pyscript. It is used to enable/disable the storage of the command's stdout. - :return: True if cmdloop() should exit, False otherwise + :return: True if running of commands should stop """ import datetime @@ -1810,47 +1828,25 @@ class Cmd(cmd.Cmd): except Exception as ex: self.perror(ex) - def runcmds_plus_hooks(self, cmds: List[str]) -> bool: - """Convenience method to run multiple commands by onecmd_plus_hooks. - - This method adds the given cmds to the command queue and processes the - queue until completion or an error causes it to abort. Scripts that are - loaded will have their commands added to the queue. Scripts may even - load other scripts recursively. This means, however, that you should not - use this method if there is a running cmdloop or some other event-loop. - This method is only intended to be used in "one-off" scenarios. - - NOTE: You may need this method even if you only have one command. If - that command is a load, then you will need this command to fully process - all the subsequent commands that are loaded from the script file. This - is an improvement over onecmd_plus_hooks, which expects to be used - inside of a command loop which does the processing of loaded commands. + def runcmds_plus_hooks(self, cmds: List[Union[HistoryItem, str]]) -> bool: + """ + Used when commands are being run in an automated fashion like text scripts or history replays. + The prompt and command line for each command will be printed if echo is True. - Example: cmd_obj.runcmds_plus_hooks(['load myscript.txt']) + :param cmds: commands to run + :return: True if running of commands should stop + """ + for line in cmds: + if isinstance(line, HistoryItem): + line = line.raw - :param cmds: command strings suitable for onecmd_plus_hooks. - :return: True implies the entire application should exit. + if self.echo: + self.poutput('{}{}'.format(self.prompt, line)) - """ - stop = False - self.cmdqueue = list(cmds) + self.cmdqueue - try: - while self.cmdqueue and not stop: - line = self.cmdqueue.pop(0) - if self.echo and line != 'eos': - self.poutput('{}{}'.format(self.prompt, line)) + if self.onecmd_plus_hooks(line): + return True - stop = self.onecmd_plus_hooks(line) - finally: - # Clear out the command queue and script directory stack, just in - # case we hit an error and they were not completed. - self.cmdqueue = [] - self._script_dir = [] - # NOTE: placing this return here inside the finally block will - # swallow exceptions. This is consistent with what is done in - # onecmd_plus_hooks and _cmdloop, although it may not be - # necessary/desired here. - return stop + return False def _complete_statement(self, line: str) -> Statement: """Keep accepting lines of input until the command is complete. @@ -2234,14 +2230,12 @@ class Cmd(cmd.Cmd): return line.strip() - def _cmdloop(self) -> bool: + def _cmdloop(self) -> None: """Repeatedly issue a prompt, accept input, parse an initial prefix off the received input, and dispatch to action methods, passing them the remainder of the line as argument. This serves the same role as cmd.cmdloop(). - - :return: True implies the entire application should exit. """ # An almost perfect copy from Cmd; however, the pseudo_raw_input portion # has been split out so that it can be called separately @@ -2271,25 +2265,21 @@ class Cmd(cmd.Cmd): # Enable tab completion readline.parse_and_bind(self.completekey + ": complete") - stop = False try: - while not stop: - if self.cmdqueue: - # Run command out of cmdqueue if nonempty (populated by load command or commands at invocation) - line = self.cmdqueue.pop(0) + # Run startup commands + stop = self.runcmds_plus_hooks(self._startup_commands) + self._startup_commands.clear() - if self.echo and line != 'eos': - self.poutput('{}{}'.format(self.prompt, line)) - else: - # Otherwise, read a command from stdin - try: - line = self.pseudo_raw_input(self.prompt) - except KeyboardInterrupt as ex: - if self.quit_on_sigint: - raise ex - else: - self.poutput('^C') - line = '' + while not stop: + # Get commands from user + try: + line = self.pseudo_raw_input(self.prompt) + except KeyboardInterrupt as ex: + if self.quit_on_sigint: + raise ex + else: + self.poutput('^C') + line = '' # Run the command along with all associated pre and post hooks stop = self.onecmd_plus_hooks(line) @@ -2307,11 +2297,6 @@ class Cmd(cmd.Cmd): # noinspection PyUnresolvedReferences readline.rl.mode._display_completions = orig_pyreadline_display - self.cmdqueue.clear() - self._script_dir.clear() - - return stop - # ----- Alias sub-command functions ----- def alias_create(self, args: argparse.Namespace) -> None: @@ -2863,14 +2848,14 @@ class Cmd(cmd.Cmd): @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) def do_eof(self, _: argparse.Namespace) -> bool: """Called when <Ctrl>-D is pressed""" - # End of script should not exit app, but <Ctrl>-D should. - return self._STOP_AND_EXIT + # Return True to stop the command loop + return True @with_argparser(ACArgumentParser()) def do_quit(self, _: argparse.Namespace) -> bool: """Exit this application""" - self._should_quit = True - return self._STOP_AND_EXIT + # Return True to stop the command loop + return True def select(self, opts: Union[str, List[str], List[Tuple[Any, Optional[str]]]], prompt: str = 'Your choice? ') -> str: @@ -2925,10 +2910,8 @@ class Cmd(cmd.Cmd): """ read_only_settings = """ Commands may be terminated with: {} - Arguments at invocation allowed: {} Output redirection and pipes allowed: {}""" - return read_only_settings.format(str(self.statement_parser.terminators), self.allow_cli_args, - self.allow_redirection) + return read_only_settings.format(str(self.statement_parser.terminators), self.allow_redirection) def show(self, args: argparse.Namespace, parameter: str = '') -> None: """Shows current settings of parameters. @@ -3085,6 +3068,8 @@ class Cmd(cmd.Cmd): self.perror(err, traceback_war=False) return False + bridge = PyscriptBridge(self) + try: self._in_py = True @@ -3110,7 +3095,6 @@ class Cmd(cmd.Cmd): raise EmbeddedConsoleExit # Set up Python environment - bridge = PyscriptBridge(self) self.pystate[self.pyscript_name] = bridge self.pystate['run'] = py_run self.pystate['quit'] = py_quit @@ -3255,7 +3239,7 @@ class Cmd(cmd.Cmd): finally: self._in_py = False - return self._should_quit + return bridge.stop pyscript_parser = ACArgumentParser() setattr(pyscript_parser.add_argument('script_path', help='path to the script file'), @@ -3313,7 +3297,9 @@ class Cmd(cmd.Cmd): embed(banner1=banner, exit_msg=exit_msg) load_ipy(bridge) - history_parser = ACArgumentParser() + history_description = "View, run, edit, save, or clear previously entered commands" + + history_parser = ACArgumentParser(description=history_description) history_action_group = history_parser.add_mutually_exclusive_group() history_action_group.add_argument('-r', '--run', action='store_true', help='run selected history items') history_action_group.add_argument('-e', '--edit', action='store_true', @@ -3349,8 +3335,11 @@ class Cmd(cmd.Cmd): history_parser.add_argument('arg', nargs='?', help=history_arg_help) @with_argparser(history_parser) - def do_history(self, args: argparse.Namespace) -> None: - """View, run, edit, save, or clear previously entered commands""" + def do_history(self, args: argparse.Namespace) -> Optional[bool]: + """ + View, run, edit, save, or clear previously entered commands + :return: True if running of commands should stop + """ # -v must be used alone with no other options if args.verbose: @@ -3412,10 +3401,7 @@ class Cmd(cmd.Cmd): self.perror("If this is what you want to do, specify '1:' as the range of history.", traceback_war=False) else: - for runme in history: - self.pfeedback(runme.raw) - if runme: - self.onecmd_plus_hooks(runme.raw) + return self.runcmds_plus_hooks(history) elif args.edit: import tempfile fd, fname = tempfile.mkstemp(suffix='.txt', text=True) @@ -3427,9 +3413,7 @@ class Cmd(cmd.Cmd): fobj.write('{}\n'.format(command.raw)) try: self.do_edit(fname) - self.do_load(fname) - except Exception: - raise + return self.do_load(fname) finally: os.remove(fname) elif args.output_file: @@ -3525,8 +3509,9 @@ class Cmd(cmd.Cmd): self.perror(msg.format(self.persistent_history_file, ex), traceback_war=False) def _generate_transcript(self, history: List[Union[HistoryItem, str]], transcript_file: str) -> None: - """Generate a transcript file from a given history of commands.""" - import io + """ + Generate a transcript file from a given history of commands + """ # Validate the transcript file path to make sure directory exists and write access is available transcript_path = os.path.abspath(os.path.expanduser(transcript_file)) transcript_dir = os.path.dirname(transcript_path) @@ -3535,6 +3520,7 @@ class Cmd(cmd.Cmd): traceback_war=False) return + commands_run = 0 try: with self.sigint_protection: # Disable echo while we manually redirect stdout to a StringIO buffer @@ -3566,24 +3552,31 @@ class Cmd(cmd.Cmd): else: command += '{}{}\n'.format(self.continuation_prompt, line) transcript += command - # create a new string buffer and set it to stdout to catch the output - # of the command - membuf = io.StringIO() - self.stdout = membuf + + # Use a StdSim object to capture output + self.stdout = utils.StdSim(self.stdout) + # then run the command and let the output go into our buffer - self.onecmd_plus_hooks(history_item) - # rewind the buffer to the beginning - membuf.seek(0) - # get the output out of the buffer - output = membuf.read() - # and add the regex-escaped output to the transcript - transcript += output.replace('/', r'\/') + stop = self.onecmd_plus_hooks(history_item) + commands_run += 1 + + # add the regex-escaped output to the transcript + transcript += self.stdout.getvalue().replace('/', r'\/') + + # check if we are supposed to stop + if stop: + break finally: with self.sigint_protection: # Restore altered attributes to their original state self.echo = saved_echo self.stdout = saved_stdout + # Check if all commands ran + if commands_run < len(history): + warning = "Command {} triggered a stop and ended transcript generation early".format(commands_run) + self.perror(warning, err_color=Fore.LIGHTYELLOW_EX, traceback_war=False) + # finally, we can write the transcript out to the file try: with open(transcript_file, 'w') as fout: @@ -3592,12 +3585,12 @@ class Cmd(cmd.Cmd): self.perror('Failed to save transcript: {}'.format(ex), traceback_war=False) else: # and let the user know what we did - if len(history) > 1: + if commands_run > 1: plural = 'commands and their outputs' else: plural = 'command and its output' msg = '{} {} saved to transcript file {!r}' - self.pfeedback(msg.format(len(history), plural, transcript_file)) + self.pfeedback(msg.format(commands_run, plural, transcript_file)) edit_description = ("Edit a file in a text editor\n" "\n" @@ -3629,22 +3622,11 @@ class Cmd(cmd.Cmd): else: return None - @with_argparser(ACArgumentParser(epilog=INTERNAL_COMMAND_EPILOG)) - def do_eos(self, _: argparse.Namespace) -> None: - """Handle cleanup when a script has finished executing""" - if self._script_dir: - self._script_dir.pop() - load_description = ("Run commands in script file that is encoded as either ASCII or UTF-8 text\n" "\n" "Script should contain one command per line, just like the command would be\n" "typed in the console.\n" "\n" - "It loads commands from a script file into a queue and then the normal cmd2\n" - "REPL resumes control and executes the commands in the queue in FIFO order.\n" - "If you attempt to redirect/pipe a load command, it will capture the output\n" - "of the load command itself, not what it adds to the queue.\n" - "\n" "If the -r/--record_transcript flag is used, this command instead records\n" "the output of the script commands to a transcript for testing purposes.\n" ) @@ -3656,8 +3638,11 @@ class Cmd(cmd.Cmd): ACTION_ARG_CHOICES, ('path_complete',)) @with_argparser(load_parser) - def do_load(self, args: argparse.Namespace) -> None: - """Run commands in script file that is encoded as either ASCII or UTF-8 text""" + def do_load(self, args: argparse.Namespace) -> Optional[bool]: + """ + Run commands in script file that is encoded as either ASCII or UTF-8 text + :return: True if running of commands should stop + """ expanded_path = os.path.abspath(os.path.expanduser(args.script_path)) # Make sure the path exists and we can access it @@ -3681,21 +3666,28 @@ class Cmd(cmd.Cmd): return try: - # Read all lines of the script and insert into the head of the - # command queue. Add an "end of script (eos)" command to cleanup the - # self._script_dir list when done. + # Read all lines of the script with open(expanded_path, encoding='utf-8') as target: script_commands = target.read().splitlines() except OSError as ex: # pragma: no cover self.perror("Problem accessing script from '{}': {}".format(expanded_path, ex)) return - if args.transcript: - self._generate_transcript(script_commands, args.transcript) - return + orig_script_dir_count = len(self._script_dir) - self.cmdqueue = script_commands + ['eos'] + self.cmdqueue - self._script_dir.append(os.path.dirname(expanded_path)) + try: + self._script_dir.append(os.path.dirname(expanded_path)) + + if args.transcript: + self._generate_transcript(script_commands, os.path.expanduser(args.transcript)) + else: + return self.runcmds_plus_hooks(script_commands) + + finally: + with self.sigint_protection: + # Check if a script dir was added before an exception occurred + if orig_script_dir_count != len(self._script_dir): + self._script_dir.pop() relative_load_description = load_description relative_load_description += ("\n\n" @@ -3709,20 +3701,23 @@ class Cmd(cmd.Cmd): relative_load_parser.add_argument('file_path', help='a file path pointing to a script') @with_argparser(relative_load_parser) - def do__relative_load(self, args: argparse.Namespace) -> None: - """Run commands in script file that is encoded as either ASCII or UTF-8 text""" + def do__relative_load(self, args: argparse.Namespace) -> Optional[bool]: + """ + Run commands in script file that is encoded as either ASCII or UTF-8 text + :return: True if running of commands should stop + """ file_path = args.file_path # NOTE: Relative path is an absolute path, it is just relative to the current script directory relative_path = os.path.join(self._current_script_dir or '', file_path) - self.do_load(relative_path) + return self.do_load(relative_path) - def run_transcript_tests(self, callargs: List[str]) -> None: + def run_transcript_tests(self, transcript_paths: List[str]) -> None: """Runs transcript tests for provided file(s). This is called when either -t is provided on the command line or the transcript_files argument is provided during construction of the cmd2.Cmd instance. - :param callargs: list of transcript test file names + :param transcript_paths: list of transcript test file paths """ import unittest from .transcript import Cmd2TestCase @@ -3730,7 +3725,14 @@ class Cmd(cmd.Cmd): class TestMyAppCase(Cmd2TestCase): cmdapp = self - self.__class__.testfiles = callargs + # Validate that there is at least one transcript file + transcripts_expanded = utils.files_from_glob_patterns(transcript_paths, access=os.R_OK) + if not transcripts_expanded: + self.perror('No test files found - nothing to test', traceback_war=False) + self.exit_code = -1 + return + + self.__class__.testfiles = transcripts_expanded sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() testcase = TestMyAppCase() stream = utils.StdSim(sys.stderr) @@ -3740,7 +3742,7 @@ class Cmd(cmd.Cmd): self.decolorized_write(sys.stderr, stream.read()) self.poutput('Tests passed', color=Fore.LIGHTGREEN_EX) else: - # Strip off the initial trackeback which isn't particularly useful for end users + # Strip off the initial traceback which isn't particularly useful for end users error_str = stream.read() end_of_trace = error_str.find('AssertionError:') file_offset = error_str[end_of_trace:].find('File ') @@ -4003,7 +4005,6 @@ class Cmd(cmd.Cmd): _cmdloop() provides the main loop equivalent to cmd.cmdloop(). This is a wrapper around that which deals with the following extra features provided by cmd2: - - commands at invocation - transcript testing - intro banner - exit code @@ -4020,20 +4021,6 @@ class Cmd(cmd.Cmd): original_sigint_handler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, self.sigint_handler) - if self.allow_cli_args: - parser = argparse.ArgumentParser() - parser.add_argument('-t', '--test', action="store_true", - help='Test against transcript(s) in FILE (wildcards OK)') - callopts, callargs = parser.parse_known_args() - - # If transcript testing was called for, use other arguments as transcript files - if callopts.test: - self._transcript_files = callargs - - # If commands were supplied at invocation, then add them to the command queue - if callargs: - self.cmdqueue.extend(callargs) - # Grab terminal lock before the prompt has been drawn by readline self.terminal_lock.acquire() |