summaryrefslogtreecommitdiff
path: root/cmd2
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2')
-rw-r--r--cmd2/cmd2.py199
-rw-r--r--cmd2/parsing.py108
-rw-r--r--cmd2/plugin.py23
3 files changed, 287 insertions, 43 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 42e00c39..8db7cef3 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -34,6 +34,7 @@ import cmd
import collections
from colorama import Fore
import glob
+import inspect
import os
import platform
import re
@@ -43,6 +44,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
from . import constants
from . import utils
+from . import plugin
from .argparse_completer import AutoCompleter, ACArgumentParser
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .parsing import StatementParser, Statement
@@ -381,6 +383,10 @@ class Cmd(cmd.Cmd):
import atexit
atexit.register(readline.write_history_file, persistent_history_file)
+ # initialize plugin system
+ # needs to be done before we call __init__(0)
+ self._initialize_plugin_system()
+
# Call super class constructor
super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
@@ -1680,12 +1686,35 @@ class Cmd(cmd.Cmd):
:return: True if cmdloop() should exit, False otherwise
"""
import datetime
+
stop = False
try:
statement = self._complete_statement(line)
- (stop, statement) = self.postparsing_precmd(statement)
+ except EmptyStatement:
+ return self._run_cmdfinalization_hooks(stop, None)
+ except ValueError as ex:
+ # If shlex.split failed on syntax, let user know whats going on
+ self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
+ return stop
+
+ # now that we have a statement, run it with all the hooks
+ try:
+ # call the postparsing hooks
+ data = plugin.PostparsingData(False, statement)
+ for func in self._postparsing_hooks:
+ data = func(data)
+ if data.stop:
+ break
+ # postparsing_precmd is deprecated
+ if not data.stop:
+ (data.stop, data.statement) = self.postparsing_precmd(data.statement)
+ # unpack the data object
+ statement = data.statement
+ stop = data.stop
if stop:
- return self.postparsing_postcmd(stop)
+ # we should not run the command, but
+ # we need to run the finalization hooks
+ raise EmptyStatement
try:
if self.allow_redirection:
@@ -1693,23 +1722,53 @@ class Cmd(cmd.Cmd):
timestart = datetime.datetime.now()
if self._in_py:
self._last_result = None
+
+ # precommand hooks
+ data = plugin.PrecommandData(statement)
+ for func in self._precmd_hooks:
+ data = func(data)
+ statement = data.statement
+ # call precmd() for compatibility with cmd.Cmd
statement = self.precmd(statement)
+
+ # go run the command function
stop = self.onecmd(statement)
+
+ # postcommand hooks
+ data = plugin.PostcommandData(stop, statement)
+ for func in self._postcmd_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any statement modification from the hooks
+ stop = data.stop
+ # call postcmd() for compatibility with cmd.Cmd
stop = self.postcmd(stop, statement)
+
if self.timing:
self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
finally:
if self.allow_redirection and self.redirecting:
self._restore_output(statement)
except EmptyStatement:
+ # don't do anything, but do allow command finalization hooks to run
pass
- except ValueError as ex:
- # If shlex.split failed on syntax, let user know whats going on
- self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
except Exception as ex:
self.perror(ex)
finally:
+ return self._run_cmdfinalization_hooks(stop, statement)
+
+ def _run_cmdfinalization_hooks(self, stop: bool, statement: Statement) -> bool:
+ """Run the command finalization hooks"""
+ try:
+ data = plugin.CommandFinalizationData(stop, statement)
+ for func in self._cmdfinalization_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any
+ # modifications to the statement
+ stop = data.stop
+ # postparsing_postcmd is deprecated
return self.postparsing_postcmd(stop)
+ except Exception as ex:
+ self.perror(ex)
def runcmds_plus_hooks(self, cmds: List[str]) -> bool:
"""Convenience method to run multiple commands by onecmd_plus_hooks.
@@ -3063,6 +3122,8 @@ Script should contain one command per line, just like command would be typed in
self.cmdqueue.extend(callargs)
# Always run the preloop first
+ for func in self._preloop_hooks:
+ func()
self.preloop()
# If transcript-based regression testing was requested, then do that instead of the main loop
@@ -3081,8 +3142,136 @@ Script should contain one command per line, just like command would be typed in
self._cmdloop()
# Run the postloop() no matter what
+ for func in self._postloop_hooks:
+ func()
self.postloop()
+ ###
+ #
+ # plugin related functions
+ #
+ ###
+ def _initialize_plugin_system(self):
+ """Initialize the plugin system"""
+ self._preloop_hooks = []
+ self._postloop_hooks = []
+ self._postparsing_hooks = []
+ self._precmd_hooks = []
+ self._postcmd_hooks = []
+ self._cmdfinalization_hooks = []
+
+ @classmethod
+ def _validate_callable_param_count(cls, func, count):
+ """Ensure a function has the given number of parameters."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ nparam = len(signature.parameters)
+ if nparam != count:
+ raise TypeError('{} has {} positional arguments, expected {}'.format(
+ func.__name__,
+ nparam,
+ count,
+ ))
+
+ @classmethod
+ def _validate_prepostloop_callable(cls, func):
+ """Check parameter and return types for preloop and postloop hooks."""
+ cls._validate_callable_param_count(func, 0)
+ # make sure there is no return notation
+ signature = inspect.signature(func)
+ if signature.return_annotation != None:
+ raise TypeError("{} must declare return a return type of 'None'".format(
+ func.__name__,
+ ))
+
+ def register_preloop_hook(self, func):
+ """Register a function to be called at the beginning of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._preloop_hooks.append(func)
+
+ def register_postloop_hook(self, func):
+ """Register a function to be called at the end of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._postloop_hooks.append(func)
+
+ @classmethod
+ def _validate_postparsing_callable(cls, func):
+ """Check parameter and return types for postparsing hooks"""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.PostparsingData:
+ raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+ if signature.return_annotation != plugin.PostparsingData:
+ raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+
+ def register_postparsing_hook(self, func):
+ """Register a function to be called after parsing user input but before running the command"""
+ self._validate_postparsing_callable(func)
+ self._postparsing_hooks.append(func)
+
+ @classmethod
+ def _validate_prepostcmd_hook(cls, func, data_type):
+ """Check parameter and return types for pre and post command hooks."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ cls._validate_callable_param_count(func, 1)
+ # validate the parameter has the right annotation
+ paramname = list(signature.parameters.keys())[0]
+ param = signature.parameters[paramname]
+ if param.annotation != data_type:
+ raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format(
+ func.__name__,
+ param.annotation,
+ data_type,
+ ))
+ # validate the return value has the right annotation
+ if signature.return_annotation == signature.empty:
+ raise TypeError('{} does not have a declared return type, expected {}'.format(
+ func.__name__,
+ data_type,
+ ))
+ if signature.return_annotation != data_type:
+ raise TypeError('{} has incompatible return type {}, expected {}'.format(
+ func.__name__,
+ signature.return_annotation,
+ data_type,
+ ))
+
+ def register_precmd_hook(self, func):
+ """Register a hook to be called before the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PrecommandData)
+ self._precmd_hooks.append(func)
+
+ def register_postcmd_hook(self, func):
+ """Register a hook to be called after the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PostcommandData)
+ self._postcmd_hooks.append(func)
+
+ @classmethod
+ def _validate_cmdfinalization_callable(cls, func):
+ """Check parameter and return types for command finalization hooks."""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.CommandFinalizationData'".format(
+ func.__name__
+ ))
+ if signature.return_annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must declare return a return type of 'cmd2.plugin.CommandFinalizationData'".format(
+ func.__name__
+ ))
+ pass
+
+ def register_cmdfinalization_hook(self, func):
+ """Register a hook to be called after a command is completed, whether it completes successfully or not."""
+ self._validate_cmdfinalization_callable(func)
+ self._cmdfinalization_hooks.append(func)
class History(list):
""" A list of HistoryItems that knows how to respond to user requests. """
diff --git a/cmd2/parsing.py b/cmd2/parsing.py
index 8d59aedb..b220f1c4 100644
--- a/cmd2/parsing.py
+++ b/cmd2/parsing.py
@@ -5,7 +5,7 @@
import os
import re
import shlex
-from typing import List, Tuple
+from typing import List, Tuple, Dict
from . import constants
from . import utils
@@ -21,6 +21,10 @@ class Statement(str):
need a place to capture the additional output of the command parsing, so we add
our own attributes to this subclass.
+ Instances of this class should not be created by anything other than the
+ `StatementParser.parse()` method, nor should any of the attributes be modified
+ once the object is created.
+
The string portion of the class contains the arguments, but not the command, nor
the output redirection clauses.
@@ -54,18 +58,39 @@ class Statement(str):
:type output_to: str or None
"""
- def __init__(self, obj):
- super().__init__()
- self.raw = str(obj)
- self.command = None
- self.multiline_command = None
- self.args = None
- self.argv = None
- self.terminator = None
- self.suffix = None
- self.pipe_to = None
- self.output = None
- self.output_to = None
+ def __new__(cls,
+ obj: object,
+ *,
+ raw: str = None,
+ command: str = None,
+ args: str = None,
+ argv: List[str] = None,
+ multiline_command: str = None,
+ terminator: str = None,
+ suffix: str = None,
+ pipe_to: str = None,
+ output: str = None,
+ output_to:str = None
+ ):
+ """Create a new instance of Statement
+
+ We must override __new__ because we are subclassing `str` which is
+ immutable.
+ """
+ stmt = str.__new__(cls, obj)
+ object.__setattr__(stmt, "raw", raw)
+ object.__setattr__(stmt, "command", command)
+ object.__setattr__(stmt, "args", args)
+ if argv is None:
+ argv = []
+ object.__setattr__(stmt, "argv", argv)
+ object.__setattr__(stmt, "multiline_command", multiline_command)
+ object.__setattr__(stmt, "terminator", terminator)
+ object.__setattr__(stmt, "suffix", suffix)
+ object.__setattr__(stmt, "pipe_to", pipe_to)
+ object.__setattr__(stmt, "output", output)
+ object.__setattr__(stmt, "output_to", output_to)
+ return stmt
@property
def command_and_args(self):
@@ -82,6 +107,13 @@ class Statement(str):
rtn = None
return rtn
+ def __setattr__(self, name, value):
+ """Statement instances should feel immutable; raise ValueError"""
+ raise ValueError
+
+ def __delattr__(self, name):
+ """Statement instances should feel immutable; raise ValueError"""
+ raise ValueError
class StatementParser:
"""Parse raw text into command components.
@@ -90,11 +122,11 @@ class StatementParser:
"""
def __init__(
self,
- allow_redirection=True,
- terminators=None,
- multiline_commands=None,
- aliases=None,
- shortcuts=None,
+ allow_redirection: bool = True,
+ terminators: List[str] = None,
+ multiline_commands: List[str] = None,
+ aliases: Dict[str, str] = None,
+ shortcuts: Dict[str, str] = None,
):
self.allow_redirection = allow_redirection
if terminators is None:
@@ -228,7 +260,7 @@ class StatementParser:
tokens = self._split_on_punctuation(list(lexer))
return tokens
- def parse(self, rawinput: str) -> Statement:
+ def parse(self, line: str) -> Statement:
"""Tokenize the input and parse it into a Statement object, stripping
comments, expanding aliases and shortcuts, and extracting output
redirection directives.
@@ -240,7 +272,7 @@ class StatementParser:
# we have to do this before we tokenize because tokenizing
# destroys all unquoted whitespace in the input
terminator = None
- if rawinput[-1:] == LINE_FEED:
+ if line[-1:] == LINE_FEED:
terminator = LINE_FEED
command = None
@@ -248,7 +280,7 @@ class StatementParser:
argv = None
# lex the input into a list of tokens
- tokens = self.tokenize(rawinput)
+ tokens = self.tokenize(line)
# of the valid terminators, find the first one to occur in the input
terminator_pos = len(tokens) + 1
@@ -360,19 +392,18 @@ class StatementParser:
# build the statement
# string representation of args must be an empty string instead of
# None for compatibility with standard library cmd
- statement = Statement('' if args is None else args)
- statement.raw = rawinput
- statement.command = command
- # if there are no args we will use None since we don't have to worry
- # about compatibility with standard library cmd
- statement.args = args
- statement.argv = list(map(lambda x: utils.strip_quotes(x), argv))
- statement.terminator = terminator
- statement.output = output
- statement.output_to = output_to
- statement.pipe_to = pipe_to
- statement.suffix = suffix
- statement.multiline_command = multiline_command
+ statement = Statement('' if args is None else args,
+ raw=line,
+ command=command,
+ args=args,
+ argv=list(map(lambda x: utils.strip_quotes(x), argv)),
+ multiline_command=multiline_command,
+ terminator=terminator,
+ suffix=suffix,
+ pipe_to=pipe_to,
+ output=output,
+ output_to=output_to,
+ )
return statement
def parse_command_only(self, rawinput: str) -> Statement:
@@ -422,10 +453,11 @@ class StatementParser:
# build the statement
# string representation of args must be an empty string instead of
# None for compatibility with standard library cmd
- statement = Statement('' if args is None else args)
- statement.raw = rawinput
- statement.command = command
- statement.args = args
+ statement = Statement('' if args is None else args,
+ raw=rawinput,
+ command=command,
+ args=args,
+ )
return statement
def _expand(self, line: str) -> str:
diff --git a/cmd2/plugin.py b/cmd2/plugin.py
new file mode 100644
index 00000000..5c68dfb9
--- /dev/null
+++ b/cmd2/plugin.py
@@ -0,0 +1,23 @@
+#
+# coding=utf-8
+"""Classes for the cmd2 plugin system"""
+import attr
+
+@attr.s
+class PostparsingData():
+ stop = attr.ib()
+ statement = attr.ib()
+
+@attr.s
+class PrecommandData():
+ statement = attr.ib()
+
+@attr.s
+class PostcommandData():
+ stop = attr.ib()
+ statement = attr.ib()
+
+@attr.s
+class CommandFinalizationData():
+ stop = attr.ib()
+ statement = attr.ib()