summaryrefslogtreecommitdiff
path: root/cmd2/cmd2.py
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r--cmd2/cmd2.py199
1 files changed, 194 insertions, 5 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 42e00c39..8db7cef3 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -34,6 +34,7 @@ import cmd
import collections
from colorama import Fore
import glob
+import inspect
import os
import platform
import re
@@ -43,6 +44,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
from . import constants
from . import utils
+from . import plugin
from .argparse_completer import AutoCompleter, ACArgumentParser
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .parsing import StatementParser, Statement
@@ -381,6 +383,10 @@ class Cmd(cmd.Cmd):
import atexit
atexit.register(readline.write_history_file, persistent_history_file)
+ # initialize plugin system
+ # needs to be done before we call __init__(0)
+ self._initialize_plugin_system()
+
# Call super class constructor
super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
@@ -1680,12 +1686,35 @@ class Cmd(cmd.Cmd):
:return: True if cmdloop() should exit, False otherwise
"""
import datetime
+
stop = False
try:
statement = self._complete_statement(line)
- (stop, statement) = self.postparsing_precmd(statement)
+ except EmptyStatement:
+ return self._run_cmdfinalization_hooks(stop, None)
+ except ValueError as ex:
+ # If shlex.split failed on syntax, let user know whats going on
+ self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
+ return stop
+
+ # now that we have a statement, run it with all the hooks
+ try:
+ # call the postparsing hooks
+ data = plugin.PostparsingData(False, statement)
+ for func in self._postparsing_hooks:
+ data = func(data)
+ if data.stop:
+ break
+ # postparsing_precmd is deprecated
+ if not data.stop:
+ (data.stop, data.statement) = self.postparsing_precmd(data.statement)
+ # unpack the data object
+ statement = data.statement
+ stop = data.stop
if stop:
- return self.postparsing_postcmd(stop)
+ # we should not run the command, but
+ # we need to run the finalization hooks
+ raise EmptyStatement
try:
if self.allow_redirection:
@@ -1693,23 +1722,53 @@ class Cmd(cmd.Cmd):
timestart = datetime.datetime.now()
if self._in_py:
self._last_result = None
+
+ # precommand hooks
+ data = plugin.PrecommandData(statement)
+ for func in self._precmd_hooks:
+ data = func(data)
+ statement = data.statement
+ # call precmd() for compatibility with cmd.Cmd
statement = self.precmd(statement)
+
+ # go run the command function
stop = self.onecmd(statement)
+
+ # postcommand hooks
+ data = plugin.PostcommandData(stop, statement)
+ for func in self._postcmd_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any statement modification from the hooks
+ stop = data.stop
+ # call postcmd() for compatibility with cmd.Cmd
stop = self.postcmd(stop, statement)
+
if self.timing:
self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
finally:
if self.allow_redirection and self.redirecting:
self._restore_output(statement)
except EmptyStatement:
+ # don't do anything, but do allow command finalization hooks to run
pass
- except ValueError as ex:
- # If shlex.split failed on syntax, let user know whats going on
- self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
except Exception as ex:
self.perror(ex)
finally:
+ return self._run_cmdfinalization_hooks(stop, statement)
+
+ def _run_cmdfinalization_hooks(self, stop: bool, statement: Statement) -> bool:
+ """Run the command finalization hooks"""
+ try:
+ data = plugin.CommandFinalizationData(stop, statement)
+ for func in self._cmdfinalization_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any
+ # modifications to the statement
+ stop = data.stop
+ # postparsing_postcmd is deprecated
return self.postparsing_postcmd(stop)
+ except Exception as ex:
+ self.perror(ex)
def runcmds_plus_hooks(self, cmds: List[str]) -> bool:
"""Convenience method to run multiple commands by onecmd_plus_hooks.
@@ -3063,6 +3122,8 @@ Script should contain one command per line, just like command would be typed in
self.cmdqueue.extend(callargs)
# Always run the preloop first
+ for func in self._preloop_hooks:
+ func()
self.preloop()
# If transcript-based regression testing was requested, then do that instead of the main loop
@@ -3081,8 +3142,136 @@ Script should contain one command per line, just like command would be typed in
self._cmdloop()
# Run the postloop() no matter what
+ for func in self._postloop_hooks:
+ func()
self.postloop()
+ ###
+ #
+ # plugin related functions
+ #
+ ###
+ def _initialize_plugin_system(self):
+ """Initialize the plugin system"""
+ self._preloop_hooks = []
+ self._postloop_hooks = []
+ self._postparsing_hooks = []
+ self._precmd_hooks = []
+ self._postcmd_hooks = []
+ self._cmdfinalization_hooks = []
+
+ @classmethod
+ def _validate_callable_param_count(cls, func, count):
+ """Ensure a function has the given number of parameters."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ nparam = len(signature.parameters)
+ if nparam != count:
+ raise TypeError('{} has {} positional arguments, expected {}'.format(
+ func.__name__,
+ nparam,
+ count,
+ ))
+
+ @classmethod
+ def _validate_prepostloop_callable(cls, func):
+ """Check parameter and return types for preloop and postloop hooks."""
+ cls._validate_callable_param_count(func, 0)
+ # make sure there is no return notation
+ signature = inspect.signature(func)
+ if signature.return_annotation != None:
+ raise TypeError("{} must declare return a return type of 'None'".format(
+ func.__name__,
+ ))
+
+ def register_preloop_hook(self, func):
+ """Register a function to be called at the beginning of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._preloop_hooks.append(func)
+
+ def register_postloop_hook(self, func):
+ """Register a function to be called at the end of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._postloop_hooks.append(func)
+
+ @classmethod
+ def _validate_postparsing_callable(cls, func):
+ """Check parameter and return types for postparsing hooks"""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.PostparsingData:
+ raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+ if signature.return_annotation != plugin.PostparsingData:
+ raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+
+ def register_postparsing_hook(self, func):
+ """Register a function to be called after parsing user input but before running the command"""
+ self._validate_postparsing_callable(func)
+ self._postparsing_hooks.append(func)
+
+ @classmethod
+ def _validate_prepostcmd_hook(cls, func, data_type):
+ """Check parameter and return types for pre and post command hooks."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ cls._validate_callable_param_count(func, 1)
+ # validate the parameter has the right annotation
+ paramname = list(signature.parameters.keys())[0]
+ param = signature.parameters[paramname]
+ if param.annotation != data_type:
+ raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format(
+ func.__name__,
+ param.annotation,
+ data_type,
+ ))
+ # validate the return value has the right annotation
+ if signature.return_annotation == signature.empty:
+ raise TypeError('{} does not have a declared return type, expected {}'.format(
+ func.__name__,
+ data_type,
+ ))
+ if signature.return_annotation != data_type:
+ raise TypeError('{} has incompatible return type {}, expected {}'.format(
+ func.__name__,
+ signature.return_annotation,
+ data_type,
+ ))
+
+ def register_precmd_hook(self, func):
+ """Register a hook to be called before the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PrecommandData)
+ self._precmd_hooks.append(func)
+
+ def register_postcmd_hook(self, func):
+ """Register a hook to be called after the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PostcommandData)
+ self._postcmd_hooks.append(func)
+
+ @classmethod
+ def _validate_cmdfinalization_callable(cls, func):
+ """Check parameter and return types for command finalization hooks."""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.CommandFinalizationData'".format(
+ func.__name__
+ ))
+ if signature.return_annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must declare return a return type of 'cmd2.plugin.CommandFinalizationData'".format(
+ func.__name__
+ ))
+ pass
+
+ def register_cmdfinalization_hook(self, func):
+ """Register a hook to be called after a command is completed, whether it completes successfully or not."""
+ self._validate_cmdfinalization_callable(func)
+ self._cmdfinalization_hooks.append(func)
class History(list):
""" A list of HistoryItems that knows how to respond to user requests. """