summaryrefslogtreecommitdiff
path: root/cmd2
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2')
-rw-r--r--cmd2/cmd2.py204
-rw-r--r--cmd2/constants.py2
-rw-r--r--cmd2/parsing.py115
-rw-r--r--cmd2/plugin.py27
4 files changed, 298 insertions, 50 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 06295dfd..51f2f924 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -34,15 +34,17 @@ import cmd
import collections
from colorama import Fore
import glob
+import inspect
import os
import platform
import re
import shlex
import sys
-from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union
from . import constants
from . import utils
+from . import plugin
from .argparse_completer import AutoCompleter, ACArgumentParser
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .parsing import StatementParser, Statement
@@ -369,6 +371,10 @@ class Cmd(cmd.Cmd):
except AttributeError:
pass
+ # initialize plugin system
+ # needs to be done before we call __init__(0)
+ self._initialize_plugin_system()
+
# Call super class constructor
super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
@@ -597,12 +603,13 @@ class Cmd(cmd.Cmd):
:param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK
:param end: string appended after the end of the message if not already present, default a newline
- :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
+ :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
- truncated text is still accessible by scrolling with the right & left arrow keys
- chopping is ideal for displaying wide tabular data as is done in utilities like pgcli
False -> causes lines longer than the screen width to wrap to the next line
- wrapping is ideal when you want to avoid users having to use horizontal scrolling
- WARNING: On Windows, the text always wraps regardless of what the chop argument is set to
+
+ WARNING: On Windows, the text always wraps regardless of what the chop argument is set to
"""
import subprocess
if msg is not None and msg != '':
@@ -1699,12 +1706,35 @@ class Cmd(cmd.Cmd):
:return: True if cmdloop() should exit, False otherwise
"""
import datetime
+
stop = False
try:
statement = self._complete_statement(line)
- (stop, statement) = self.postparsing_precmd(statement)
+ except EmptyStatement:
+ return self._run_cmdfinalization_hooks(stop, None)
+ except ValueError as ex:
+ # If shlex.split failed on syntax, let user know whats going on
+ self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
+ return stop
+
+ # now that we have a statement, run it with all the hooks
+ try:
+ # call the postparsing hooks
+ data = plugin.PostparsingData(False, statement)
+ for func in self._postparsing_hooks:
+ data = func(data)
+ if data.stop:
+ break
+ # postparsing_precmd is deprecated
+ if not data.stop:
+ (data.stop, data.statement) = self.postparsing_precmd(data.statement)
+ # unpack the data object
+ statement = data.statement
+ stop = data.stop
if stop:
- return self.postparsing_postcmd(stop)
+ # we should not run the command, but
+ # we need to run the finalization hooks
+ raise EmptyStatement
try:
if self.allow_redirection:
@@ -1712,23 +1742,53 @@ class Cmd(cmd.Cmd):
timestart = datetime.datetime.now()
if self._in_py:
self._last_result = None
+
+ # precommand hooks
+ data = plugin.PrecommandData(statement)
+ for func in self._precmd_hooks:
+ data = func(data)
+ statement = data.statement
+ # call precmd() for compatibility with cmd.Cmd
statement = self.precmd(statement)
+
+ # go run the command function
stop = self.onecmd(statement)
+
+ # postcommand hooks
+ data = plugin.PostcommandData(stop, statement)
+ for func in self._postcmd_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any statement modification from the hooks
+ stop = data.stop
+ # call postcmd() for compatibility with cmd.Cmd
stop = self.postcmd(stop, statement)
+
if self.timing:
self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
finally:
if self.allow_redirection and self.redirecting:
self._restore_output(statement)
except EmptyStatement:
+ # don't do anything, but do allow command finalization hooks to run
pass
- except ValueError as ex:
- # If shlex.split failed on syntax, let user know whats going on
- self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
except Exception as ex:
self.perror(ex)
finally:
+ return self._run_cmdfinalization_hooks(stop, statement)
+
+ def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool:
+ """Run the command finalization hooks"""
+ try:
+ data = plugin.CommandFinalizationData(stop, statement)
+ for func in self._cmdfinalization_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any
+ # modifications to the statement
+ stop = data.stop
+ # postparsing_postcmd is deprecated
return self.postparsing_postcmd(stop)
+ except Exception as ex:
+ self.perror(ex)
def runcmds_plus_hooks(self, cmds: List[str]) -> bool:
"""Convenience method to run multiple commands by onecmd_plus_hooks.
@@ -3105,6 +3165,8 @@ Script should contain one command per line, just like command would be typed in
self.cmdqueue.extend(callargs)
# Always run the preloop first
+ for func in self._preloop_hooks:
+ func()
self.preloop()
# If transcript-based regression testing was requested, then do that instead of the main loop
@@ -3123,8 +3185,134 @@ Script should contain one command per line, just like command would be typed in
self._cmdloop()
# Run the postloop() no matter what
+ for func in self._postloop_hooks:
+ func()
self.postloop()
+ ###
+ #
+ # plugin related functions
+ #
+ ###
+ def _initialize_plugin_system(self):
+ """Initialize the plugin system"""
+ self._preloop_hooks = []
+ self._postloop_hooks = []
+ self._postparsing_hooks = []
+ self._precmd_hooks = []
+ self._postcmd_hooks = []
+ self._cmdfinalization_hooks = []
+
+ @classmethod
+ def _validate_callable_param_count(cls, func: Callable, count: int):
+ """Ensure a function has the given number of parameters."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ nparam = len(signature.parameters)
+ if nparam != count:
+ raise TypeError('{} has {} positional arguments, expected {}'.format(
+ func.__name__,
+ nparam,
+ count,
+ ))
+
+ @classmethod
+ def _validate_prepostloop_callable(cls, func: Callable):
+ """Check parameter and return types for preloop and postloop hooks."""
+ cls._validate_callable_param_count(func, 0)
+ # make sure there is no return notation
+ signature = inspect.signature(func)
+ if signature.return_annotation is not None:
+ raise TypeError("{} must declare return a return type of 'None'".format(
+ func.__name__,
+ ))
+
+ def register_preloop_hook(self, func: Callable):
+ """Register a function to be called at the beginning of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._preloop_hooks.append(func)
+
+ def register_postloop_hook(self, func: Callable):
+ """Register a function to be called at the end of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._postloop_hooks.append(func)
+
+ @classmethod
+ def _validate_postparsing_callable(cls, func: Callable):
+ """Check parameter and return types for postparsing hooks"""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.PostparsingData:
+ raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+ if signature.return_annotation != plugin.PostparsingData:
+ raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+
+ def register_postparsing_hook(self, func: Callable):
+ """Register a function to be called after parsing user input but before running the command"""
+ self._validate_postparsing_callable(func)
+ self._postparsing_hooks.append(func)
+
+ @classmethod
+ def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type):
+ """Check parameter and return types for pre and post command hooks."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ cls._validate_callable_param_count(func, 1)
+ # validate the parameter has the right annotation
+ paramname = list(signature.parameters.keys())[0]
+ param = signature.parameters[paramname]
+ if param.annotation != data_type:
+ raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format(
+ func.__name__,
+ param.annotation,
+ data_type,
+ ))
+ # validate the return value has the right annotation
+ if signature.return_annotation == signature.empty:
+ raise TypeError('{} does not have a declared return type, expected {}'.format(
+ func.__name__,
+ data_type,
+ ))
+ if signature.return_annotation != data_type:
+ raise TypeError('{} has incompatible return type {}, expected {}'.format(
+ func.__name__,
+ signature.return_annotation,
+ data_type,
+ ))
+
+ def register_precmd_hook(self, func: Callable):
+ """Register a hook to be called before the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PrecommandData)
+ self._precmd_hooks.append(func)
+
+ def register_postcmd_hook(self, func: Callable):
+ """Register a hook to be called after the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PostcommandData)
+ self._postcmd_hooks.append(func)
+
+ @classmethod
+ def _validate_cmdfinalization_callable(cls, func: Callable):
+ """Check parameter and return types for command finalization hooks."""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must have one parameter declared with type "
+ "'cmd2.plugin.CommandFinalizationData'".format(func.__name__))
+ if signature.return_annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must declare return a return type of "
+ "'cmd2.plugin.CommandFinalizationData'".format(func.__name__))
+
+ def register_cmdfinalization_hook(self, func: Callable):
+ """Register a hook to be called after a command is completed, whether it completes successfully or not."""
+ self._validate_cmdfinalization_callable(func)
+ self._cmdfinalization_hooks.append(func)
+
class History(list):
""" A list of HistoryItems that knows how to respond to user requests. """
diff --git a/cmd2/constants.py b/cmd2/constants.py
index b829000f..d3e8a125 100644
--- a/cmd2/constants.py
+++ b/cmd2/constants.py
@@ -15,3 +15,5 @@ REDIRECTION_TOKENS = [REDIRECTION_PIPE, REDIRECTION_OUTPUT, REDIRECTION_APPEND]
# Regular expression to match ANSI escape codes
ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m')
+
+LINE_FEED = '\n'
diff --git a/cmd2/parsing.py b/cmd2/parsing.py
index 84c7e27a..0d480f0f 100644
--- a/cmd2/parsing.py
+++ b/cmd2/parsing.py
@@ -5,13 +5,11 @@
import os
import re
import shlex
-from typing import List, Tuple
+from typing import List, Tuple, Dict
from . import constants
from . import utils
-LINE_FEED = '\n'
-
class Statement(str):
"""String subclass with additional attributes to store the results of parsing.
@@ -21,6 +19,10 @@ class Statement(str):
need a place to capture the additional output of the command parsing, so we add
our own attributes to this subclass.
+ Instances of this class should not be created by anything other than the
+ `StatementParser.parse()` method, nor should any of the attributes be modified
+ once the object is created.
+
The string portion of the class contains the arguments, but not the command, nor
the output redirection clauses.
@@ -54,18 +56,39 @@ class Statement(str):
:type output_to: str or None
"""
- def __init__(self, obj):
- super().__init__()
- self.raw = str(obj)
- self.command = None
- self.multiline_command = None
- self.args = None
- self.argv = None
- self.terminator = None
- self.suffix = None
- self.pipe_to = None
- self.output = None
- self.output_to = None
+ def __new__(cls,
+ obj: object,
+ *,
+ raw: str = None,
+ command: str = None,
+ args: str = None,
+ argv: List[str] = None,
+ multiline_command: str = None,
+ terminator: str = None,
+ suffix: str = None,
+ pipe_to: str = None,
+ output: str = None,
+ output_to: str = None
+ ):
+ """Create a new instance of Statement
+
+ We must override __new__ because we are subclassing `str` which is
+ immutable.
+ """
+ stmt = str.__new__(cls, obj)
+ object.__setattr__(stmt, "raw", raw)
+ object.__setattr__(stmt, "command", command)
+ object.__setattr__(stmt, "args", args)
+ if argv is None:
+ argv = []
+ object.__setattr__(stmt, "argv", argv)
+ object.__setattr__(stmt, "multiline_command", multiline_command)
+ object.__setattr__(stmt, "terminator", terminator)
+ object.__setattr__(stmt, "suffix", suffix)
+ object.__setattr__(stmt, "pipe_to", pipe_to)
+ object.__setattr__(stmt, "output", output)
+ object.__setattr__(stmt, "output_to", output_to)
+ return stmt
@property
def command_and_args(self):
@@ -82,6 +105,14 @@ class Statement(str):
rtn = None
return rtn
+ def __setattr__(self, name, value):
+ """Statement instances should feel immutable; raise ValueError"""
+ raise ValueError
+
+ def __delattr__(self, name):
+ """Statement instances should feel immutable; raise ValueError"""
+ raise ValueError
+
class StatementParser:
"""Parse raw text into command components.
@@ -90,11 +121,11 @@ class StatementParser:
"""
def __init__(
self,
- allow_redirection=True,
- terminators=None,
- multiline_commands=None,
- aliases=None,
- shortcuts=None,
+ allow_redirection: bool = True,
+ terminators: List[str] = None,
+ multiline_commands: List[str] = None,
+ aliases: Dict[str, str] = None,
+ shortcuts: List[Tuple[str, str]] = None,
):
self.allow_redirection = allow_redirection
if terminators is None:
@@ -228,7 +259,7 @@ class StatementParser:
tokens = self._split_on_punctuation(list(lexer))
return tokens
- def parse(self, rawinput: str) -> Statement:
+ def parse(self, line: str) -> Statement:
"""Tokenize the input and parse it into a Statement object, stripping
comments, expanding aliases and shortcuts, and extracting output
redirection directives.
@@ -240,15 +271,15 @@ class StatementParser:
# we have to do this before we tokenize because tokenizing
# destroys all unquoted whitespace in the input
terminator = None
- if rawinput[-1:] == LINE_FEED:
- terminator = LINE_FEED
+ if line[-1:] == constants.LINE_FEED:
+ terminator = constants.LINE_FEED
command = None
args = None
argv = None
# lex the input into a list of tokens
- tokens = self.tokenize(rawinput)
+ tokens = self.tokenize(line)
# of the valid terminators, find the first one to occur in the input
terminator_pos = len(tokens) + 1
@@ -269,7 +300,7 @@ class StatementParser:
break
if terminator:
- if terminator == LINE_FEED:
+ if terminator == constants.LINE_FEED:
terminator_pos = len(tokens)+1
# everything before the first terminator is the command and the args
@@ -360,19 +391,18 @@ class StatementParser:
# build the statement
# string representation of args must be an empty string instead of
# None for compatibility with standard library cmd
- statement = Statement('' if args is None else args)
- statement.raw = rawinput
- statement.command = command
- # if there are no args we will use None since we don't have to worry
- # about compatibility with standard library cmd
- statement.args = args
- statement.argv = list(map(lambda x: utils.strip_quotes(x), argv))
- statement.terminator = terminator
- statement.output = output
- statement.output_to = output_to
- statement.pipe_to = pipe_to
- statement.suffix = suffix
- statement.multiline_command = multiline_command
+ statement = Statement('' if args is None else args,
+ raw=line,
+ command=command,
+ args=args,
+ argv=list(map(lambda x: utils.strip_quotes(x), argv)),
+ multiline_command=multiline_command,
+ terminator=terminator,
+ suffix=suffix,
+ pipe_to=pipe_to,
+ output=output,
+ output_to=output_to,
+ )
return statement
def parse_command_only(self, rawinput: str) -> Statement:
@@ -422,10 +452,11 @@ class StatementParser:
# build the statement
# string representation of args must be an empty string instead of
# None for compatibility with standard library cmd
- statement = Statement('' if args is None else args)
- statement.raw = rawinput
- statement.command = command
- statement.args = args
+ statement = Statement('' if args is None else args,
+ raw=rawinput,
+ command=command,
+ args=args,
+ )
return statement
def _expand(self, line: str) -> str:
diff --git a/cmd2/plugin.py b/cmd2/plugin.py
new file mode 100644
index 00000000..dc9ec297
--- /dev/null
+++ b/cmd2/plugin.py
@@ -0,0 +1,27 @@
+#
+# coding=utf-8
+"""Classes for the cmd2 plugin system"""
+import attr
+
+
+@attr.s
+class PostparsingData:
+ stop = attr.ib()
+ statement = attr.ib()
+
+
+@attr.s
+class PrecommandData:
+ statement = attr.ib()
+
+
+@attr.s
+class PostcommandData:
+ stop = attr.ib()
+ statement = attr.ib()
+
+
+@attr.s
+class CommandFinalizationData:
+ stop = attr.ib()
+ statement = attr.ib()