summaryrefslogtreecommitdiff
path: root/pelican/log.py
blob: be176ea89dd01b378d98e2cc7be9f41735290481 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import logging
from collections import defaultdict

from rich.console import Console
from rich.logging import RichHandler

__all__ = [
    'init'
]

console = Console()


class LimitFilter(logging.Filter):
    """
    Remove duplicates records, and limit the number of records in the same
    group.

    Groups are specified by the message to use when the number of records in
    the same group hit the limit.
    E.g.: log.warning(('43 is not the answer', 'More erroneous answers'))
    """

    LOGS_DEDUP_MIN_LEVEL = logging.WARNING

    _ignore = set()
    _raised_messages = set()
    _threshold = 5
    _group_count = defaultdict(int)

    def filter(self, record):
        # don't limit log messages for anything above "warning"
        if record.levelno > self.LOGS_DEDUP_MIN_LEVEL:
            return True

        # extract group
        group = record.__dict__.get('limit_msg', None)
        group_args = record.__dict__.get('limit_args', ())

        # ignore record if it was already raised
        message_key = (record.levelno, record.getMessage())
        if message_key in self._raised_messages:
            return False
        else:
            self._raised_messages.add(message_key)

        # ignore LOG_FILTER records by templates or messages
        # when "debug" isn't enabled
        logger_level = logging.getLogger().getEffectiveLevel()
        if logger_level > logging.DEBUG:
            template_key = (record.levelno, record.msg)
            message_key = (record.levelno, record.getMessage())
            if (template_key in self._ignore or message_key in self._ignore):
                return False

        # check if we went over threshold
        if group:
            key = (record.levelno, group)
            self._group_count[key] += 1
            if self._group_count[key] == self._threshold:
                record.msg = group
                record.args = group_args
            elif self._group_count[key] > self._threshold:
                return False
        return True


class LimitLogger(logging.Logger):
    """
    A logger which adds LimitFilter automatically
    """

    limit_filter = LimitFilter()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.enable_filter()

    def disable_filter(self):
        self.removeFilter(LimitLogger.limit_filter)

    def enable_filter(self):
        self.addFilter(LimitLogger.limit_filter)


class FatalLogger(LimitLogger):
    warnings_fatal = False
    errors_fatal = False

    def warning(self, *args, **kwargs):
        super().warning(*args, **kwargs)
        if FatalLogger.warnings_fatal:
            raise RuntimeError('Warning encountered')

    def error(self, *args, **kwargs):
        super().error(*args, **kwargs)
        if FatalLogger.errors_fatal:
            raise RuntimeError('Error encountered')


logging.setLoggerClass(FatalLogger)
# force root logger to be of our preferred class
logging.getLogger().__class__ = FatalLogger


def init(level=None, fatal='', handler=RichHandler(console=console), name=None,
         logs_dedup_min_level=None):
    FatalLogger.warnings_fatal = fatal.startswith('warning')
    FatalLogger.errors_fatal = bool(fatal)

    LOG_FORMAT = "%(message)s"
    logging.basicConfig(
        level=level,
        format=LOG_FORMAT,
        datefmt="[%H:%M:%S]",
        handlers=[handler]
    )

    logger = logging.getLogger(name)

    if level:
        logger.setLevel(level)
    if logs_dedup_min_level:
        LimitFilter.LOGS_DEDUP_MIN_LEVEL = logs_dedup_min_level


def log_warnings():
    import warnings
    logging.captureWarnings(True)
    warnings.simplefilter("default", DeprecationWarning)
    init(logging.DEBUG, name='py.warnings')


if __name__ == '__main__':
    init(level=logging.DEBUG, name=__name__)

    root_logger = logging.getLogger(__name__)
    root_logger.debug('debug')
    root_logger.info('info')
    root_logger.warning('warning')
    root_logger.error('error')
    root_logger.critical('critical')