diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 120 |
1 files changed, 118 insertions, 2 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index e2fd25fa..6e8db039 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -34,6 +34,7 @@ import cmd import collections from colorama import Fore import glob +import inspect import os import platform import re @@ -43,6 +44,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union from . import constants from . import utils +from . import plugin from .argparse_completer import AutoCompleter, ACArgumentParser from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement @@ -382,6 +384,10 @@ class Cmd(cmd.Cmd): import atexit atexit.register(readline.write_history_file, persistent_history_file) + # initialize plugin system + # needs to be done before we call __init__(0) + self._initialize_plugin_system() + # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) @@ -1684,9 +1690,17 @@ class Cmd(cmd.Cmd): stop = False try: statement = self._complete_statement(line) - (stop, statement) = self.postparsing_precmd(statement) + # call the postparsing hooks + for func in self._postparsing_hooks: + (stop, statement) = func(statement) + if stop: + break + if not stop: + (stop, statement) = self.postparsing_precmd(statement) if stop: - return self.postparsing_postcmd(stop) + # we need to not run the command, but + # we need to run the finalization hooks + raise EmptyStatement try: if self.allow_redirection: @@ -1694,9 +1708,23 @@ class Cmd(cmd.Cmd): timestart = datetime.datetime.now() if self._in_py: self._last_result = None + + # precommand hooks + for func in self._precmd_hooks: + data = plugin.PrecommandData(statement) + result = func(data) + statement = result.statement + # call precmd() for compatibility with cmd.Cmd statement = self.precmd(statement) + + # go run the command function stop = self.onecmd(statement) + + # postcommand hooks + for func in self._postcmd_hooks: + stop = func(stop, statement) stop = self.postcmd(stop, statement) + if self.timing: self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) finally: @@ -3064,6 +3092,8 @@ Script should contain one command per line, just like command would be typed in self.cmdqueue.extend(callargs) # Always run the preloop first + for func in self._preloop_hooks: + func() self.preloop() # If transcript-based regression testing was requested, then do that instead of the main loop @@ -3082,8 +3112,94 @@ Script should contain one command per line, just like command would be typed in self._cmdloop() # Run the postloop() no matter what + for func in self._postloop_hooks: + func() self.postloop() + ### + # + # plugin related functions + # + ### + def _initialize_plugin_system(self): + """Initialize the plugin system""" + self._preloop_hooks = [] + self._postloop_hooks = [] + self._postparsing_hooks = [] + self._precmd_hooks = [] + self._postcmd_hooks = [] + + @classmethod + def _validate_callable_param_count(cls, func, count): + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + nparam = len(signature.parameters) + if nparam != count: + raise TypeError('{} has {} positional arguments, expected {}'.format( + func.__name__, + nparam, + count, + )) + + @classmethod + def _validate_prepostloop_callable(cls, func): + """Check parameter and return values for preloop and postloop hooks""" + cls._validate_callable_param_count(func, 0) + # make sure there is no return notation + signature = inspect.signature(func) + if signature.return_annotation != None: + raise TypeError("{} must declare return a return type of 'None'".format( + func.__name__, + )) + + def register_preloop_hook(self, func): + """Register a function to be called at the beginning of the command loop.""" + self._validate_prepostloop_callable(func) + self._preloop_hooks.append(func) + + def register_postloop_hook(self, func): + """Register a function to be called at the end of the command loop.""" + self._validate_prepostloop_callable(func) + self._postloop_hooks.append(func) + + def register_postparsing_hook(self, func): + """Register a function to be called after parsing user input but before running the command""" + self._postparsing_hooks.append(func) + # TODO check signature of registered func and throw error if it's wrong + + def register_precmd_hook(self, func): + """Register a function to be called before the command function.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + self._validate_callable_param_count(func, 1) + # validate the parameter has the right annotation + paramname = list(signature.parameters.keys())[0] + param = signature.parameters[paramname] + if param.annotation != plugin.PrecommandData: + raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format( + func.__name__, + param.annotation, + plugin.PrecommandData, + )) + # validate the return value has the right annotation + if signature.return_annotation == signature.empty: + raise TypeError('{} does not have a declared return type, expected {}'.format( + func.__name__, + plugin.PrecommandData, + )) + if signature.return_annotation != plugin.PrecommandData: + raise TypeError('{} has incompatible return type {}, expected {}'.format( + func.__name__, + signature.return_annotation, + plugin.PrecommandData, + )) + self._precmd_hooks.append(func) + + def register_postcmd_hook(self, func): + """Register a function to be called after the command function.""" + self._postcmd_hooks.append(func) + # TODO check signature of registered func and throw error if it's wrong + class History(list): """ A list of HistoryItems that knows how to respond to user requests. """ |