 kafka/consumer/group.py | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 7b59567..77b0b96 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -303,7 +303,7 @@ class KafkaConsumer(six.Iterator):
         'sasl_kerberos_service_name': 'kafka',
         'sasl_kerberos_domain_name': None,
         'sasl_oauth_token_provider': None,
-        'legacy_iterator': True,  # experimental feature
+        'legacy_iterator': False,  # enable to revert to < 1.4.7 iterator
     }
     DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
 
@@ -598,7 +598,7 @@ class KafkaConsumer(six.Iterator):
             partitions = cluster.partitions_for_topic(topic)
         return partitions
 
-    def poll(self, timeout_ms=0, max_records=None):
+    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -622,6 +622,12 @@ class KafkaConsumer(six.Iterator):
             dict: Topic to list of records since the last fetch for the
                 subscribed list of topics and partitions.
         """
+        # Note: update_offsets is an internal-use only argument. It is used to
+        # support the python iterator interface, which wraps consumer.poll()
+        # and requires that the partition offsets tracked by the fetcher are
+        # not updated until the iterator returns each record to the user. As
+        # such, the argument is not documented and library users should not
+        # rely on it; it may change or be removed in a future release.
         assert timeout_ms >= 0, 'Timeout must not be negative'
         if max_records is None:
             max_records = self.config['max_poll_records']
@@ -632,7 +638,7 @@ class KafkaConsumer(six.Iterator):
         start = time.time()
         remaining = timeout_ms
         while True:
-            records = self._poll_once(remaining, max_records)
+            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
             if records:
                 return records
 
@@ -642,7 +648,7 @@ class KafkaConsumer(six.Iterator):
             if remaining <= 0:
                 return {}
 
-    def _poll_once(self, timeout_ms, max_records):
+    def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
@@ -661,7 +667,7 @@ class KafkaConsumer(six.Iterator):
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
-        records, partial = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+        records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         if records:
             # Before returning the fetched records, we can send off the
             # next round of fetches and avoid block waiting for their
@@ -681,7 +687,7 @@ class KafkaConsumer(six.Iterator):
         if self._coordinator.need_rejoin():
             return {}
 
-        records, _ = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+        records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
 
     def position(self, partition):
@@ -1089,7 +1095,7 @@ class KafkaConsumer(six.Iterator):
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * (self._consumer_timeout - time.time())
-        record_map = self.poll(timeout_ms=timeout_ms)
+        record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
         for tp, records in six.iteritems(record_map):
             # Generators are stateful, and it is possible that the tp / records
             # here may become stale during iteration -- i.e., we seek to a
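
For context, a minimal usage sketch of what this patch changes (the broker address localhost:9092 and topic name my-topic are hypothetical). With the new default legacy_iterator=False, iterating a KafkaConsumer goes through _message_generator_v2(), which calls poll(update_offsets=False) so the fetcher's partition offsets advance only as each record is yielded to the user; a seek() issued mid-iteration is therefore not clobbered by records that were fetched but not yet consumed. Setting legacy_iterator=True reverts to the pre-1.4.7 iterator:

    from kafka import KafkaConsumer

    # New default path (legacy_iterator=False): iteration wraps poll() and
    # fetch positions advance one record at a time as messages are yielded.
    consumer = KafkaConsumer('my-topic',
                             bootstrap_servers='localhost:9092',
                             consumer_timeout_ms=5000)
    for message in consumer:
        print(message.partition, message.offset, message.value)

    # Opt-out added by this patch: revert to the < 1.4.7 iterator behavior.
    legacy = KafkaConsumer('my-topic',
                           bootstrap_servers='localhost:9092',
                           consumer_timeout_ms=5000,
                           legacy_iterator=True)

Calls to consumer.poll() made directly by applications are unaffected: update_offsets defaults to True, preserving the documented behavior.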