diff options
Diffstat (limited to 'cmd2')
-rw-r--r-- | cmd2/cmd2.py | 204 | ||||
-rw-r--r-- | cmd2/constants.py | 2 | ||||
-rw-r--r-- | cmd2/parsing.py | 115 | ||||
-rw-r--r-- | cmd2/plugin.py | 27 |
4 files changed, 298 insertions, 50 deletions
diff --git a/cmd2/cmd2.py b/cmd2/cmd2.py index 06295dfd..51f2f924 100644 --- a/cmd2/cmd2.py +++ b/cmd2/cmd2.py @@ -34,15 +34,17 @@ import cmd import collections from colorama import Fore import glob +import inspect import os import platform import re import shlex import sys -from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, Union from . import constants from . import utils +from . import plugin from .argparse_completer import AutoCompleter, ACArgumentParser from .clipboard import can_clip, get_paste_buffer, write_to_paste_buffer from .parsing import StatementParser, Statement @@ -369,6 +371,10 @@ class Cmd(cmd.Cmd): except AttributeError: pass + # initialize plugin system + # needs to be done before we call __init__(0) + self._initialize_plugin_system() + # Call super class constructor super().__init__(completekey=completekey, stdin=stdin, stdout=stdout) @@ -597,12 +603,13 @@ class Cmd(cmd.Cmd): :param msg: message to print to current stdout - anything convertible to a str with '{}'.format() is OK :param end: string appended after the end of the message if not already present, default a newline - :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped + :param chop: True -> causes lines longer than the screen width to be chopped (truncated) rather than wrapped - truncated text is still accessible by scrolling with the right & left arrow keys - chopping is ideal for displaying wide tabular data as is done in utilities like pgcli False -> causes lines longer than the screen width to wrap to the next line - wrapping is ideal when you want to avoid users having to use horizontal scrolling - WARNING: On Windows, the text always wraps regardless of what the chop argument is set to + + WARNING: On Windows, the text always wraps regardless of what the chop argument is set to """ import subprocess if msg is not None and msg != '': @@ -1699,12 +1706,35 @@ class Cmd(cmd.Cmd): :return: True if cmdloop() should exit, False otherwise """ import datetime + stop = False try: statement = self._complete_statement(line) - (stop, statement) = self.postparsing_precmd(statement) + except EmptyStatement: + return self._run_cmdfinalization_hooks(stop, None) + except ValueError as ex: + # If shlex.split failed on syntax, let user know whats going on + self.perror("Invalid syntax: {}".format(ex), traceback_war=False) + return stop + + # now that we have a statement, run it with all the hooks + try: + # call the postparsing hooks + data = plugin.PostparsingData(False, statement) + for func in self._postparsing_hooks: + data = func(data) + if data.stop: + break + # postparsing_precmd is deprecated + if not data.stop: + (data.stop, data.statement) = self.postparsing_precmd(data.statement) + # unpack the data object + statement = data.statement + stop = data.stop if stop: - return self.postparsing_postcmd(stop) + # we should not run the command, but + # we need to run the finalization hooks + raise EmptyStatement try: if self.allow_redirection: @@ -1712,23 +1742,53 @@ class Cmd(cmd.Cmd): timestart = datetime.datetime.now() if self._in_py: self._last_result = None + + # precommand hooks + data = plugin.PrecommandData(statement) + for func in self._precmd_hooks: + data = func(data) + statement = data.statement + # call precmd() for compatibility with cmd.Cmd statement = self.precmd(statement) + + # go run the command function stop = self.onecmd(statement) + + # postcommand hooks + data = plugin.PostcommandData(stop, statement) + for func in self._postcmd_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any statement modification from the hooks + stop = data.stop + # call postcmd() for compatibility with cmd.Cmd stop = self.postcmd(stop, statement) + if self.timing: self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) finally: if self.allow_redirection and self.redirecting: self._restore_output(statement) except EmptyStatement: + # don't do anything, but do allow command finalization hooks to run pass - except ValueError as ex: - # If shlex.split failed on syntax, let user know whats going on - self.perror("Invalid syntax: {}".format(ex), traceback_war=False) except Exception as ex: self.perror(ex) finally: + return self._run_cmdfinalization_hooks(stop, statement) + + def _run_cmdfinalization_hooks(self, stop: bool, statement: Optional[Statement]) -> bool: + """Run the command finalization hooks""" + try: + data = plugin.CommandFinalizationData(stop, statement) + for func in self._cmdfinalization_hooks: + data = func(data) + # retrieve the final value of stop, ignoring any + # modifications to the statement + stop = data.stop + # postparsing_postcmd is deprecated return self.postparsing_postcmd(stop) + except Exception as ex: + self.perror(ex) def runcmds_plus_hooks(self, cmds: List[str]) -> bool: """Convenience method to run multiple commands by onecmd_plus_hooks. @@ -3105,6 +3165,8 @@ Script should contain one command per line, just like command would be typed in self.cmdqueue.extend(callargs) # Always run the preloop first + for func in self._preloop_hooks: + func() self.preloop() # If transcript-based regression testing was requested, then do that instead of the main loop @@ -3123,8 +3185,134 @@ Script should contain one command per line, just like command would be typed in self._cmdloop() # Run the postloop() no matter what + for func in self._postloop_hooks: + func() self.postloop() + ### + # + # plugin related functions + # + ### + def _initialize_plugin_system(self): + """Initialize the plugin system""" + self._preloop_hooks = [] + self._postloop_hooks = [] + self._postparsing_hooks = [] + self._precmd_hooks = [] + self._postcmd_hooks = [] + self._cmdfinalization_hooks = [] + + @classmethod + def _validate_callable_param_count(cls, func: Callable, count: int): + """Ensure a function has the given number of parameters.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + nparam = len(signature.parameters) + if nparam != count: + raise TypeError('{} has {} positional arguments, expected {}'.format( + func.__name__, + nparam, + count, + )) + + @classmethod + def _validate_prepostloop_callable(cls, func: Callable): + """Check parameter and return types for preloop and postloop hooks.""" + cls._validate_callable_param_count(func, 0) + # make sure there is no return notation + signature = inspect.signature(func) + if signature.return_annotation is not None: + raise TypeError("{} must declare return a return type of 'None'".format( + func.__name__, + )) + + def register_preloop_hook(self, func: Callable): + """Register a function to be called at the beginning of the command loop.""" + self._validate_prepostloop_callable(func) + self._preloop_hooks.append(func) + + def register_postloop_hook(self, func: Callable): + """Register a function to be called at the end of the command loop.""" + self._validate_prepostloop_callable(func) + self._postloop_hooks.append(func) + + @classmethod + def _validate_postparsing_callable(cls, func: Callable): + """Check parameter and return types for postparsing hooks""" + cls._validate_callable_param_count(func, 1) + signature = inspect.signature(func) + _, param = list(signature.parameters.items())[0] + if param.annotation != plugin.PostparsingData: + raise TypeError("{} must have one parameter declared with type 'cmd2.plugin.PostparsingData'".format( + func.__name__ + )) + if signature.return_annotation != plugin.PostparsingData: + raise TypeError("{} must declare return a return type of 'cmd2.plugin.PostparsingData'".format( + func.__name__ + )) + + def register_postparsing_hook(self, func: Callable): + """Register a function to be called after parsing user input but before running the command""" + self._validate_postparsing_callable(func) + self._postparsing_hooks.append(func) + + @classmethod + def _validate_prepostcmd_hook(cls, func: Callable, data_type: Type): + """Check parameter and return types for pre and post command hooks.""" + signature = inspect.signature(func) + # validate that the callable has the right number of parameters + cls._validate_callable_param_count(func, 1) + # validate the parameter has the right annotation + paramname = list(signature.parameters.keys())[0] + param = signature.parameters[paramname] + if param.annotation != data_type: + raise TypeError('argument 1 of {} has incompatible type {}, expected {}'.format( + func.__name__, + param.annotation, + data_type, + )) + # validate the return value has the right annotation + if signature.return_annotation == signature.empty: + raise TypeError('{} does not have a declared return type, expected {}'.format( + func.__name__, + data_type, + )) + if signature.return_annotation != data_type: + raise TypeError('{} has incompatible return type {}, expected {}'.format( + func.__name__, + signature.return_annotation, + data_type, + )) + + def register_precmd_hook(self, func: Callable): + """Register a hook to be called before the command function.""" + self._validate_prepostcmd_hook(func, plugin.PrecommandData) + self._precmd_hooks.append(func) + + def register_postcmd_hook(self, func: Callable): + """Register a hook to be called after the command function.""" + self._validate_prepostcmd_hook(func, plugin.PostcommandData) + self._postcmd_hooks.append(func) + + @classmethod + def _validate_cmdfinalization_callable(cls, func: Callable): + """Check parameter and return types for command finalization hooks.""" + cls._validate_callable_param_count(func, 1) + signature = inspect.signature(func) + _, param = list(signature.parameters.items())[0] + if param.annotation != plugin.CommandFinalizationData: + raise TypeError("{} must have one parameter declared with type " + "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) + if signature.return_annotation != plugin.CommandFinalizationData: + raise TypeError("{} must declare return a return type of " + "'cmd2.plugin.CommandFinalizationData'".format(func.__name__)) + + def register_cmdfinalization_hook(self, func: Callable): + """Register a hook to be called after a command is completed, whether it completes successfully or not.""" + self._validate_cmdfinalization_callable(func) + self._cmdfinalization_hooks.append(func) + class History(list): """ A list of HistoryItems that knows how to respond to user requests. """ diff --git a/cmd2/constants.py b/cmd2/constants.py index b829000f..d3e8a125 100644 --- a/cmd2/constants.py +++ b/cmd2/constants.py @@ -15,3 +15,5 @@ REDIRECTION_TOKENS = [REDIRECTION_PIPE, REDIRECTION_OUTPUT, REDIRECTION_APPEND] # Regular expression to match ANSI escape codes ANSI_ESCAPE_RE = re.compile(r'\x1b[^m]*m') + +LINE_FEED = '\n' diff --git a/cmd2/parsing.py b/cmd2/parsing.py index 84c7e27a..0d480f0f 100644 --- a/cmd2/parsing.py +++ b/cmd2/parsing.py @@ -5,13 +5,11 @@ import os import re import shlex -from typing import List, Tuple +from typing import List, Tuple, Dict from . import constants from . import utils -LINE_FEED = '\n' - class Statement(str): """String subclass with additional attributes to store the results of parsing. @@ -21,6 +19,10 @@ class Statement(str): need a place to capture the additional output of the command parsing, so we add our own attributes to this subclass. + Instances of this class should not be created by anything other than the + `StatementParser.parse()` method, nor should any of the attributes be modified + once the object is created. + The string portion of the class contains the arguments, but not the command, nor the output redirection clauses. @@ -54,18 +56,39 @@ class Statement(str): :type output_to: str or None """ - def __init__(self, obj): - super().__init__() - self.raw = str(obj) - self.command = None - self.multiline_command = None - self.args = None - self.argv = None - self.terminator = None - self.suffix = None - self.pipe_to = None - self.output = None - self.output_to = None + def __new__(cls, + obj: object, + *, + raw: str = None, + command: str = None, + args: str = None, + argv: List[str] = None, + multiline_command: str = None, + terminator: str = None, + suffix: str = None, + pipe_to: str = None, + output: str = None, + output_to: str = None + ): + """Create a new instance of Statement + + We must override __new__ because we are subclassing `str` which is + immutable. + """ + stmt = str.__new__(cls, obj) + object.__setattr__(stmt, "raw", raw) + object.__setattr__(stmt, "command", command) + object.__setattr__(stmt, "args", args) + if argv is None: + argv = [] + object.__setattr__(stmt, "argv", argv) + object.__setattr__(stmt, "multiline_command", multiline_command) + object.__setattr__(stmt, "terminator", terminator) + object.__setattr__(stmt, "suffix", suffix) + object.__setattr__(stmt, "pipe_to", pipe_to) + object.__setattr__(stmt, "output", output) + object.__setattr__(stmt, "output_to", output_to) + return stmt @property def command_and_args(self): @@ -82,6 +105,14 @@ class Statement(str): rtn = None return rtn + def __setattr__(self, name, value): + """Statement instances should feel immutable; raise ValueError""" + raise ValueError + + def __delattr__(self, name): + """Statement instances should feel immutable; raise ValueError""" + raise ValueError + class StatementParser: """Parse raw text into command components. @@ -90,11 +121,11 @@ class StatementParser: """ def __init__( self, - allow_redirection=True, - terminators=None, - multiline_commands=None, - aliases=None, - shortcuts=None, + allow_redirection: bool = True, + terminators: List[str] = None, + multiline_commands: List[str] = None, + aliases: Dict[str, str] = None, + shortcuts: List[Tuple[str, str]] = None, ): self.allow_redirection = allow_redirection if terminators is None: @@ -228,7 +259,7 @@ class StatementParser: tokens = self._split_on_punctuation(list(lexer)) return tokens - def parse(self, rawinput: str) -> Statement: + def parse(self, line: str) -> Statement: """Tokenize the input and parse it into a Statement object, stripping comments, expanding aliases and shortcuts, and extracting output redirection directives. @@ -240,15 +271,15 @@ class StatementParser: # we have to do this before we tokenize because tokenizing # destroys all unquoted whitespace in the input terminator = None - if rawinput[-1:] == LINE_FEED: - terminator = LINE_FEED + if line[-1:] == constants.LINE_FEED: + terminator = constants.LINE_FEED command = None args = None argv = None # lex the input into a list of tokens - tokens = self.tokenize(rawinput) + tokens = self.tokenize(line) # of the valid terminators, find the first one to occur in the input terminator_pos = len(tokens) + 1 @@ -269,7 +300,7 @@ class StatementParser: break if terminator: - if terminator == LINE_FEED: + if terminator == constants.LINE_FEED: terminator_pos = len(tokens)+1 # everything before the first terminator is the command and the args @@ -360,19 +391,18 @@ class StatementParser: # build the statement # string representation of args must be an empty string instead of # None for compatibility with standard library cmd - statement = Statement('' if args is None else args) - statement.raw = rawinput - statement.command = command - # if there are no args we will use None since we don't have to worry - # about compatibility with standard library cmd - statement.args = args - statement.argv = list(map(lambda x: utils.strip_quotes(x), argv)) - statement.terminator = terminator - statement.output = output - statement.output_to = output_to - statement.pipe_to = pipe_to - statement.suffix = suffix - statement.multiline_command = multiline_command + statement = Statement('' if args is None else args, + raw=line, + command=command, + args=args, + argv=list(map(lambda x: utils.strip_quotes(x), argv)), + multiline_command=multiline_command, + terminator=terminator, + suffix=suffix, + pipe_to=pipe_to, + output=output, + output_to=output_to, + ) return statement def parse_command_only(self, rawinput: str) -> Statement: @@ -422,10 +452,11 @@ class StatementParser: # build the statement # string representation of args must be an empty string instead of # None for compatibility with standard library cmd - statement = Statement('' if args is None else args) - statement.raw = rawinput - statement.command = command - statement.args = args + statement = Statement('' if args is None else args, + raw=rawinput, + command=command, + args=args, + ) return statement def _expand(self, line: str) -> str: diff --git a/cmd2/plugin.py b/cmd2/plugin.py new file mode 100644 index 00000000..dc9ec297 --- /dev/null +++ b/cmd2/plugin.py @@ -0,0 +1,27 @@ +# +# coding=utf-8 +"""Classes for the cmd2 plugin system""" +import attr + + +@attr.s +class PostparsingData: + stop = attr.ib() + statement = attr.ib() + + +@attr.s +class PrecommandData: + statement = attr.ib() + + +@attr.s +class PostcommandData: + stop = attr.ib() + statement = attr.ib() + + +@attr.s +class CommandFinalizationData: + stop = attr.ib() + statement = attr.ib() |