summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r--kafka/consumer/base.py169
1 files changed, 169 insertions, 0 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
new file mode 100644
index 0000000..506b405
--- /dev/null
+++ b/kafka/consumer/base.py
@@ -0,0 +1,169 @@
+from __future__ import absolute_import
+
+import logging
+import numbers
+from threading import Lock
+
+import kafka.common
+from kafka.common import (
+ OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
+ UnknownTopicOrPartitionError
+)
+
+from kafka.util import ReentrantTimer
+
+log = logging.getLogger("kafka")
+
+AUTO_COMMIT_MSG_COUNT = 100
+AUTO_COMMIT_INTERVAL = 5000
+
+FETCH_DEFAULT_BLOCK_TIMEOUT = 1
+FETCH_MAX_WAIT_TIME = 100
+FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
+MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
+
+ITER_TIMEOUT_SECONDS = 60
+NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
+
+
+class Consumer(object):
+ """
+ Base class to be used by other consumers. Not to be used directly
+
+ This base class provides logic for
+ * initialization and fetching metadata of partitions
+ * Auto-commit logic
+ * APIs for fetching pending message count
+ """
+ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
+ auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
+ auto_commit_every_t=AUTO_COMMIT_INTERVAL):
+
+ self.client = client
+ self.topic = topic
+ self.group = group
+ self.client.load_metadata_for_topics(topic)
+ self.offsets = {}
+
+ if not partitions:
+ partitions = self.client.get_partition_ids_for_topic(topic)
+ else:
+ assert all(isinstance(x, numbers.Integral) for x in partitions)
+
+ # Variables for handling offset commits
+ self.commit_lock = Lock()
+ self.commit_timer = None
+ self.count_since_commit = 0
+ self.auto_commit = auto_commit
+ self.auto_commit_every_n = auto_commit_every_n
+ self.auto_commit_every_t = auto_commit_every_t
+
+ # Set up the auto-commit timer
+ if auto_commit is True and auto_commit_every_t is not None:
+ self.commit_timer = ReentrantTimer(auto_commit_every_t,
+ self.commit)
+ self.commit_timer.start()
+
+ if auto_commit:
+ self.fetch_last_known_offsets(partitions)
+ else:
+ for partition in partitions:
+ self.offsets[partition] = 0
+
+ def fetch_last_known_offsets(self, partitions=None):
+ if not partitions:
+ partitions = self.client.get_partition_ids_for_topic(self.topic)
+
+ def get_or_init_offset(resp):
+ try:
+ kafka.common.check_error(resp)
+ return resp.offset
+ except UnknownTopicOrPartitionError:
+ return 0
+
+ for partition in partitions:
+ req = OffsetFetchRequest(self.topic, partition)
+ (resp,) = self.client.send_offset_fetch_request(self.group, [req],
+ fail_on_error=False)
+ self.offsets[partition] = get_or_init_offset(resp)
+ self.fetch_offsets = self.offsets.copy()
+
+ def commit(self, partitions=None):
+ """
+ Commit offsets for this consumer
+
+ partitions: list of partitions to commit, default is to commit
+ all of them
+ """
+
+ # short circuit if nothing happened. This check is kept outside
+ # to prevent un-necessarily acquiring a lock for checking the state
+ if self.count_since_commit == 0:
+ return
+
+ with self.commit_lock:
+ # Do this check again, just in case the state has changed
+ # during the lock acquiring timeout
+ if self.count_since_commit == 0:
+ return
+
+ reqs = []
+ if not partitions: # commit all partitions
+ partitions = self.offsets.keys()
+
+ for partition in partitions:
+ offset = self.offsets[partition]
+ log.debug("Commit offset %d in SimpleConsumer: "
+ "group=%s, topic=%s, partition=%s" %
+ (offset, self.group, self.topic, partition))
+
+ reqs.append(OffsetCommitRequest(self.topic, partition,
+ offset, None))
+
+ resps = self.client.send_offset_commit_request(self.group, reqs)
+ for resp in resps:
+ kafka.common.check_error(resp)
+
+ self.count_since_commit = 0
+
+ def _auto_commit(self):
+ """
+ Check if we have to commit based on number of messages and commit
+ """
+
+ # Check if we are supposed to do an auto-commit
+ if not self.auto_commit or self.auto_commit_every_n is None:
+ return
+
+ if self.count_since_commit >= self.auto_commit_every_n:
+ self.commit()
+
+ def stop(self):
+ if self.commit_timer is not None:
+ self.commit_timer.stop()
+ self.commit()
+
+ def pending(self, partitions=None):
+ """
+ Gets the pending message count
+
+ partitions: list of partitions to check for, default is to check all
+ """
+ if not partitions:
+ partitions = self.offsets.keys()
+
+ total = 0
+ reqs = []
+
+ for partition in partitions:
+ reqs.append(OffsetRequest(self.topic, partition, -1, 1))
+
+ resps = self.client.send_offset_request(reqs)
+ for resp in resps:
+ partition = resp.partition
+ pending = resp.offsets[0]
+ offset = self.offsets[partition]
+ total += pending - offset - (1 if offset > 0 else 0)
+
+ return total