summaryrefslogtreecommitdiff
path: root/cmd2/cmd2.py
diff options
context:
space:
mode:
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r--cmd2/cmd2.py224
1 files changed, 211 insertions, 13 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py
index 06295dfd..875cef59 100644
--- a/cmd2/cmd2.py
+++ b/cmd2/cmd2.py
@@ -34,15 +34,17 @@ import cmd
import collections
from colorama import Fore
import glob
+import inspect
import os
import platform
import re
import shlex
import sys
-from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union
+from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union
from . import constants
from . import utils
+from . import plugin
from .argparse_completer import AutoCompleter, ACArgumentParser
from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer
from .parsing import StatementParser, Statement
@@ -114,7 +116,7 @@ try:
except ImportError: # pragma: no cover
ipython_available = False
-__version__ = '0.9.3'
+__version__ = '0.9.4'
# optional attribute, when tagged on a function, allows cmd2 to categorize commands
@@ -369,6 +371,10 @@ class Cmd(cmd.Cmd):
except AttributeError:
pass
+ # initialize plugin system
+ # needs to be done before we call __init__(0)
+ self._initialize_plugin_system()
+
# Call super class constructor
super().__init__(completekey=completekey, stdin=stdin, stdout=stdout)
@@ -597,12 +603,13 @@ class Cmd(cmd.Cmd):
:param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK
:param end: string appended after the end of the message if not already present, default a newline
- :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
+ :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped
- truncated text is still accessible by scrolling with the right & left arrow keys
- chopping is ideal for displaying wide tabular data as is done in utilities like pgcli
False -> causes lines longer than the screen width to wrap to the next line
- wrapping is ideal when you want to avoid users having to use horizontal scrolling
- WARNING: On Windows, the text always wraps regardless of what the chop argument is set to
+
+ WARNING: On Windows, the text always wraps regardless of what the chop argument is set to
"""
import subprocess
if msg is not None and msg != '':
@@ -1635,10 +1642,20 @@ class Cmd(cmd.Cmd):
# noinspection PyMethodMayBeStatic
def preparse(self, raw: str) -> str:
- """Hook method executed just before the command line is interpreted, but after the input prompt is generated.
+ """Hook method executed before user input is parsed.
+
+ WARNING: If it's a multiline command, `preparse()` may not get all the
+ user input. _complete_statement() really does two things: a) parse the
+ user input, and b) accept more input in case it's a multiline command
+ the passed string doesn't have a terminator. `preparse()` is currently
+ called before we know whether it's a multiline command, and before we
+ know whether the user input includes a termination character.
+
+ If you want a reliable pre parsing hook method, register a postparsing
+ hook, modify the user input, and then reparse it.
- :param raw: raw command line input
- :return: potentially modified raw command line input
+ :param raw: raw command line input :return: potentially modified raw
+ command line input
"""
return raw
@@ -1699,12 +1716,35 @@ class Cmd(cmd.Cmd):
:return: True if cmdloop() should exit, False otherwise
"""
import datetime
+
stop = False
try:
statement = self._complete_statement(line)
- (stop, statement) = self.postparsing_precmd(statement)
+ except EmptyStatement:
+ return self._run_cmdfinalization_hooks(stop, None)
+ except ValueError as ex:
+ # If shlex.split failed on syntax, let user know whats going on
+ self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
+ return stop
+
+ # now that we have a statement, run it with all the hooks
+ try:
+ # call the postparsing hooks
+ data = plugin.PostparsingData(False, statement)
+ for func in self._postparsing_hooks:
+ data = func(data)
+ if data.stop:
+ break
+ # postparsing_precmd is deprecated
+ if not data.stop:
+ (data.stop, data.statement) = self.postparsing_precmd(data.statement)
+ # unpack the data object
+ statement = data.statement
+ stop = data.stop
if stop:
- return self.postparsing_postcmd(stop)
+ # we should not run the command, but
+ # we need to run the finalization hooks
+ raise EmptyStatement
try:
if self.allow_redirection:
@@ -1712,23 +1752,53 @@ class Cmd(cmd.Cmd):
timestart = datetime.datetime.now()
if self._in_py:
self._last_result = None
+
+ # precommand hooks
+ data = plugin.PrecommandData(statement)
+ for func in self._precmd_hooks:
+ data = func(data)
+ statement = data.statement
+ # call precmd() for compatibility with cmd.Cmd
statement = self.precmd(statement)
+
+ # go run the command function
stop = self.onecmd(statement)
+
+ # postcommand hooks
+ data = plugin.PostcommandData(stop, statement)
+ for func in self._postcmd_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any statement modification from the hooks
+ stop = data.stop
+ # call postcmd() for compatibility with cmd.Cmd
stop = self.postcmd(stop, statement)
+
if self.timing:
self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart))
finally:
if self.allow_redirection and self.redirecting:
self._restore_output(statement)
except EmptyStatement:
+ # don't do anything, but do allow command finalization hooks to run
pass
- except ValueError as ex:
- # If shlex.split failed on syntax, let user know whats going on
- self.perror("Invalid syntax: {}".format(ex), traceback_war=False)
except Exception as ex:
self.perror(ex)
finally:
+ return self._run_cmdfinalization_hooks(stop, statement)
+
+ def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool:
+ """Run the command finalization hooks"""
+ try:
+ data = plugin.CommandFinalizationData(stop, statement)
+ for func in self._cmdfinalization_hooks:
+ data = func(data)
+ # retrieve the final value of stop, ignoring any
+ # modifications to the statement
+ stop = data.stop
+ # postparsing_postcmd is deprecated
return self.postparsing_postcmd(stop)
+ except Exception as ex:
+ self.perror(ex)
def runcmds_plus_hooks(self, cmds: List[str]) -> bool:
"""Convenience method to run multiple commands by onecmd_plus_hooks.
@@ -1780,7 +1850,7 @@ class Cmd(cmd.Cmd):
pipe runs out. We can't refactor it because we need to retain
backwards compatibility with the standard library version of cmd.
"""
- statement = self.statement_parser.parse(line)
+ statement = self.statement_parser.parse(self.preparse(line))
while statement.multiline_command and not statement.terminator:
if not self.quit_on_sigint:
try:
@@ -3105,6 +3175,8 @@ Script should contain one command per line, just like command would be typed in
self.cmdqueue.extend(callargs)
# Always run the preloop first
+ for func in self._preloop_hooks:
+ func()
self.preloop()
# If transcript-based regression testing was requested, then do that instead of the main loop
@@ -3123,8 +3195,134 @@ Script should contain one command per line, just like command would be typed in
self._cmdloop()
# Run the postloop() no matter what
+ for func in self._postloop_hooks:
+ func()
self.postloop()
+ ###
+ #
+ # plugin related functions
+ #
+ ###
+ def _initialize_plugin_system(self):
+ """Initialize the plugin system"""
+ self._preloop_hooks = []
+ self._postloop_hooks = []
+ self._postparsing_hooks = []
+ self._precmd_hooks = []
+ self._postcmd_hooks = []
+ self._cmdfinalization_hooks = []
+
+ @classmethod
+ def _validate_callable_param_count(cls, func: Callable, count: int):
+ """Ensure a function has the given number of parameters."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ nparam = len(signature.parameters)
+ if nparam != count:
+ raise TypeError('{} has {} positional arguments, expected {}'.format(
+ func.__name__,
+ nparam,
+ count,
+ ))
+
+ @classmethod
+ def _validate_prepostloop_callable(cls, func: Callable):
+ """Check parameter and return types for preloop and postloop hooks."""
+ cls._validate_callable_param_count(func, 0)
+ # make sure there is no return notation
+ signature = inspect.signature(func)
+ if signature.return_annotation is not None:
+ raise TypeError("{} must declare return a return type of 'None'".format(
+ func.__name__,
+ ))
+
+ def register_preloop_hook(self, func: Callable):
+ """Register a function to be called at the beginning of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._preloop_hooks.append(func)
+
+ def register_postloop_hook(self, func: Callable):
+ """Register a function to be called at the end of the command loop."""
+ self._validate_prepostloop_callable(func)
+ self._postloop_hooks.append(func)
+
+ @classmethod
+ def _validate_postparsing_callable(cls, func: Callable):
+ """Check parameter and return types for postparsing hooks"""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.PostparsingData:
+ raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+ if signature.return_annotation != plugin.PostparsingData:
+ raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format(
+ func.__name__
+ ))
+
+ def register_postparsing_hook(self, func: Callable):
+ """Register a function to be called after parsing user input but before running the command"""
+ self._validate_postparsing_callable(func)
+ self._postparsing_hooks.append(func)
+
+ @classmethod
+ def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type):
+ """Check parameter and return types for pre and post command hooks."""
+ signature = inspect.signature(func)
+ # validate that the callable has the right number of parameters
+ cls._validate_callable_param_count(func, 1)
+ # validate the parameter has the right annotation
+ paramname = list(signature.parameters.keys())[0]
+ param = signature.parameters[paramname]
+ if param.annotation != data_type:
+ raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format(
+ func.__name__,
+ param.annotation,
+ data_type,
+ ))
+ # validate the return value has the right annotation
+ if signature.return_annotation == signature.empty:
+ raise TypeError('{} does not have a declared return type, expected {}'.format(
+ func.__name__,
+ data_type,
+ ))
+ if signature.return_annotation != data_type:
+ raise TypeError('{} has incompatible return type {}, expected {}'.format(
+ func.__name__,
+ signature.return_annotation,
+ data_type,
+ ))
+
+ def register_precmd_hook(self, func: Callable):
+ """Register a hook to be called before the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PrecommandData)
+ self._precmd_hooks.append(func)
+
+ def register_postcmd_hook(self, func: Callable):
+ """Register a hook to be called after the command function."""
+ self._validate_prepostcmd_hook(func, plugin.PostcommandData)
+ self._postcmd_hooks.append(func)
+
+ @classmethod
+ def _validate_cmdfinalization_callable(cls, func: Callable):
+ """Check parameter and return types for command finalization hooks."""
+ cls._validate_callable_param_count(func, 1)
+ signature = inspect.signature(func)
+ _, param = list(signature.parameters.items())[0]
+ if param.annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must have one parameter declared with type "
+ "'cmd2.plugin.CommandFinalizationData'".format(func.__name__))
+ if signature.return_annotation != plugin.CommandFinalizationData:
+ raise TypeError("{} must declare return a return type of "
+ "'cmd2.plugin.CommandFinalizationData'".format(func.__name__))
+
+ def register_cmdfinalization_hook(self, func: Callable):
+ """Register a hook to be called after a command is completed, whether it completes successfully or not."""
+ self._validate_cmdfinalization_callable(func)
+ self._cmdfinalization_hooks.append(func)
+
class History(list):
""" A list of HistoryItems that knows how to respond to user requests. """