diff options
-rw-r--r-- | kafka/consumer/new.py | 38 |
1 files changed, 27 insertions, 11 deletions
diff --git a/kafka/consumer/new.py b/kafka/consumer/new.py index 8f243cd..ad45387 100644 --- a/kafka/consumer/new.py +++ b/kafka/consumer/new.py @@ -31,6 +31,7 @@ DEFAULT_CONSUMER_CONFIG = { 'deserializer_class': lambda msg: msg, 'auto_commit_enable': False, 'auto_commit_interval_ms': 60 * 1000, + 'auto_commit_interval_messages': None, 'consumer_timeout_ms': -1, # Currently unused @@ -139,6 +140,7 @@ class KafkaConsumer(object): deserializer_class=Event.from_bytes, auto_commit_enable=False, auto_commit_interval_ms=60 * 1000, + auto_commit_interval_messages=None, consumer_timeout_ms=-1 Configuration parameters are described in more detail at @@ -159,7 +161,7 @@ class KafkaConsumer(object): # Check auto-commit configuration if self._config['auto_commit_enable']: logger.info("Configuring consumer to auto-commit offsets") - self._set_next_auto_commit_time() + self._reset_auto_commit() if self._config['metadata_broker_list'] is None: raise KafkaConfigurationError('metadata_broker_list required to ' @@ -298,10 +300,6 @@ class KafkaConsumer(object): self._set_consumer_timeout_start() while True: - # Check for auto-commit - if self._should_auto_commit(): - self.commit() - try: return self._get_message_iterator().next() @@ -466,6 +464,13 @@ class KafkaConsumer(object): self._offsets.task_done[topic_partition] = offset + # Check for auto-commit + if self._config['auto_commit_enable']: + self._incr_auto_commit_message_count() + + if self._should_auto_commit(): + self.commit() + def commit(self): """ Store consumed message offsets (marked via task_done()) @@ -515,7 +520,7 @@ class KafkaConsumer(object): self._offsets.commit[topic_partition] = (task_done + 1) if self._config['auto_commit_enable']: - self._set_next_auto_commit_time() + self._reset_auto_commit() return True @@ -637,13 +642,24 @@ class KafkaConsumer(object): if not self._config['auto_commit_enable']: return False - if not self._next_commit: - return False + if self._config['auto_commit_interval_ms'] > 0: + if time.time() >= self._next_commit_time: + return True + + if self._config['auto_commit_interval_messages'] > 0: + if self._uncommitted_message_count >= self._config['auto_commit_interval_messages']: + return True + + return False - return (time.time() >= self._next_commit) + def _reset_auto_commit(self): + self._uncommitted_message_count = 0 + self._next_commit_time = None + if self._config['auto_commit_interval_ms'] > 0: + self._next_commit_time = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) - def _set_next_auto_commit_time(self): - self._next_commit = time.time() + (self._config['auto_commit_interval_ms'] / 1000.0) + def _incr_auto_commit_message_count(self, n=1): + self._uncommitted_message_count += n # # Message iterator private methods |