summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
blob: 2059d92e92ca52788a8c3e4d677258ddb223d5f2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
from __future__ import absolute_import

import atexit
import logging
import numbers
from threading import Lock

import kafka.common
from kafka.common import (
    OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
    UnknownTopicOrPartitionError, check_error, KafkaError
)

from kafka.util import ReentrantTimer


# Logger for this module, registered under the 'kafka.consumer' name
log = logging.getLogger('kafka.consumer')

# Defaults for Consumer.__init__'s auto_commit_every_n / auto_commit_every_t.
# The count is in messages; the interval is handed to ReentrantTimer as-is
# (presumably milliseconds -- confirm against kafka.util.ReentrantTimer).
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000

# Fetch tuning knobs. None of these are referenced in this module; they are
# presumably imported by the concrete consumers. Units of the two time
# values are not stated here -- TODO confirm at the point of use.
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
FETCH_BUFFER_SIZE_BYTES = 4096
# Upper bound for the fetch buffer: eight times the initial size
MAX_FETCH_BUFFER_SIZE_BYTES = 8 * FETCH_BUFFER_SIZE_BYTES

# Timing constants, in seconds as their names state; also unused in this
# module.
ITER_TIMEOUT_SECONDS = 60
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1

MAX_BACKOFF_SECONDS = 60

class Consumer(object):
    """
    Base class to be used by other consumers. Not to be used directly

    This base class provides logic for

    * initialization and fetching metadata of partitions
    * Auto-commit logic
    * APIs for fetching pending message count

    Nothing in this class advances ``self.offsets`` or increments
    ``self.count_since_commit``; both are presumably maintained by the
    concrete consumers as they hand out messages.
    """
    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                 auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                 auto_commit_every_t=AUTO_COMMIT_INTERVAL):
        """Load topic metadata, seed offsets and arm the auto-commit logic

        Arguments:
            client: client object used for every metadata and offset request
            group: consumer group used for offset fetch/commit. When None,
                no stored offsets are fetched and every partition starts
                at offset 0
            topic: name of the topic to consume

        Keyword Arguments:
            partitions (list): partition ids to consume, default is every
                partition the client reports for the topic
            auto_commit (bool): enable automatic offset commits
            auto_commit_every_n (int): commit once this many messages have
                been counted since the last commit; None disables the
                count-based trigger
            auto_commit_every_t: interval handed to ReentrantTimer for
                periodic commits (presumably milliseconds -- confirm against
                kafka.util.ReentrantTimer); None disables the timer
        """

        self.client = client
        self.topic = topic
        self.group = group
        self.client.load_metadata_for_topics(topic)
        self.offsets = {}

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(topic)
        else:
            # NOTE: this check disappears when Python runs with -O
            assert all(isinstance(x, numbers.Integral) for x in partitions)

        # Variables for handling offset commits
        self.commit_lock = Lock()
        self.commit_timer = None
        self.count_since_commit = 0
        self.auto_commit = auto_commit
        self.auto_commit_every_n = auto_commit_every_n
        self.auto_commit_every_t = auto_commit_every_t

        self.partition_info = False     # Do not return partition info in msgs

        # Set initial offsets
        if self.group is not None:
            self.fetch_last_known_offsets(partitions)
        else:
            for partition in partitions:
                self.offsets[partition] = 0

        # Set up the auto-commit timer. This is done only after the offset
        # fetch above: that request can raise, and a timer started before
        # it would keep running with nothing left to stop it (the cleanup
        # handler below is not registered yet at that point).
        if auto_commit is True and auto_commit_every_t is not None:
            self.commit_timer = ReentrantTimer(auto_commit_every_t,
                                               self.commit)
            self.commit_timer.start()

        # Register a cleanup handler
        def cleanup(obj):
            obj.stop()
        self._cleanup_func = cleanup
        atexit.register(cleanup, self)

    def provide_partition_info(self):
        """
        Indicates that partition info must be returned by the consumer
        """
        self.partition_info = True

    def fetch_last_known_offsets(self, partitions=None):
        """Populate self.offsets from the offsets stored for self.group

        Keyword Arguments:
            partitions (list): list of partitions to look up, default is
                every partition the client reports for the topic

        Raises:
            ValueError: if self.group is None
        """
        if self.group is None:
            raise ValueError('SimpleClient.group must not be None')

        if partitions is None:
            partitions = self.client.get_partition_ids_for_topic(self.topic)

        responses = self.client.send_offset_fetch_request(
            self.group,
            [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
            fail_on_error=False
        )

        for resp in responses:
            try:
                check_error(resp)
            # API spec says server wont set an error here
            # but 0.8.1.1 does actually...
            except UnknownTopicOrPartitionError:
                pass

            # -1 offset signals no commit is currently stored, so start
            # from 0; otherwise resume from the stored offset
            self.offsets[resp.partition] = (
                0 if resp.offset == -1 else resp.offset)

    def commit(self, partitions=None):
        """Commit stored offsets to Kafka via OffsetCommitRequest (v0)

        Keyword Arguments:
            partitions (list): list of partitions to commit, default is to commit
                all of them

        Returns: True on success, False on failure, None when nothing has
            been counted since the last commit (no request is sent)
        """

        # short circuit if nothing happened. This check is kept outside
        # to prevent un-necessarily acquiring a lock for checking the state
        if self.count_since_commit == 0:
            return

        with self.commit_lock:
            # Do this check again, just in case the state has changed
            # during the lock acquiring timeout
            if self.count_since_commit == 0:
                return

            if partitions is None:  # commit all partitions
                partitions = list(self.offsets.keys())

            log.debug('Committing new offsets for %s, partitions %s',
                      self.topic, partitions)

            reqs = []
            for partition in partitions:
                offset = self.offsets[partition]
                log.debug('Commit offset %d in SimpleConsumer: '
                          'group=%s, topic=%s, partition=%s',
                          offset, self.group, self.topic, partition)

                reqs.append(OffsetCommitRequestPayload(self.topic, partition,
                                                       offset, None))

            try:
                self.client.send_offset_commit_request(self.group, reqs)
            except KafkaError as e:
                log.error('%s saving offsets: %s', e.__class__.__name__, e)
                return False
            else:
                # Only a successful commit resets the counter, so a failed
                # one is retried by the next trigger
                self.count_since_commit = 0
                return True

    def _auto_commit(self):
        """
        Check if we have to commit based on number of messages and commit
        """

        # Check if we are supposed to do an auto-commit
        if not self.auto_commit or self.auto_commit_every_n is None:
            return

        if self.count_since_commit >= self.auto_commit_every_n:
            self.commit()

    def stop(self):
        """Stop the commit timer, flush pending offsets, drop the atexit hook

        A final commit is attempted whenever a timer was running or
        auto-commit is enabled.
        """
        had_timer = self.commit_timer is not None
        if had_timer:
            self.commit_timer.stop()

        # Flush when auto-commit is on even without a timer: with only the
        # count-based trigger (auto_commit_every_t=None), messages counted
        # since the last commit would otherwise never be committed.
        if had_timer or self.auto_commit:
            self.commit()

        if hasattr(self, '_cleanup_func'):
            # Remove cleanup handler now that we've stopped

            # py3 supports unregistering
            if hasattr(atexit, 'unregister'):
                atexit.unregister(self._cleanup_func) # pylint: disable=no-member

            # py2 requires removing from private attribute...
            else:

                # ValueError on list.remove() if the exithandler no longer
                # exists is fine here
                try:
                    atexit._exithandlers.remove(  # pylint: disable=no-member
                        (self._cleanup_func, (self,), {}))
                except ValueError:
                    pass

            del self._cleanup_func

    def pending(self, partitions=None):
        """
        Gets the pending message count

        Keyword Arguments:
            partitions (list): list of partitions to check for, default is to check all

        Returns: sum over the partitions of the offset reported by the
            offset request minus the offset held in self.offsets
        """
        if partitions is None:
            partitions = self.offsets.keys()

        # The trailing (-1, 1) are presumably the "latest offset" time
        # sentinel and max_offsets=1 -- confirm against OffsetRequestPayload
        reqs = [OffsetRequestPayload(self.topic, partition, -1, 1)
                for partition in partitions]

        resps = self.client.send_offset_request(reqs)
        return sum(resp.offsets[0] - self.offsets[resp.partition]
                   for resp in resps)