summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-15 22:36:12 -0700
committerDana Powers <dana.powers@rd.io>2014-12-15 12:42:55 -0800
commit07ff623392d1398f801c95d9af3e0a388b049068 (patch)
treee0e2d70db2a44ab578f80e61e2f29c1d16bc0a66
parent7caf9bef491b368fd1dec4430c38332fec3dc1b6 (diff)
downloadkafka-python-07ff623392d1398f801c95d9af3e0a388b049068.tar.gz
Move auto-commit checks to task_done; add support for auto_commit_interval_messages
-rw-r--r--kafka/consumer/new.py38
1 files changed, 27 insertions, 11 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py
index 8f243cd..ad45387 100644
--- a/kafka/consumer/new.py
+++ b/kafka/consumer/new.py
@@ -31,6 +31,7 @@ DEFAULT_CONSUMER_CONFIG = {
'deserializer_class': lambda msg: msg,
'auto_commit_enable': False,
'auto_commit_interval_ms': 60 * 1000,
+ 'auto_commit_interval_messages': None,
'consumer_timeout_ms': -1,
# Currently unused
@@ -139,6 +140,7 @@ class KafkaConsumer(object):
deserializer_class=Event.from_bytes,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
+ auto_commit_interval_messages=None,
consumer_timeout_ms=-1
Configuration parameters are described in more detail at
@@ -159,7 +161,7 @@ class KafkaConsumer(object):
# Check auto-commit configuration
if self._config['auto_commit_enable']:
logger.info("Configuring consumer to auto-commit offsets")
- self._set_next_auto_commit_time()
+ self._reset_auto_commit()
if self._config['metadata_broker_list'] is None:
raise KafkaConfigurationError('metadata_broker_list required to '
@@ -298,10 +300,6 @@ class KafkaConsumer(object):
self._set_consumer_timeout_start()
while True:
- # Check for auto-commit
- if self._should_auto_commit():
- self.commit()
-
try:
return self._get_message_iterator().next()
@@ -466,6 +464,13 @@ class KafkaConsumer(object):
self._offsets.task_done[topic_partition] = offset
+ # Check for auto-commit
+ if self._config['auto_commit_enable']:
+ self._incr_auto_commit_message_count()
+
+ if self._should_auto_commit():
+ self.commit()
+
def commit(self):
"""
Store consumed message offsets (marked via task_done())
@@ -515,7 +520,7 @@ class KafkaConsumer(object):
self._offsets.commit[topic_partition] = (task_done + 1)
if self._config['auto_commit_enable']:
- self._set_next_auto_commit_time()
+ self._reset_auto_commit()
return True
@@ -637,13 +642,24 @@ class KafkaConsumer(object):
if not self._config['auto_commit_enable']:
return False
- if not self._next_commit:
- return False
+ if self._config['auto_commit_interval_ms'] > 0:
+ if time.time() >= self._next_commit_time:
+ return True
+
+ if self._config['auto_commit_interval_messages'] > 0:
+ if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']:
+ return True
+
+ return False
- return (time.time() >= self._next_commit)
+ def _reset_auto_commit(self):
+ self._uncommitted_message_count = 0
+ self._next_commit_time = None
+ if self._config['auto_commit_interval_ms'] > 0:
+ self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
- def _set_next_auto_commit_time(self):
- self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0)
+ def _incr_auto_commit_message_count(self, n=1):
+ self._uncommitted_message_count += n
#
# Message iterator private methods