diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 199 |
1 files changed, 194 insertions, 5 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 42e00c39..8db7cef3 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -34,6 +34,7 @@ import cmd import collections from colorama import Fore import glob +import inspect import os import platform import re @@ -43,6 +44,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union from . import constants from . import utils +from . import plugin from .argparse_completer import AutoCompleter, ACArgumentParser from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement @@ -381,6 +383,10 @@ class Cmd(cmd.Cmd): import atexit atexit.register(readline.write_history_file, persistent_history_file) + # initialize plugin system + # needs to be done before we call __init__(0) + self._initialize_plugin_system() + # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) @@ -1680,12 +1686,35 @@ class Cmd(cmd.Cmd): :return: True if cmdloop() should exit, False otherwise """ import datetime + stop = False try: statement = self._complete_statement(line) - (stop, statement) = self.postparsing_precmd(statement) + except EmptyStatement: + return self._run_cmdfinalization_hooks(stop, None) + except ValueError as ex: + # If shlex.split failed on syntax, let user know whats going on + self.perror("Invalid syntax: {}".format(ex), traceback_war=False) + return stop + + # now that we have a statement, run it with all the hooks + try: + # call the postparsing hooks + data = plugin.PostparsingData(False, statement) + for func in self._postparsing_hooks: + data = func(data) + if data.stop: + break + # postparsing_precmd is deprecated + if not data.stop: + (data.stop, data.statement) = self.postparsing_precmd(data.statement) + # unpack the data object + statement = data.statement + stop = data.stop if stop: - return self.postparsing_postcmd(stop) + # we should not run the command, but + # we need to run the finalization hooks + raise EmptyStatement try: if self.allow_redirection: @@ -1693,23 +1722,53 @@ class Cmd(cmd.Cmd): timestart = datetime.datetime.now() if self._in_py: self._last_result = None + + # precommand hooks + data = plugin.PrecommandData(statement) + for func in self._precmd_hooks: + data = func(data) + statement = data.statement + # call precmd() for compatibility with cmd.Cmd statement = self.precmd(statement) + + # go run the command function stop = self.onecmd(statement) + + # postcommand hooks + data = plugin.PostcommandData(stop, statement) + for func in self._postcmd_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any statement modification from the hooks + stop = data.stop + # call postcmd() for compatibility with cmd.Cmd stop = self.postcmd(stop, statement) + if self.timing: self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) finally: if self.allow_redirection and self.redirecting: self._restore_output(statement) except EmptyStatement: + # don't do anything, but do allow command finalization hooks to run pass - except ValueError as ex: - # If shlex.split failed on syntax, let user know whats going on - self.perror("Invalid syntax: {}".format(ex), traceback_war=False) except Exception as ex: self.perror(ex) finally: + return self._run_cmdfinalization_hooks(stop, statement) + + def _run_cmdfinalization_hooks(self, stop: bool, statement: Statement) -> bool: + """Run the command finalization hooks""" + try: + data = plugin.CommandFinalizationData(stop, statement) + for func in self._cmdfinalization_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any + # modifications to the statement + stop = data.stop + # postparsing_postcmd is deprecated return self.postparsing_postcmd(stop) + except Exception as ex: + self.perror(ex) def runcmds_plus_hooks(self, cmds: List[str]) -> bool: """Convenience method to run multiple commands by onecmd_plus_hooks. @@ -3063,6 +3122,8 @@ Script should contain one command per line, just like command would be typed in self.cmdqueue.extend(callargs) # Always run the preloop first + for func in self._preloop_hooks: + func() self.preloop() # If transcript-based regression testing was requested, then do that instead of the main loop @@ -3081,8 +3142,136 @@ Script should contain one command per line, just like command would be typed in self._cmdloop() # Run the postloop() no matter what + for func in self._postloop_hooks: + func() self.postloop() + ### + # + # plugin related functions + # + ### + def _initialize_plugin_system(self): + """Initialize the plugin system""" + self._preloop_hooks = [] + self._postloop_hooks = [] + self._postparsing_hooks = [] + self._precmd_hooks = [] + self._postcmd_hooks = [] + self._cmdfinalization_hooks = [] + + @classmethod + def _validate_callable_param_count(cls, func, count): + """Ensure a function has the given number of parameters.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + nparam = len(signature.parameters) + if nparam != count: + raise TypeError('{} has {} positional arguments, expected {}'.format( + func.__name__, + nparam, + count, + )) + + @classmethod + def _validate_prepostloop_callable(cls, func): + """Check parameter and return types for preloop and postloop hooks.""" + cls._validate_callable_param_count(func, 0) + # make sure there is no return notation + signature = inspect.signature(func) + if signature.return_annotation != None: + raise TypeError("{} must declare return a return type of 'None'".format( + func.__name__, + )) + + def register_preloop_hook(self, func): + """Register a function to be called at the beginning of the command loop.""" + self._validate_prepostloop_callable(func) + self._preloop_hooks.append(func) + + def register_postloop_hook(self, func): + """Register a function to be called at the end of the command loop.""" + self._validate_prepostloop_callable(func) + self._postloop_hooks.append(func) + + @classmethod + def _validate_postparsing_callable(cls, func): + """Check parameter and return types for postparsing hooks""" + cls._validate_callable_param_count(func, 1) + signature = inspect.signature(func) + _, param = list(signature.parameters.items())[0] + if param.annotation != plugin.PostparsingData: + raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format( + func.__name__ + )) + if signature.return_annotation != plugin.PostparsingData: + raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format( + func.__name__ + )) + + def register_postparsing_hook(self, func): + """Register a function to be called after parsing user input but before running the command""" + self._validate_postparsing_callable(func) + self._postparsing_hooks.append(func) + + @classmethod + def _validate_prepostcmd_hook(cls, func, data_type): + """Check parameter and return types for pre and post command hooks.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + cls._validate_callable_param_count(func, 1) + # validate the parameter has the right annotation + paramname = list(signature.parameters.keys())[0] + param = signature.parameters[paramname] + if param.annotation != data_type: + raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format( + func.__name__, + param.annotation, + data_type, + )) + # validate the return value has the right annotation + if signature.return_annotation == signature.empty: + raise TypeError('{} does not have a declared return type, expected {}'.format( + func.__name__, + data_type, + )) + if signature.return_annotation != data_type: + raise TypeError('{} has incompatible return type {}, expected {}'.format( + func.__name__, + signature.return_annotation, + data_type, + )) + + def register_precmd_hook(self, func): + """Register a hook to be called before the command function.""" + self._validate_prepostcmd_hook(func, plugin.PrecommandData) + self._precmd_hooks.append(func) + + def register_postcmd_hook(self, func): + """Register a hook to be called after the command function.""" + self._validate_prepostcmd_hook(func, plugin.PostcommandData) + self._postcmd_hooks.append(func) + + @classmethod + def _validate_cmdfinalization_callable(cls, func): + """Check parameter and return types for command finalization hooks.""" + cls._validate_callable_param_count(func, 1) + signature = inspect.signature(func) + _, param = list(signature.parameters.items())[0] + if param.annotation != plugin.CommandFinalizationData: + raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.CommandFinalizationData'".format( + func.__name__ + )) + if signature.return_annotation != plugin.CommandFinalizationData: + raise TypeError("{} must declare return a return type of 'cmd2.plugin.CommandFinalizationData'".format( + func.__name__ + )) + pass + + def register_cmdfinalization_hook(self, func): + """Register a hook to be called after a command is completed, whether it completes successfully or not.""" + self._validate_cmdfinalization_callable(func) + self._cmdfinalization_hooks.append(func) class History(list): """ A list of HistoryItems that knows how to respond to user requests. """ |