diff options
Diffstat (limited to 'cmd2/cmd2.py')
-rw-r--r-- | cmd2/cmd2.py | 224 |
1 files changed, 211 insertions, 13 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 06295dfd..875cef59 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -34,15 +34,17 @@ import cmd import collections from colorama import Fore import glob +import inspect import os import platform import re import shlex import sys -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union from . import constants from . import utils +from . import plugin from .argparse_completer import AutoCompleter, ACArgumentParser from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement @@ -114,7 +116,7 @@ try: except ImportError: # pragma: no cover ipython_available = False -__version__ = '0.9.3' +__version__ = '0.9.4' # optional attribute, when tagged on a function, allows cmd2 to categorize commands @@ -369,6 +371,10 @@ class Cmd(cmd.Cmd): except AttributeError: pass + # initialize plugin system + # needs to be done before we call __init__(0) + self._initialize_plugin_system() + # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) @@ -597,12 +603,13 @@ class Cmd(cmd.Cmd): :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK :param end: string appended after the end of the message if not already present, default a newline - :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped + :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped - truncated text is still accessible by scrolling with the right & left arrow keys - chopping is ideal for displaying wide tabular data as is done in utilities like pgcli False -> causes lines longer than the screen width to wrap to the next line - wrapping is ideal when you want to avoid users having to use horizontal scrolling - WARNING: On Windows, the text always wraps regardless of what the chop argument is set to + + WARNING: On Windows, the text always wraps regardless of what the chop argument is set to """ import subprocess if msg is not None and msg != '': @@ -1635,10 +1642,20 @@ class Cmd(cmd.Cmd): # noinspection PyMethodMayBeStatic def preparse(self, raw: str) -> str: - """Hook method executed just before the command line is interpreted, but after the input prompt is generated. + """Hook method executed before user input is parsed. + + WARNING: If it's a multiline command, `preparse()` may not get all the + user input. _complete_statement() really does two things: a) parse the + user input, and b) accept more input in case it's a multiline command + the passed string doesn't have a terminator. `preparse()` is currently + called before we know whether it's a multiline command, and before we + know whether the user input includes a termination character. + + If you want a reliable pre parsing hook method, register a postparsing + hook, modify the user input, and then reparse it. - :param raw: raw command line input - :return: potentially modified raw command line input + :param raw: raw command line input :return: potentially modified raw + command line input """ return raw @@ -1699,12 +1716,35 @@ class Cmd(cmd.Cmd): :return: True if cmdloop() should exit, False otherwise """ import datetime + stop = False try: statement = self._complete_statement(line) - (stop, statement) = self.postparsing_precmd(statement) + except EmptyStatement: + return self._run_cmdfinalization_hooks(stop, None) + except ValueError as ex: + # If shlex.split failed on syntax, let user know whats going on + self.perror("Invalid syntax: {}".format(ex), traceback_war=False) + return stop + + # now that we have a statement, run it with all the hooks + try: + # call the postparsing hooks + data = plugin.PostparsingData(False, statement) + for func in self._postparsing_hooks: + data = func(data) + if data.stop: + break + # postparsing_precmd is deprecated + if not data.stop: + (data.stop, data.statement) = self.postparsing_precmd(data.statement) + # unpack the data object + statement = data.statement + stop = data.stop if stop: - return self.postparsing_postcmd(stop) + # we should not run the command, but + # we need to run the finalization hooks + raise EmptyStatement try: if self.allow_redirection: @@ -1712,23 +1752,53 @@ class Cmd(cmd.Cmd): timestart = datetime.datetime.now() if self._in_py: self._last_result = None + + # precommand hooks + data = plugin.PrecommandData(statement) + for func in self._precmd_hooks: + data = func(data) + statement = data.statement + # call precmd() for compatibility with cmd.Cmd statement = self.precmd(statement) + + # go run the command function stop = self.onecmd(statement) + + # postcommand hooks + data = plugin.PostcommandData(stop, statement) + for func in self._postcmd_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any statement modification from the hooks + stop = data.stop + # call postcmd() for compatibility with cmd.Cmd stop = self.postcmd(stop, statement) + if self.timing: self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) finally: if self.allow_redirection and self.redirecting: self._restore_output(statement) except EmptyStatement: + # don't do anything, but do allow command finalization hooks to run pass - except ValueError as ex: - # If shlex.split failed on syntax, let user know whats going on - self.perror("Invalid syntax: {}".format(ex), traceback_war=False) except Exception as ex: self.perror(ex) finally: + return self._run_cmdfinalization_hooks(stop, statement) + + def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool: + """Run the command finalization hooks""" + try: + data = plugin.CommandFinalizationData(stop, statement) + for func in self._cmdfinalization_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any + # modifications to the statement + stop = data.stop + # postparsing_postcmd is deprecated return self.postparsing_postcmd(stop) + except Exception as ex: + self.perror(ex) def runcmds_plus_hooks(self, cmds: List[str]) -> bool: """Convenience method to run multiple commands by onecmd_plus_hooks. @@ -1780,7 +1850,7 @@ class Cmd(cmd.Cmd): pipe runs out. We can't refactor it because we need to retain backwards compatibility with the standard library version of cmd. """ - statement = self.statement_parser.parse(line) + statement = self.statement_parser.parse(self.preparse(line)) while statement.multiline_command and not statement.terminator: if not self.quit_on_sigint: try: @@ -3105,6 +3175,8 @@ Script should contain one command per line, just like command would be typed in self.cmdqueue.extend(callargs) # Always run the preloop first + for func in self._preloop_hooks: + func() self.preloop() # If transcript-based regression testing was requested, then do that instead of the main loop @@ -3123,8 +3195,134 @@ Script should contain one command per line, just like command would be typed in self._cmdloop() # Run the postloop() no matter what + for func in self._postloop_hooks: + func() self.postloop() + ### + # + # plugin related functions + # + ### + def _initialize_plugin_system(self): + """Initialize the plugin system""" + self._preloop_hooks = [] + self._postloop_hooks = [] + self._postparsing_hooks = [] + self._precmd_hooks = [] + self._postcmd_hooks = [] + self._cmdfinalization_hooks = [] + + @classmethod + def _validate_callable_param_count(cls, func: Callable, count: int): + """Ensure a function has the given number of parameters.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + nparam = len(signature.parameters) + if nparam != count: + raise TypeError('{} has {} positional arguments, expected {}'.format( + func.__name__, + nparam, + count, + )) + + @classmethod + def _validate_prepostloop_callable(cls, func: Callable): + """Check parameter and return types for preloop and postloop hooks.""" + cls._validate_callable_param_count(func, 0) + # make sure there is no return notation + signature = inspect.signature(func) + if signature.return_annotation is not None: + raise TypeError("{} must declare return a return type of 'None'".format( + func.__name__, + )) + + def register_preloop_hook(self, func: Callable): + """Register a function to be called at the beginning of the command loop.""" + self._validate_prepostloop_callable(func) + self._preloop_hooks.append(func) + + def register_postloop_hook(self, func: Callable): + """Register a function to be called at the end of the command loop.""" + self._validate_prepostloop_callable(func) + self._postloop_hooks.append(func) + + @classmethod + def _validate_postparsing_callable(cls, func: Callable): + """Check parameter and return types for postparsing hooks""" + cls._validate_callable_param_count(func, 1) + signature = inspect.signature(func) + _, param = list(signature.parameters.items())[0] + if param.annotation != plugin.PostparsingData: + raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format( + func.__name__ + )) + if signature.return_annotation != plugin.PostparsingData: + raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format( + func.__name__ + )) + + def register_postparsing_hook(self, func: Callable): + """Register a function to be called after parsing user input but before running the command""" + self._validate_postparsing_callable(func) + self._postparsing_hooks.append(func) + + @classmethod + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + """Check parameter and return types for pre and post command hooks.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + cls._validate_callable_param_count(func, 1) + # validate the parameter has the right annotation + paramname = list(signature.parameters.keys())[0] + param = signature.parameters[paramname] + if param.annotation != data_type: + raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format( + func.__name__, + param.annotation, + data_type, + )) + # validate the return value has the right annotation + if signature.return_annotation == signature.empty: + raise TypeError('{} does not have a declared return type, expected {}'.format( + func.__name__, + data_type, + )) + if signature.return_annotation != data_type: + raise TypeError('{} has incompatible return type {}, expected {}'.format( + func.__name__, + signature.return_annotation, + data_type, + )) + + def register_precmd_hook(self, func: Callable): + """Register a hook to be called before the command function.""" + self._validate_prepostcmd_hook(func, plugin.PrecommandData) + self._precmd_hooks.append(func) + + def register_postcmd_hook(self, func: Callable): + """Register a hook to be called after the command function.""" + self._validate_prepostcmd_hook(func, plugin.PostcommandData) + self._postcmd_hooks.append(func) + + @classmethod + def _validate_cmdfinalization_callable(cls, func: Callable): + """Check parameter and return types for command finalization hooks.""" + cls._validate_callable_param_count(func, 1) + signature = inspect.signature(func) + _, param = list(signature.parameters.items())[0] + if param.annotation != plugin.CommandFinalizationData: + raise TypeError("{} must have one parameter declared with type " + "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) + if signature.return_annotation != plugin.CommandFinalizationData: + raise TypeError("{} must declare return a return type of " + "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) + + def register_cmdfinalization_hook(self, func: Callable): + """Register a hook to be called after a command is completed, whether it completes successfully or not.""" + self._validate_cmdfinalization_callable(func) + self._cmdfinalization_hooks.append(func) + class History(list): """ A list of HistoryItems that knows how to respond to user requests. """ |